
orchestrator

External API Service Orchestrator.

This module provides the main coordination layer for fetching album release years from multiple API providers (MusicBrainz, Discogs). It replaces the legacy external API service with a modular architecture that maintains backward compatibility while providing better separation of concerns.

The orchestrator handles:

- HTTP session management and connection pooling
- Rate limiting coordination across all API providers
- Request caching and response aggregation
- Dependency injection for cache and verification services
- Authentication token management with encryption support
- Release year determination using the release scoring algorithm
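This reference does not show how the per-provider limiters in `rate_limiters` work. The only visible contract is that `close()` calls `get_stats()` on each one and reads `total_requests` and `avg_wait_time`. Below is a minimal sketch of a limiter that satisfies that contract. It assumes a simple minimum-interval policy. The class name, `acquire()`, and the policy itself are hypothetical, not the module's implementation.

```python
from __future__ import annotations

import asyncio
import time


class MinIntervalRateLimiter:
    """Hypothetical per-provider limiter allowing one request every `min_interval` seconds.

    get_stats() returns the two keys that close() reads: "total_requests" and "avg_wait_time".
    """

    def __init__(self, requests_per_second: float) -> None:
        self.min_interval = 1.0 / requests_per_second
        self._lock = asyncio.Lock()  # serializes callers so slots are handed out in order
        self._last_request = float("-inf")  # monotonic time of the last granted slot
        self._total_requests = 0
        self._total_wait = 0.0

    async def acquire(self) -> float:
        """Sleep until the next slot is free and return how long the caller waited."""
        async with self._lock:
            wait = max(0.0, self._last_request + self.min_interval - time.monotonic())
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_request = time.monotonic()
            self._total_requests += 1
            self._total_wait += wait
            return wait

    def get_stats(self) -> dict[str, float]:
        avg_wait = self._total_wait / self._total_requests if self._total_requests else 0.0
        return {"total_requests": self._total_requests, "avg_wait_time": avg_wait}


async def demo() -> dict[str, float]:
    # One limiter per provider, keyed by provider name as in close()'s statistics loop
    rate_limiters = {"musicbrainz": MinIntervalRateLimiter(50.0)}
    for _ in range(3):
        await rate_limiters["musicbrainz"].acquire()
    return rate_limiters["musicbrainz"].get_stats()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Holding the lock while sleeping is what makes concurrent callers queue up behind each other instead of all computing the same wait.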

ExternalApiOrchestrator

ExternalApiOrchestrator(
    config,
    console_logger,
    error_logger,
    analytics,
    cache_service,
    pending_verification_service,
)

External API service orchestrator.

Coordinates API calls across multiple providers (MusicBrainz, Discogs) to determine the original release year for music albums. It provides unified access to both APIs, with rate limiting, caching, authentication, and release scoring to identify the most likely original release. An Apple Music (iTunes) client is also initialized and serves as the fallback source for artist start years (see get_artist_start_year).

Attributes:

- config (AppConfig): Typed application configuration
- console_logger (Logger): Logger for general output
- error_logger (Logger): Logger for errors and warnings
- cache_service (CacheOrchestrator): Service for caching API responses
- pending_verification_service (PendingVerificationService): Service for managing the verification queue
- session (ClientSession | None): HTTP session for API requests; None until initialize() creates it
- rate_limiters: Rate limiters for each API provider, keyed by provider name
- scoring_config: Configuration for the release scoring algorithm
- release_scorer: Scorer for evaluating release candidates

Initialize the API orchestrator with configuration, loggers, and dependencies.

Source code in src/services/api/orchestrator.py
def __init__(
    self,
    config: AppConfig,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    analytics: Analytics,
    cache_service: CacheOrchestrator,
    pending_verification_service: PendingVerificationService,
) -> None:
    """Initialize the API orchestrator with configuration, loggers, and dependencies."""
    self.config = config
    self.console_logger = console_logger
    self.error_logger = error_logger
    self.analytics = analytics
    self.session: aiohttp.ClientSession | None = None

    # Store injected dependencies
    self.cache_service = cache_service
    self.pending_verification_service = pending_verification_service

    # Initialize pending tasks for fire-and-forget async operations
    self._pending_tasks: set[asyncio.Task[Any]] = set()

    # Initialize artist period context for release scoring
    self.artist_period_context: ArtistPeriodContext | None = None

    # Initialize API client references (will be set in _initialize_api_clients)
    self.discogs_client: DiscogsClient
    self.musicbrainz_client: MusicBrainzClient
    self.applemusic_client: AppleMusicClient

    # Initialize SecureConfig for encrypted token storage
    self.secure_config: SecureConfig | None = None
    try:
        self.secure_config = SecureConfig(logger=self.error_logger)
        self.console_logger.debug("%s initialized for encrypted token storage", LogFormat.entity("SecureConfig"))
    except SecurityConfigError as e:
        self.error_logger.warning("Failed to initialize SecureConfig: %s", e)
        self.secure_config = None

    # Extract and validate configuration
    self._extract_configuration()

    # Initialize rate limiters
    self._initialize_rate_limiters()

    # Initialize API request executor (handles HTTP requests with retry/caching)
    self.request_executor = ApiRequestExecutor(
        cache_service=cache_service,
        rate_limiters=self.rate_limiters,
        console_logger=console_logger,
        error_logger=error_logger,
        user_agent=self.user_agent,
        discogs_token=self.discogs_token,
        cache_ttl_days=self.cache_ttl_days,
        default_max_retries=self.default_api_max_retries,
        default_retry_delay=self.default_api_retry_delay,
    )

    # Initialize the scoring system first (needed for API client injection)
    self._initialize_scoring_system()

    # Statistics tracking - delegate to request_executor
    self.request_counts = self.request_executor.request_counts
    self.api_call_durations = self.request_executor.api_call_durations

    # Initialize state flag
    self._initialized = False

initialize async

initialize(force=False)

Initialize the aiohttp ClientSession and API clients.

Parameters:

- force (bool, default False): If True, close the existing session and reinitialize.

Raises:

- TypeError, ValueError, AttributeError, RuntimeError: Re-raised from API client setup after the new session has been closed and cleared.

Source code in src/services/api/orchestrator.py
async def initialize(self, force: bool = False) -> None:
    """Initialize the aiohttp ClientSession and API clients.

    Args:
        force: If True, close existing session and reinitialize.

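    Raises:
        TypeError, ValueError, AttributeError, RuntimeError: Re-raised from API
            client setup after the new session has been closed and cleared.
    """
    if force and self.session is not None:
        if not self.session.closed:
            await self.session.close()
        self.session = None

    # Treat an already-closed session (e.g. after close()) as absent so it is recreated
    if self.session is None or self.session.closed: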
        self.session = self._create_client_session()
        try:
            self.request_executor.set_session(self.session)
            self._initialize_api_clients()
            self._initialize_year_search_coordinator()
        except (TypeError, ValueError, AttributeError, RuntimeError):
            # Clean up session on initialization failure to prevent resource leak
            if self.session and not self.session.closed:
                await self.session.close()
            self.session = None
            # Clear request executor's session reference to prevent stale session usage
            self.request_executor.set_session(None)
            raise

        forced_text = " (forced)" if force else ""
        self.console_logger.info(
            "External API session initialized with User-Agent: %s%s",
            self.user_agent,
            forced_text,
        )
        self.console_logger.debug("API clients initialized")

        # Mark as initialized
        self._initialized = True
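The lifecycle rules of `initialize()` are easier to check against a stand-in session. Below is a minimal sketch that uses a hypothetical `FakeSession` in place of `aiohttp.ClientSession` and a `_setup_clients()` stub in place of the client and coordinator setup. The sketch deliberately treats an already-closed session as absent, so calling `initialize()` after the session was closed creates a new one.

```python
from __future__ import annotations

import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession with only `closed` and `close()`."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SessionOwner:
    """Sketch of initialize(force=...): create lazily, recreate on demand, never leak on failure."""

    def __init__(self, fail_setup: bool = False) -> None:
        self.session: FakeSession | None = None
        self.fail_setup = fail_setup  # simulates a client-construction error

    def _setup_clients(self) -> None:
        if self.fail_setup:
            raise RuntimeError("simulated client setup failure")

    async def initialize(self, force: bool = False) -> None:
        if force and self.session is not None:
            if not self.session.closed:
                await self.session.close()
            self.session = None
        # A session that was closed elsewhere is treated as absent and recreated
        if self.session is None or self.session.closed:
            self.session = FakeSession()
            try:
                self._setup_clients()
            except RuntimeError:
                await self.session.close()  # release the half-initialized session
                self.session = None
                raise


async def demo() -> list[bool]:
    owner = SessionOwner()
    await owner.initialize()
    first = owner.session
    await owner.initialize()  # no-op: the open session is reused
    reused = owner.session is first
    await owner.initialize(force=True)  # replaced even though it was still open
    replaced = owner.session is not first and first is not None and first.closed
    return [reused, replaced]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```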

close async

close()

Close the orchestrator and clean up resources gracefully.

This method:

1. Waits for pending fire-and-forget tasks to complete (up to PENDING_TASKS_SHUTDOWN_TIMEOUT seconds)
2. Cancels any tasks that don't complete in time
3. Clears the _pending_tasks set
4. Logs API statistics
5. Closes the HTTP session

Steps 4 and 5 are skipped if the session was never created or is already closed.
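Steps 1 to 3 follow a common asyncio pattern. Background tasks are held by strong references, shutdown is bounded with `asyncio.wait(..., timeout=...)`, and whatever is left gets cancelled. Below is a minimal sketch of that pattern. It assumes tasks are registered with a done-callback that discards them; this reference does not show how tasks enter `_pending_tasks`. `BackgroundTasks` and `spawn()` are hypothetical names.

```python
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import Coroutine
from typing import Any


class BackgroundTasks:
    """Hypothetical tracker for fire-and-forget tasks, mirroring _pending_tasks."""

    def __init__(self) -> None:
        self._pending: set[asyncio.Task[Any]] = set()

    def spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        task = asyncio.create_task(coro)
        self._pending.add(task)  # strong reference: the event loop keeps only weak ones
        task.add_done_callback(self._pending.discard)  # forget the task once it finishes
        return task

    async def shutdown(self, timeout: float) -> int:
        """Wait up to `timeout` seconds, cancel stragglers, and return how many were cancelled."""
        if not self._pending:
            return 0
        _, still_pending = await asyncio.wait(self._pending, timeout=timeout)
        for task in still_pending:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task  # let the task process its cancellation
        self._pending.clear()
        return len(still_pending)


async def demo() -> int:
    tasks = BackgroundTasks()
    tasks.spawn(asyncio.sleep(0.01))  # finishes well within the timeout
    tasks.spawn(asyncio.sleep(10))  # still running at shutdown, so it gets cancelled
    return await tasks.shutdown(timeout=0.5)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```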

Source code in src/services/api/orchestrator.py
async def close(self) -> None:
    """Close the orchestrator and clean up resources gracefully.

    This method:
    1. Waits for pending fire-and-forget tasks to complete (PENDING_TASKS_SHUTDOWN_TIMEOUT)
    2. Cancels any tasks that don't complete in time
    3. Clears the _pending_tasks set
    4. Logs API statistics
    5. Closes the HTTP session
    """
    # Wait for pending tasks with timeout
    if self._pending_tasks:
        self.console_logger.debug(
            "Waiting for %d pending tasks to complete...",
            len(self._pending_tasks),
        )
        # Only the still-pending set is needed (for cancellation below)
        _, pending = await asyncio.wait(
            self._pending_tasks,
            timeout=PENDING_TASKS_SHUTDOWN_TIMEOUT,
            return_when=asyncio.ALL_COMPLETED,
        )

        # Cancel any tasks that didn't complete in time
        for task in pending:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task

        self._pending_tasks.clear()

    if self.session is None or self.session.closed:
        return

    # Log API statistics before closing
    self.console_logger.info("--- API Call Statistics ---")
    total_api_calls = 0
    total_api_time = 0.0
    for api_name, limiter in self.rate_limiters.items():
        stats = limiter.get_stats()
        durations = self.api_call_durations.get(api_name, [])
        avg_duration = sum(durations) / max(1, len(durations)) if durations else 0.0
        total_api_calls += stats["total_requests"]
        total_api_time += sum(durations)
        self.console_logger.info(
            "API: %-12s | Requests: %-5d | Avg Wait: %.3fs | Avg Duration: %.3fs",
            api_name.title(),
            stats["total_requests"],
            stats["avg_wait_time"],
            avg_duration,
        )

    if total_api_calls > 0:
        avg_total_duration = total_api_time / total_api_calls
        self.console_logger.info(
            "Total API Calls: %d, Average Call Duration: %.3fs",
            total_api_calls,
            avg_total_duration,
        )
    else:
        self.console_logger.info("No API calls were made during this session.")
    self.console_logger.info("---------------------------")

    await self.session.close()
    self.console_logger.info("%s session closed", LogFormat.entity("ExternalApiOrchestrator"))

get_album_year async

get_album_year(
    artist,
    album,
    current_library_year=None,
    earliest_track_added_year=None,
)

Determine the original release year for an album using optimized API calls and revised scoring.

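Returns:

- tuple[str | None, bool, int, dict[str, int]]: Tuple of (year, is_definitive, confidence_score, year_scores), where year_scores maps each year found by the APIs to its maximum score. If input preparation fails or produces no usable input, the method returns (None, False, 0, {}).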

Source code in src/services/api/orchestrator.py
async def get_album_year(
    self,
    artist: str,
    album: str,
    current_library_year: str | None = None,
    earliest_track_added_year: int | None = None,
) -> tuple[str | None, bool, int, dict[str, int]]:
    """Determine the original release year for an album using optimized API calls and revised scoring.

    Returns:
        Tuple of (year, is_definitive, confidence_score, year_scores), where
        year_scores maps each year found by the APIs to its maximum score.
    """
    # Initialize and prepare inputs
    try:
        inputs = await self._initialize_year_search(artist, album, current_library_year)
        if not inputs:
            return None, False, 0, {}
    except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
        if debug.year:
            self.error_logger.exception("Error in get_album_year initialization: %s", e)
        return None, False, 0, {}

    artist_norm, album_norm, log_artist, log_album, artist_region = inputs

    # Main processing
    try:
        # Fetch and process API results
        all_releases = await self._fetch_all_api_results(artist_norm, album_norm, artist_region, log_artist, log_album)

        if not all_releases:
            return await self._handle_no_results(artist, album, log_artist, log_album, current_library_year, earliest_track_added_year)

        return await self._process_api_results(
            all_releases, artist, album, log_artist, log_album, current_library_year, earliest_track_added_year
        )

    except (aiohttp.ClientError, TimeoutError, ValueError, KeyError, RuntimeError):
        return self._handle_year_search_error(log_artist, log_album, current_library_year, earliest_track_added_year)
    finally:
        self.release_scorer.clear_artist_period_context()
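Because `get_album_year()` returns a positional 4-tuple, every caller has to remember the field order. Below is a minimal sketch of one way to name the fields without breaking existing unpacking. It uses a hypothetical `AlbumYearResult` wrapper that this module does not define. The sample values are illustrative, not real API output.

```python
from __future__ import annotations

from typing import NamedTuple


class AlbumYearResult(NamedTuple):
    """Hypothetical named view of the 4-tuple returned by get_album_year()."""

    year: str | None
    is_definitive: bool
    confidence_score: int
    year_scores: dict[str, int]

    def ranked_years(self) -> list[tuple[str, int]]:
        """Candidate years by descending score; ties go to the earlier year."""
        return sorted(self.year_scores.items(), key=lambda item: (-item[1], item[0]))


# Illustrative values only, not real API output
raw = ("1983", True, 85, {"1999": 40, "1983": 85})
result = AlbumYearResult(*raw)
year, is_definitive, confidence, scores = result  # still unpacks like the plain tuple
```

A `NamedTuple` is still a tuple, so existing `year, is_definitive, ... = await ...` call sites would keep working if the method returned one.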

get_artist_activity_period async

get_artist_activity_period(artist_norm)

Retrieve the period of activity for an artist from MusicBrainz.

This method delegates to the MusicBrainz client.

Parameters:

- artist_norm (str, required): Normalized artist name

Returns:

- tuple[int | None, int | None]: Tuple of (start_year, end_year) as integers, or (None, None) if not found; a single missing or non-numeric year is returned as None

Source code in src/services/api/orchestrator.py
async def get_artist_activity_period(
    self,
    artist_norm: str,
) -> tuple[int | None, int | None]:
    """Retrieve the period of activity for an artist from MusicBrainz.

    This method delegates to the MusicBrainz client.

    Args:
        artist_norm: Normalized artist name

    Returns:
        Tuple of (start_year, end_year) as integers or (None, None) if not found

    """
    # Delegate to MusicBrainz client
    activity_period = await self.musicbrainz_client.get_artist_activity_period(artist_norm)

    # Convert string years to integers to match protocol
    def safe_int(val: str | None) -> int | None:
        """Convert string to int, returning None for invalid or empty values."""
        if not val:
            return None
        try:
            return int(val)
        except ValueError:
            return None

    return safe_int(activity_period[0]), safe_int(activity_period[1])
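The conversion step is easiest to see in isolation. The sketch below lifts the nested `safe_int` helper to module level and adds a hypothetical `to_int_period` wrapper. It assumes the MusicBrainz client returns a pair of year strings or `None`, as the `str | None` annotation suggests.

```python
from __future__ import annotations


def safe_int(val: str | None) -> int | None:
    """Convert a year string to int; None for empty or non-numeric values."""
    if not val:
        return None
    try:
        return int(val)
    except ValueError:
        return None


def to_int_period(period: tuple[str | None, str | None]) -> tuple[int | None, int | None]:
    """Apply safe_int to both ends of a (start_year, end_year) pair."""
    return safe_int(period[0]), safe_int(period[1])
```

A missing or malformed end year therefore comes back as `None` instead of raising.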

get_artist_start_year async

get_artist_start_year(artist_norm)

Get artist's career start year with caching and fallback.

Uses MusicBrainz as primary source, iTunes as fallback. Results are cached in GenericCacheService.

Parameters:

- artist_norm (str, required): Normalized artist name

Returns:

- int | None: Artist's career start year, or None if not found

Cache TTL
  • Positive result: 1 year (31,536,000 seconds)
  • Negative result: 1 day (86,400 seconds)
Source code in src/services/api/orchestrator.py
async def get_artist_start_year(self, artist_norm: str) -> int | None:
    """Get artist's career start year with caching and fallback.

    Uses MusicBrainz as primary source, iTunes as fallback.
    Results are cached in GenericCacheService.

    Args:
        artist_norm: Normalized artist name

    Returns:
        Artist's career start year, or None if not found

    Cache TTL:
        - Positive result: 1 year (31,536,000 seconds)
        - Negative result: 1 day (86,400 seconds)

    """
    cache_key = f"artist_start_year:{artist_norm}"

    # 1. Check cache first
    cached = self.cache_service.generic_service.get(cache_key)
    if cached is not None:
        # -1 is sentinel for "not found" (None can't be cached directly)
        if cached == -1:
            self.console_logger.debug(
                "[orchestrator] Artist start year cache hit (negative): %s",
                artist_norm,
            )
            return None
        # Ensure cached value is convertible to int (a corrupt string would make int() raise)
        cached_year: int | None = None
        if isinstance(cached, (int, str)):
            with contextlib.suppress(ValueError):
                cached_year = int(cached)
        if cached_year is None:
            self.console_logger.warning(
                "[orchestrator] Invalid cached artist start year for '%s': %r",
                artist_norm,
                cached,
            )
            return None
        self.console_logger.debug(
            "[orchestrator] Artist start year cache hit: %s → %d",
            artist_norm,
            cached_year,
        )
        return cached_year

    # 2. Try MusicBrainz (primary source)
    begin_year, _ = await self.get_artist_activity_period(artist_norm)
    if begin_year:
        self.cache_service.generic_service.set(cache_key, begin_year, ttl=31536000)
        self.console_logger.debug(
            "[orchestrator] Artist start year from MusicBrainz: %s → %d",
            artist_norm,
            begin_year,
        )
        return begin_year

    # 3. Fallback to iTunes
    itunes_year = await self.applemusic_client.get_artist_start_year(artist_norm)
    if itunes_year:
        self.cache_service.generic_service.set(cache_key, itunes_year, ttl=31536000)
        self.console_logger.debug(
            "[orchestrator] Artist start year from iTunes (fallback): %s → %d",
            artist_norm,
            itunes_year,
        )
        return itunes_year

    # 4. Cache negative result with shorter TTL
    self.cache_service.generic_service.set(cache_key, -1, ttl=86400)
    self.console_logger.debug(
        "[orchestrator] Artist start year not found, caching negative: %s",
        artist_norm,
    )
    return None

get_year_from_discogs async

get_year_from_discogs(artist, album)

Fetch the earliest release year for an album from Discogs.

This method delegates to the Discogs client.

Parameters:

- artist (str, required): Artist name; normalized with normalize_name before the query
- album (str, required): Album name; normalized with normalize_name before the query

Returns:

- str | None: Year string, or None if not found

Source code in src/services/api/orchestrator.py
async def get_year_from_discogs(
    self,
    artist: str,
    album: str,
) -> str | None:
    """Fetch the earliest release year for an album from Discogs.

    This method delegates to the Discogs client.

    Args:
        artist: Artist name
        album: Album name

    Returns:
        Year string or None if not found

    """
    # Normalize inputs
    artist_norm = normalize_name(artist)
    album_norm = normalize_name(album)

    # Delegate to the Discogs client
    result: str | None = await self.discogs_client.get_year_from_discogs(artist_norm, album_norm)
    return result

normalize_name

normalize_name(name)

Normalize artist/album name for API queries.

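Performs substitutions that improve API matching:

- & → and (Karma & Effect → Karma and Effect)
- w/ → with (Split w/ Band → Split with Band)
- " = " and ":" → space (Liberation = Termination → Liberation Termination, III:Trauma → III Trauma)
- Strips trailing compilation markers (Album + 4 → Album)
- Keeps only the first title of split releases (Robot Hive / Exodus → Robot Hive)
- Normalizes whitespace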

Note: This is for API QUERIES, not for scoring/matching. Scoring uses ReleaseScorer._normalize_name which is more aggressive.

Source code in src/services/api/orchestrator.py
def normalize_name(name: str) -> str:
    """Normalize artist/album name for API queries.

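    Performs substitutions that improve API matching:
    - & → and (Karma & Effect → Karma and Effect)
    - w/ → with (Split w/ Band → Split with Band)
    - " = " and ":" → space (III:Trauma → III Trauma)
    - Strips trailing compilation markers (Album + 4 → Album)
    - Keeps only the first title of split releases (Robot Hive / Exodus → Robot Hive)
    - Normalizes whitespace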

    Note: This is for API QUERIES, not for scoring/matching.
    Scoring uses ReleaseScorer._normalize_name which is more aggressive.
    """
    if not name:
        return name

    result = name

    # Common substitutions for better API matching
    substitutions = {
        " & ": " and ",
        "&": " and ",  # Handle no-space cases like "Fire&Water"
        " w/ ": " with ",
        " w/": " with ",  # Trailing space so "w/Band" becomes "with Band", not "withBand"
        " = ": " ",  # Liberation = Termination → Liberation Termination
        ":": " ",  # Issue #103: Colons break Lucene search (III:Trauma → III Trauma)
    }

    for old, new in substitutions.items():
        result = result.replace(old, new)

    # Strip trailing compilation markers: "+ 4", "+ 10" (number = # bonus tracks)
    # Pattern: " + " followed by digit(s), and everything after that to the end of the string
    # More conservative than ".*" to preserve legitimate titles like "Album + Bonus Tracks"
    result = re.sub(r"\s*\+\s+\d+.*$", "", result)

    # Strip content after " / " (split albums - keep first part only)
    # "Robot Hive / Exodus" → "Robot Hive"
    # "House By the Cemetery / Mortal Massacre" → "House By the Cemetery"
    if " / " in result:
        result = result.split(" / ", maxsplit=1)[0].strip()

    # Normalize whitespace (multiple spaces to single)
    return re.sub(r"\s+", " ", result).strip()
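Below is a runnable copy of the function, with the examples from its own comments and docstring turned into checks. It mirrors the listing above with one exception: `" w/"` maps to `" with "` with a trailing space, so `Split w/Band` normalizes to `Split with Band` rather than `Split withBand`. `_SUBSTITUTIONS` and `examples` are names local to this sketch.

```python
import re

# Same substitutions as above; order matters, so " & " is tried before "&"
_SUBSTITUTIONS = {
    " & ": " and ",
    "&": " and ",
    " w/ ": " with ",
    " w/": " with ",
    " = ": " ",
    ":": " ",
}


def normalize_name(name: str) -> str:
    """Runnable copy of normalize_name for trying inputs."""
    if not name:
        return name
    result = name
    for old, new in _SUBSTITUTIONS.items():
        result = result.replace(old, new)
    # Drop " + <digits>" and everything after it (bonus-track markers)
    result = re.sub(r"\s*\+\s+\d+.*$", "", result)
    # Keep only the first title of a split release
    if " / " in result:
        result = result.split(" / ", maxsplit=1)[0].strip()
    return re.sub(r"\s+", " ", result).strip()


examples = {
    "Karma & Effect": "Karma and Effect",
    "Fire&Water": "Fire and Water",
    "Split w/Band": "Split with Band",
    "III:Trauma": "III Trauma",
    "Liberation = Termination": "Liberation Termination",
    "Album + 4": "Album",
    "Album + 4 Live": "Album",
    "Album + Bonus Tracks": "Album + Bonus Tracks",
    "Robot Hive / Exodus": "Robot Hive",
}
for raw, expected in examples.items():
    assert normalize_name(raw) == expected, (raw, normalize_name(raw))
```

The `Album + 4 Live` case shows that the regex removes everything from the `+ N` marker to the end of the string, not only the marker itself.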

create_external_api_orchestrator

create_external_api_orchestrator(
    config,
    console_logger,
    error_logger,
    analytics,
    cache_service,
    pending_verification_service,
)

Create the configured ExternalApiOrchestrator instance.

Parameters:

- config (AppConfig, required): Typed application configuration
- console_logger (Logger, required): Logger for general output
- error_logger (Logger, required): Logger for error messages and warnings
- analytics (Analytics, required): Analytics service for performance tracking
- cache_service (CacheOrchestrator, required): Service for caching API responses
- pending_verification_service (PendingVerificationService, required): Service for managing the verification queue

Returns:

- ExternalApiOrchestrator: The configured ExternalApiOrchestrator instance; its HTTP session is not created until initialize() is awaited

Source code in src/services/api/orchestrator.py
def create_external_api_orchestrator(
    config: AppConfig,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    analytics: Analytics,
    cache_service: CacheOrchestrator,
    pending_verification_service: PendingVerificationService,
) -> ExternalApiOrchestrator:
    """Create the configured ExternalApiOrchestrator instance.

    Args:
        config: Typed application configuration
        console_logger: Logger for general output
        error_logger: Logger for error messages and warnings
        analytics: Analytics service for performance tracking
        cache_service: Service for caching API responses
        pending_verification_service: Service for managing verification queue

    Returns:
        The configured ExternalApiOrchestrator instance

    """
    return ExternalApiOrchestrator(
        config=config,
        console_logger=console_logger,
        error_logger=error_logger,
        analytics=analytics,
        cache_service=cache_service,
        pending_verification_service=pending_verification_service,
    )
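The factory only constructs the object. The HTTP session is opened by `initialize()` and released by `close()`. One way to make sure `close()` always runs is an async context manager. Below is a minimal sketch of that approach. `managed`, `SupportsLifecycle`, and `StubOrchestrator` are hypothetical names, and the stub replaces the real dependencies.

```python
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import AsyncIterator
from typing import Protocol, TypeVar


class SupportsLifecycle(Protocol):
    """The two lifecycle coroutines ExternalApiOrchestrator exposes."""

    async def initialize(self, force: bool = False) -> None: ...

    async def close(self) -> None: ...


T = TypeVar("T", bound=SupportsLifecycle)


@contextlib.asynccontextmanager
async def managed(orchestrator: T) -> AsyncIterator[T]:
    """Await initialize() on entry and always await close() on exit."""
    await orchestrator.initialize()
    try:
        yield orchestrator
    finally:
        await orchestrator.close()  # runs even if the body raises


class StubOrchestrator:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def initialize(self, force: bool = False) -> None:
        self.events.append("initialize")

    async def close(self) -> None:
        self.events.append("close")


async def demo() -> list[str]:
    stub = StubOrchestrator()
    with contextlib.suppress(ValueError):
        async with managed(stub) as api:
            api.events.append("work")
            raise ValueError("simulated failure inside the block")
    return stub.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the real class, this would read `async with managed(create_external_api_orchestrator(...)) as api:` followed by `await api.get_album_year(artist, album)`. Here the `...` stands for the six constructor dependencies. If `initialize()` itself raises, `close()` is not called. That is consistent with `initialize()` closing its own session on failure.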