retry_handler

Database retry handler with exponential backoff and transient error detection.

This module provides sophisticated retry mechanisms for database operations with intelligent error classification and adaptive delay strategies.

RetryMetadata

Bases: TypedDict

Type definition for retry operation metadata.

Defines the structure of metadata that can be stored in RetryOperationContext for tracking and debugging.

RetryPolicy dataclass

RetryPolicy(
    max_retries=3,
    base_delay_seconds=1.0,
    max_delay_seconds=60.0,
    exponential_base=2.0,
    jitter_range=0.1,
    operation_timeout_seconds=300.0,
)

Configuration for retry behavior with exponential backoff.

Defines retry parameters including maximum attempts, delay settings, and jitter for avoiding thundering herd problems.
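With the defaults above, delays double on each attempt and are clipped at `max_delay_seconds`. A minimal sketch of the resulting schedule (plain arithmetic, not importing the module):

```python
# Delay schedule implied by the default RetryPolicy values
# (base_delay_seconds=1.0, exponential_base=2.0, max_delay_seconds=60.0),
# before jitter is applied.
base_delay = 1.0
exponential_base = 2.0
max_delay = 60.0

schedule = [
    min(base_delay * exponential_base**attempt, max_delay)
    for attempt in range(7)
]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```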

RetryOperationContext dataclass

RetryOperationContext(
    operation_id,
    policy,
    start_time=(lambda: datetime.now(UTC))(),
    attempt_count=0,
    last_error=None,
    metadata=_create_empty_metadata(),
)

Context information for retry operations.

Tracks operation progress, attempt history, and metadata for comprehensive retry operation management.

total_elapsed_seconds property

total_elapsed_seconds

Calculate the total elapsed time since operation start.

has_exceeded_timeout property

has_exceeded_timeout

Check if the operation has exceeded its total timeout.

DatabaseRetryHandler

DatabaseRetryHandler(logger, default_policy=None)

Advanced retry handler for database operations with intelligent error detection.

Provides exponential backoff with jitter, transient error classification, and comprehensive retry context management for reliable database operations.

Initialize retry handler with logging and default policy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `logger` | `Logger` | Logger instance for retry operation tracking | *required* |
| `default_policy` | `RetryPolicy \| None` | Default retry policy for operations | `None` |

Source code in src/core/retry_handler.py
def __init__(
    self,
    logger: logging.Logger,
    default_policy: RetryPolicy | None = None,
) -> None:
    """Initialize retry handler with logging and default policy.

    Args:
        logger: Logger instance for retry operation tracking
        default_policy: Default retry policy for operations

    """
    self.logger: logging.Logger = logger
    self.database_policy: RetryPolicy = default_policy or RetryPolicy(
        max_retries=5,
        max_delay_seconds=30.0,
        jitter_range=0.2,
    )

    # Error classification patterns
    self._transient_error_patterns: set[str] = {
        "connection refused",
        "connection reset",
        "timeout",
        "temporary failure",
        "resource temporarily unavailable",
        "too many connections",
        "deadlock",
        "lock wait timeout",
    }

is_transient_error

is_transient_error(error)

Determine if an error is transient and worth retrying.

Analyzes error type, message content, and errno codes to classify errors as transient (temporary) or permanent.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `error` | `Exception` | Exception to analyze | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the error appears transient and the operation should be retried |

Source code in src/core/retry_handler.py
def is_transient_error(self, error: Exception) -> bool:
    """Determine if an error is transient and worth retrying.

    Analyzes error type, message content, and errno codes
    to classify errors as transient (temporary) or permanent.

    Args:
        error: Exception to analyze

    Returns:
        True if error appears transient and operation should be retried

    """
    error_message: str = str(error).lower()

    # Connection-related errors are typically transient
    if isinstance(error, ConnectionError | TimeoutError | OSError):
        # Check for specific OSError errno codes that indicate transient issues
        if hasattr(error, "errno") and error.errno is not None:
            # Common transient errno codes
            transient_errnos: set[int] = {
                111,  # Connection refused
                110,  # Connection timed out
                104,  # Connection reset by peer
                32,  # Broken pipe
                61,  # Connection refused (macOS)
            }
            if error.errno in transient_errnos:
                return True
            # Unrecognized errno: fall through to the message-pattern checks
        else:
            # No errno available; treat connection/timeout errors as transient
            return True

    # Check error message for transient patterns
    for pattern in self._transient_error_patterns:
        if pattern in error_message:
            return True

    # Specific database-related transient errors
    database_error_patterns: list[str] = [
        "database is locked",
        "sqlite3.OperationalError",
        "cursor closed",
        "connection closed",
    ]

    return any(db_error.lower() in error_message for db_error in database_error_patterns)
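The message-pattern half of the check can be exercised on its own. A minimal standalone sketch using the same pattern set (not the class itself; `looks_transient` is an illustrative name):

```python
# Mirror of the handler's _transient_error_patterns set
TRANSIENT_PATTERNS = {
    "connection refused",
    "connection reset",
    "timeout",
    "temporary failure",
    "resource temporarily unavailable",
    "too many connections",
    "deadlock",
    "lock wait timeout",
}

def looks_transient(error: Exception) -> bool:
    """Classify an error as transient by message content alone."""
    message = str(error).lower()
    return any(pattern in message for pattern in TRANSIENT_PATTERNS)

print(looks_transient(RuntimeError("Deadlock found when trying to get lock")))  # True
print(looks_transient(ValueError("invalid track id")))  # False
```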

calculate_delay_seconds staticmethod

calculate_delay_seconds(attempt_number, policy)

Calculate delay for retry attempt with exponential backoff and jitter.

Implements exponential backoff with configurable jitter to prevent thundering herd problems and distribute retry attempts over time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `attempt_number` | `int` | Current attempt number (0-based) | *required* |
| `policy` | `RetryPolicy` | Retry policy configuration | *required* |

Returns:

| Type | Description |
| --- | --- |
| `float` | Delay in seconds before the next retry attempt |

Source code in src/core/retry_handler.py
@staticmethod
def calculate_delay_seconds(
    attempt_number: int,
    policy: RetryPolicy,
) -> float:
    """Calculate delay for retry attempt with exponential backoff and jitter.

    Implements exponential backoff with configurable jitter to prevent
    thundering herd problems and distribute retry attempts over time.

    Args:
        attempt_number: Current attempt number (0-based)
        policy: Retry policy configuration

    Returns:
        Delay in seconds before next retry attempt

    """
    # Calculate exponential delay
    exponential_delay: float = policy.base_delay_seconds * (policy.exponential_base**attempt_number)

    # Apply maximum delay cap
    capped_delay: float = min(exponential_delay, policy.max_delay_seconds)

    # Add deterministic jitter to prevent thundering herd
    # Using hash-based jitter for reproducible but distributed delays
    jitter_amount: float = capped_delay * policy.jitter_range

    # Create deterministic jitter based on attempt number
    # This provides good distribution without cryptographic randomness
    jitter_seed: float = (attempt_number * 31 + 17) % 100 / 100.0  # 0.0-1.0
    jitter_offset: float = (jitter_seed - 0.5) * 2 * jitter_amount  # -jitter_amount to +jitter_amount

    final_delay: float = max(0.0, capped_delay + jitter_offset)

    return final_delay
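Because the jitter is derived from the attempt number rather than a random source, the delays are fully reproducible. Tracing the formula for the default policy values (base 1.0 s, exponent 2.0, jitter range 0.1), inlined here for illustration:

```python
def delay(attempt: int, base: float = 1.0, exp_base: float = 2.0,
          max_delay: float = 60.0, jitter_range: float = 0.1) -> float:
    # Same arithmetic as calculate_delay_seconds, inlined for illustration.
    capped = min(base * exp_base**attempt, max_delay)
    jitter_amount = capped * jitter_range
    jitter_seed = (attempt * 31 + 17) % 100 / 100.0          # 0.0-1.0
    jitter_offset = (jitter_seed - 0.5) * 2 * jitter_amount  # +/- jitter_amount
    return max(0.0, capped + jitter_offset)

# attempt 0: seed 0.17 -> 1.0 - 0.066 = 0.934
# attempt 1: seed 0.48 -> 2.0 - 0.008 = 1.992
# attempt 2: seed 0.79 -> 4.0 + 0.232 = 4.232
print([round(delay(a), 3) for a in range(3)])  # [0.934, 1.992, 4.232]
```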

async_retry_operation async

async_retry_operation(operation_id, policy=None)

Async context manager for retry operations with comprehensive tracking.

Provides retry context with operation tracking, error handling, and automatic retry logic for database operations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation_id` | `str` | Unique identifier for the operation | *required* |
| `policy` | `RetryPolicy \| None` | Retry policy to use (defaults to `database_policy`) | `None` |

Yields:

| Name | Type | Description |
| --- | --- | --- |
| `RetryOperationContext` | `AsyncGenerator[RetryOperationContext]` | Context for tracking retry progress |

Example:

    async with retry_handler.async_retry_operation("db_read") as ctx:
        ctx.metadata["table"] = "tracks"
        result = await database_operation()

Source code in src/core/retry_handler.py
@asynccontextmanager
async def async_retry_operation(
    self,
    operation_id: str,
    policy: RetryPolicy | None = None,
) -> AsyncGenerator[RetryOperationContext]:
    """Async context manager for retry operations with comprehensive tracking.

    Provides retry context with operation tracking, error handling,
    and automatic retry logic for database operations.

    Args:
        operation_id: Unique identifier for the operation
        policy: Retry policy to use (defaults to database_policy)

    Yields:
        RetryOperationContext: Context for tracking retry progress

    Example:
        async with retry_handler.async_retry_operation("db_read") as ctx:
            ctx.metadata["table"] = "tracks"
            result = await database_operation()

    """
    retry_policy: RetryPolicy = policy or self.database_policy
    context: RetryOperationContext = RetryOperationContext(
        operation_id=operation_id,
        policy=retry_policy,
    )

    self.logger.debug(
        "Starting retry operation '%s' with policy: max_retries=%d, base_delay=%.2fs",
        operation_id,
        retry_policy.max_retries,
        retry_policy.base_delay_seconds,
    )

    for attempt in range(retry_policy.max_retries + 1):
        context.attempt_count = attempt + 1

        try:
            # Check for total operation timeout
            if context.has_exceeded_timeout:
                self._raise_timeout_error(operation_id, context, retry_policy)

            # Yield context for operation execution
            yield context

        except (ValueError, RuntimeError, OSError) as error:
            context.last_error = error

            # Check if this is the last attempt
            if attempt >= retry_policy.max_retries:
                self.logger.exception(
                    "Operation '%s' failed permanently after %d attempts (%.2fs elapsed)",
                    operation_id,
                    attempt + 1,
                    context.total_elapsed_seconds,
                )
                raise

            # Check if error is worth retrying
            if not self.is_transient_error(error):
                self.logger.warning(
                    "Operation '%s' failed with non-transient error: %s",
                    operation_id,
                    error,
                )
                raise

            # Calculate delay for next attempt
            delay_seconds: float = DatabaseRetryHandler.calculate_delay_seconds(attempt, retry_policy)

            self.logger.warning(
                "Operation '%s' failed on attempt %d/%d: %s. Retrying in %.2fs...",
                operation_id,
                attempt + 1,
                retry_policy.max_retries + 1,
                error,
                delay_seconds,
            )

            # Wait before retry
            await asyncio.sleep(delay_seconds)

        else:
            # If we reach here, operation succeeded
            self.logger.debug(
                "Operation '%s' succeeded on attempt %d/%d (%.2fs elapsed)",
                operation_id,
                attempt + 1,
                retry_policy.max_retries + 1,
                context.total_elapsed_seconds,
            )
            return

execute_with_retry async

execute_with_retry(operation, operation_id, policy=None)

Execute operation with retry logic.

Implements retry loop directly for reliable async operation retry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `operation` | `Callable[[], Awaitable[RetryResult]]` | Async callable to execute with retry | *required* |
| `operation_id` | `str` | Unique identifier for the operation | *required* |
| `policy` | `RetryPolicy \| None` | Retry policy to use | `None` |

Returns:

| Type | Description |
| --- | --- |
| `RetryResult` | Result from successful operation execution |

Raises:

| Type | Description |
| --- | --- |
| `OSError`, `ValueError`, `RuntimeError` | If all retries are exhausted |

Source code in src/core/retry_handler.py
async def execute_with_retry(
    self,
    operation: Callable[[], Awaitable[RetryResult]],
    operation_id: str,
    policy: RetryPolicy | None = None,
) -> RetryResult:
    """Execute operation with retry logic.

    Implements retry loop directly for reliable async operation retry.

    Args:
        operation: Async callable to execute with retry
        operation_id: Unique identifier for the operation
        policy: Retry policy to use

    Returns:
        Result from successful operation execution

    Raises:
        OSError, ValueError, RuntimeError: If all retries exhausted

    """
    retry_policy: RetryPolicy = policy or self.database_policy
    context: RetryOperationContext = RetryOperationContext(
        operation_id=operation_id,
        policy=retry_policy,
    )

    self.logger.debug(
        "Starting retry operation '%s' with policy: max_retries=%d, base_delay=%.2fs",
        operation_id,
        retry_policy.max_retries,
        retry_policy.base_delay_seconds,
    )

    last_error: Exception | None = None

    for attempt in range(retry_policy.max_retries + 1):
        context.attempt_count = attempt + 1

        # Check for total operation timeout
        if context.has_exceeded_timeout:
            self._raise_timeout_error(operation_id, context, retry_policy)

        try:
            result = await operation()
            self.logger.debug(
                "Operation '%s' succeeded on attempt %d/%d (%.2fs elapsed)",
                operation_id,
                attempt + 1,
                retry_policy.max_retries + 1,
                context.total_elapsed_seconds,
            )
            return result

        except (ValueError, RuntimeError, OSError) as error:
            last_error = error
            context.last_error = error

            # Check if this is the last attempt
            if attempt >= retry_policy.max_retries:
                self.logger.exception(
                    "Operation '%s' failed permanently after %d attempts (%.2fs elapsed)",
                    operation_id,
                    attempt + 1,
                    context.total_elapsed_seconds,
                )
                raise

            # Check if error is worth retrying
            if not self.is_transient_error(error):
                self.logger.warning(
                    "Operation '%s' failed with non-transient error: %s",
                    operation_id,
                    error,
                )
                raise

            # Calculate delay for next attempt
            delay_seconds: float = DatabaseRetryHandler.calculate_delay_seconds(attempt, retry_policy)

            self.logger.warning(
                "Operation '%s' failed on attempt %d/%d: %s. Retrying in %.2fs...",
                operation_id,
                attempt + 1,
                retry_policy.max_retries + 1,
                error,
                delay_seconds,
            )

            # Wait before retry
            await asyncio.sleep(delay_seconds)

    # Should not reach here, but safety fallback
    if last_error:
        raise last_error
    msg = f"Operation '{operation_id}' failed without error (unexpected state)"
    raise RuntimeError(msg)
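The retry loop above can be exercised end to end. A self-contained sketch of the same pattern with a deliberately flaky operation (all names here are illustrative, not part of the module):

```python
import asyncio

attempts = {"count": 0}

async def flaky_operation() -> str:
    # Fails twice with a transient-looking error, then succeeds.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("connection reset")
    return "ok"

async def run_with_retry(operation, max_retries: int = 5, base_delay: float = 0.001):
    for attempt in range(max_retries + 1):
        try:
            return await operation()
        except (ConnectionError, TimeoutError, OSError):
            if attempt >= max_retries:
                raise
            # Exponential backoff between attempts (jitter omitted for brevity)
            await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError("unreachable")

result = asyncio.run(run_with_retry(flaky_operation))
print(result, attempts["count"])  # ok 3
```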