Skip to content

request_executor

API Request Executor module.

Handles HTTP request execution with retry logic, rate limiting, caching, and response processing for external API calls.

ApiRequestExecutor

ApiRequestExecutor(
    *,
    cache_service,
    rate_limiters,
    console_logger,
    error_logger,
    user_agent,
    discogs_token,
    cache_ttl_days,
    default_max_retries,
    default_retry_delay
)

Executes HTTP requests with retry logic, rate limiting, and caching.

Handles all low-level HTTP communication, including:

- Request preparation (headers, timeouts)
- Rate limiting coordination
- Retry with exponential backoff
- Response parsing and validation
- Cache integration

Important

Session lifecycle is managed by ExternalApiOrchestrator, NOT here. This executor only holds a session reference set via set_session(). It may clear this reference on errors (e.g., event loop closed), but it will NEVER close the session. Closing is the owner's responsibility.

Initialize the API request executor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cache_service` | `CacheServiceProtocol` | Cache service for storing/retrieving API responses | required |
| `rate_limiters` | `dict[str, ApiRateLimiter]` | Dict mapping API names to rate limiters | required |
| `console_logger` | `Logger` | Logger for info/debug messages | required |
| `error_logger` | `Logger` | Logger for errors/warnings | required |
| `user_agent` | `str` | User-Agent header for requests | required |
| `discogs_token` | `str \| None` | Discogs API authentication token | required |
| `cache_ttl_days` | `int` | How long to cache API responses (days) | required |
| `default_max_retries` | `int` | Default retry count for failed requests | required |
| `default_retry_delay` | `float` | Base delay between retries (seconds) | required |
Source code in src/services/api/request_executor.py
def __init__(
    self,
    *,
    cache_service: CacheServiceProtocol,
    rate_limiters: dict[str, ApiRateLimiter],
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    user_agent: str,
    discogs_token: str | None,
    cache_ttl_days: int,
    default_max_retries: int,
    default_retry_delay: float,
) -> None:
    """Store the executor's collaborators and reset its metrics.

    All arguments are keyword-only and are kept on the instance under
    the same names. No session is attached here; ``self.session`` starts
    as ``None`` and is expected to be supplied through ``set_session()``.

    Args:
        cache_service: Cache service for storing/retrieving API responses
        rate_limiters: Dict mapping API names to rate limiters
        console_logger: Logger for info/debug messages
        error_logger: Logger for errors/warnings
        user_agent: User-Agent header for requests
        discogs_token: Discogs API authentication token
        cache_ttl_days: How long to cache API responses (days)
        default_max_retries: Default retry count for failed requests
        default_retry_delay: Base delay between retries (seconds)
    """
    # Injected collaborators
    self.cache_service = cache_service
    self.rate_limiters = rate_limiters
    self.console_logger = console_logger
    self.error_logger = error_logger

    # Request configuration
    self.user_agent = user_agent
    self.discogs_token = discogs_token
    self.cache_ttl_days = cache_ttl_days
    self.default_max_retries = default_max_retries
    self.default_retry_delay = default_retry_delay

    # The session is owned elsewhere; only a reference is held here.
    self.session: aiohttp.ClientSession | None = None

    # Metrics are pre-seeded with the known API keys for backward
    # compatibility. Each API gets its own, independent duration list.
    known_apis = ("discogs", "musicbrainz", "itunes")
    self.request_counts: dict[str, int] = dict.fromkeys(known_apis, 0)
    self.api_call_durations: dict[str, list[float]] = {
        api: [] for api in known_apis
    }

set_session

set_session(session)

Set the aiohttp session for making requests.

Source code in src/services/api/request_executor.py
def set_session(self, session: aiohttp.ClientSession | None) -> None:
    """Attach (or detach) the aiohttp session used for requests.

    The given value simply replaces ``self.session``; passing ``None``
    clears the reference. Nothing is closed or validated here.

    Args:
        session: Session to use for subsequent requests, or ``None``.
    """
    self.session = session

execute_request async

execute_request(
    api_name,
    url,
    params=None,
    headers_override=None,
    max_retries=None,
    base_delay=None,
    timeout_override=None,
)

Execute an API request with rate limiting, caching, and retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `api_name` | `str` | Name of the API (e.g., 'discogs', 'musicbrainz') | required |
| `url` | `str` | Request URL | required |
| `params` | `dict[str, str] \| None` | Query parameters | `None` |
| `headers_override` | `dict[str, str] \| None` | Additional headers to merge | `None` |
| `max_retries` | `int \| None` | Override default retry count | `None` |
| `base_delay` | `float \| None` | Override default retry delay | `None` |
| `timeout_override` | `float \| None` | Override default timeout | `None` |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any] \| None` | Parsed JSON response dict, or None if request failed |

Source code in src/services/api/request_executor.py
async def execute_request(
    self,
    api_name: str,
    url: str,
    params: dict[str, str] | None = None,
    headers_override: dict[str, str] | None = None,
    max_retries: int | None = None,
    base_delay: float | None = None,
    timeout_override: float | None = None,
) -> dict[str, Any] | None:
    """Run one API request through the cache, preparation and retry steps.

    The flow is: look up the cache; on a miss, prepare the request
    components; execute with retry; hand the outcome to the cache; and
    return it. A cache hit or a failed preparation returns early without
    executing anything.

    Args:
        api_name: Name of the API (e.g., 'discogs', 'musicbrainz')
        url: Request URL
        params: Query parameters
        headers_override: Additional headers to merge
        max_retries: Override default retry count; used only when it is
            a positive int, otherwise ``default_max_retries`` applies
        base_delay: Override default retry delay; used only when it is a
            non-negative number, otherwise ``default_retry_delay`` applies
        timeout_override: Override default timeout

    Returns:
        Parsed JSON response dict, or None if request failed
    """
    # iTunes calls get extra debug tracing at each stage.
    trace_itunes = api_name == "itunes"
    if trace_itunes:
        self.console_logger.debug(
            "[%s] Making API request to %s with params: %s",
            api_name,
            url,
            params,
        )

    # A cache hit short-circuits everything else.
    cache_key = self._build_cache_key(api_name, url, params)
    cache_hit = await self._check_cache(cache_key, api_name, url)
    if cache_hit is not None:
        if trace_itunes:
            self.console_logger.debug("[%s] Using cached result", api_name)
        return cache_hit

    # Without prepared request components there is nothing to send.
    request_parts = self._prepare_request(api_name, url, headers_override, timeout_override)
    if request_parts is None:
        return None
    headers, rate_limiter, timeout = request_parts

    # Caller overrides are honoured only when they are usable values.
    if isinstance(max_retries, int) and max_retries > 0:
        attempts = max_retries
    else:
        attempts = self.default_max_retries

    if isinstance(base_delay, (int, float)) and base_delay >= 0:
        delay = base_delay
    else:
        delay = self.default_retry_delay

    response = await self._execute_with_retry(
        api_name,
        url,
        params,
        request_headers=headers,
        request_timeout=timeout,
        limiter=rate_limiter,
        max_retries=attempts,
        base_delay=delay,
    )

    if trace_itunes:
        outcome = "Success" if response is not None else "Failed/None"
        self.console_logger.debug(
            "[%s] Request execution result: %s",
            api_name,
            outcome,
        )

    # The outcome is passed to the cache step whether or not it is None.
    await self._cache_result(cache_key, response)
    return response