Skip to content

orchestrator

Cache Orchestrator - Coordinates all specialized cache services.

This module provides a unified interface for interacting with multiple specialized cache services while maintaining the existing API contract.

Key Features: - Coordinates AlbumCacheService, ApiCacheService, and GenericCacheService - Maintains backward compatibility with existing CacheService API - Intelligent routing based on operation type and content - Centralized configuration management and metrics

CacheOrchestrator

CacheOrchestrator(config, logger=None)

Bases: CacheServiceProtocol

Orchestrates multiple specialized cache services with unified interface.

Initialize CacheOrchestrator with configuration.

Parameters:

Name Type Description Default
config AppConfig

Typed application configuration

required
logger Logger | None

Optional logger instance

None
Source code in src/services/cache/orchestrator.py
def __init__(self, config: AppConfig, logger: logging.Logger | None = None) -> None:
    """Initialize CacheOrchestrator with configuration.

    Args:
        config: Typed application configuration
        logger: Optional logger instance
    """
    self.config = config
    self.logger = logger or logging.getLogger(__name__)
    # Initialize configuration manager later when needed
    self.config_manager = None

    # Initialize specialized services
    self.album_service = AlbumCacheService(config, logger)
    self.api_service = ApiCacheService(config, logger)
    self.generic_service = GenericCacheService(config, logger)

    # Service mapping for routing
    self._services = {
        "album": self.album_service,
        "api": self.api_service,
        "generic": self.generic_service,
    }

cache property

cache

Generic cache for backward compatibility.

album_years_cache property

album_years_cache

Album years cache for backward compatibility.

api_cache property

api_cache

API cache for backward compatibility.

initialize async

initialize()

Initialize all cache services.

Source code in src/services/cache/orchestrator.py
async def initialize(self) -> None:
    """Initialize all cache services."""
    self.logger.info("Initializing %s...", LogFormat.entity("CacheOrchestrator"))

    service_tasks: list[tuple[str, Awaitable[Any]]] = [
        ("AlbumCacheService", self.album_service.initialize()),
        ("ApiCacheService", self.api_service.initialize()),
        ("GenericCacheService", self.generic_service.initialize()),
    ]

    results = await asyncio.gather(*(task for _, task in service_tasks), return_exceptions=True)

    failed_services: list[str] = []
    for (service_name, _), result in zip(service_tasks, results, strict=False):
        if isinstance(result, Exception):
            self.logger.error("Failed to initialize %s: %s", LogFormat.entity(service_name), result, exc_info=result)
            failed_services.append(service_name)

    if failed_services:
        failed_list = ", ".join(failed_services)
        msg = f"Cache service initialization failed for: {failed_list}"
        raise RuntimeError(msg)

    self.logger.info("%s initialized successfully", LogFormat.entity("CacheOrchestrator"))

get_album_year async

get_album_year(artist, album)

Get album release year from cache.

Parameters:

Name Type Description Default
artist str

Artist name

required
album str

Album name

required

Returns:

Type Description
str | None

Album release year if found, None otherwise

Source code in src/services/cache/orchestrator.py
async def get_album_year(self, artist: str, album: str) -> str | None:
    """Get album release year from cache.

    Args:
        artist: Artist name
        album: Album name

    Returns:
        Album release year if found, None otherwise
    """
    return await self.album_service.get_album_year(artist, album)

store_album_year async

store_album_year(artist, album, year, confidence=0)

Store album release year in cache.

Parameters:

Name Type Description Default
artist str

Artist name

required
album str

Album name

required
year str

Album release year

required
confidence int

Confidence score 0-100 (higher = more trustworthy)

0
Source code in src/services/cache/orchestrator.py
async def store_album_year(self, artist: str, album: str, year: str, confidence: int = 0) -> None:
    """Store album release year in cache.

    Args:
        artist: Artist name
        album: Album name
        year: Album release year
        confidence: Confidence score 0-100 (higher = more trustworthy)
    """
    await self.album_service.store_album_year(artist, album, year, confidence)

get_async async

get_async(key_data, compute_func=None)

Asynchronous get with optional compute function.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key or "ALL" for all entries

required
compute_func Callable[[], Future[CacheableValue]] | None

Optional compute function to calculate value if not cached

None

Returns:

Type Description
list[TrackDict] | CacheableValue

Cached or computed value

Source code in src/services/cache/orchestrator.py
async def get_async(  # type: ignore[override]  # Protocol uses @overload with Literal["ALL"]; concrete impl uses a single unified signature
    self,
    key_data: CacheableKey,
    compute_func: Callable[[], asyncio.Future[CacheableValue]] | None = None,
) -> list[TrackDict] | CacheableValue:
    """Asynchronous get with optional compute function.

    Args:
        key_data: Cache key or "ALL" for all entries
        compute_func: Optional compute function to calculate value if not cached

    Returns:
        Cached or computed value
    """
    if compute_func:
        result = self.generic_service.get(key_data)
        if result is None:
            future = compute_func()
            computed = await future
            self.generic_service.set(key_data, computed)
            return computed
        return result
    return self.generic_service.get(key_data)

set

set(key_data, value, ttl=None)

Set value in generic cache.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key

required
value CacheableValue

Value to cache

required
ttl int | None

Optional TTL in seconds

None
Source code in src/services/cache/orchestrator.py
def set(self, key_data: CacheableKey, value: CacheableValue, ttl: int | None = None) -> None:
    """Set value in generic cache.

    Args:
        key_data: Cache key
        value: Value to cache
        ttl: Optional TTL in seconds
    """
    self.generic_service.set(key_data, value, ttl)

set_async async

set_async(key_data, value, ttl=None)

Async alias for set method (backward compatibility).

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key

required
value CacheableValue

Value to cache

required
ttl int | None

Optional TTL in seconds

None
Source code in src/services/cache/orchestrator.py
async def set_async(self, key_data: CacheableKey, value: CacheableValue, ttl: int | None = None) -> None:
    """Async alias for set method (backward compatibility).

    Args:
        key_data: Cache key
        value: Value to cache
        ttl: Optional TTL in seconds
    """
    self.generic_service.set(key_data, value, ttl)

get

get(key_data)

Get value from generic cache.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key

required

Returns:

Type Description
CacheableValue | None

Cached value if found, None otherwise

Source code in src/services/cache/orchestrator.py
def get(self, key_data: CacheableKey) -> CacheableValue | None:
    """Get value from generic cache.

    Args:
        key_data: Cache key

    Returns:
        Cached value if found, None otherwise
    """
    return self.generic_service.get(key_data)

invalidate_for_track async

invalidate_for_track(track)

Invalidate all cache entries related to a track.

Parameters:

Name Type Description Default
track TrackDict

Track dictionary with artist and album information

required
Source code in src/services/cache/orchestrator.py
async def invalidate_for_track(self, track: TrackDict) -> None:
    """Invalidate all cache entries related to a track.

    Args:
        track: Track dictionary with artist and album information
    """
    track_payload = track.model_dump()

    artist = str(track_payload.get("artist", "") or "").strip()
    original_artist = str(track_payload.get("original_artist", "") or "").strip()
    album = str(track_payload.get("album", "") or "").strip()
    track_id = str(track_payload.get("id", "") or "").strip()

    # Invalidate generic caches (full snapshot + per artist variants)
    self.generic_service.invalidate("tracks_all")

    artist_candidates = {candidate for candidate in (artist, original_artist) if candidate}
    for candidate in artist_candidates:
        self.generic_service.invalidate(f"tracks_{candidate}")

    if artist and album:
        await self.album_service.invalidate_album(artist, album)
        await self.api_service.invalidate_for_album(artist, album)

        self.logger.debug("Invalidated caches for track: %s - %s", artist, album)

        cache_event = CacheEvent(
            event_type=CacheEventType.TRACK_MODIFIED,
            track_id=track_id or None,
            metadata={"artist": artist, "album": album},
        )
        self.api_service.event_manager.emit_event(cache_event)

save_all_to_disk async

save_all_to_disk()

Save all persistent caches to disk.

Source code in src/services/cache/orchestrator.py
async def save_all_to_disk(self) -> None:
    """Save all persistent caches to disk."""
    self.logger.info("Saving all caches to disk...")

    # Save services that have disk persistence
    save_tasks: list[tuple[str, Awaitable[Any]]] = [
        ("AlbumCacheService", self.album_service.save_to_disk()),
        ("ApiCacheService", self.api_service.save_to_disk()),
        ("GenericCacheService", self.generic_service.save_to_disk()),
    ]

    results = await asyncio.gather(*(task for _, task in save_tasks), return_exceptions=True)

    for (service_name, _), result in zip(save_tasks, results, strict=False):
        if isinstance(result, Exception):
            self.logger.error("Failed to save %s to disk: %s", LogFormat.entity(service_name), result, exc_info=result)

    self.logger.info("All caches saved to disk")

invalidate

invalidate(key_data)

Invalidate specific cache entry.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key to invalidate

required
Source code in src/services/cache/orchestrator.py
def invalidate(self, key_data: CacheableKey) -> None:
    """Invalidate specific cache entry.

    Args:
        key_data: Cache key to invalidate

    """
    self.generic_service.invalidate(key_data)

invalidate_all async

invalidate_all()

Clear all cache entries across all services.

Source code in src/services/cache/orchestrator.py
async def invalidate_all(self) -> None:
    """Clear all cache entries across all services."""
    self.logger.info("Invalidating all cache entries...")

    # Album and API services have async invalidate_all methods
    await self.album_service.invalidate_all()
    await self.api_service.invalidate_all()
    self.generic_service.invalidate_all()

    self.logger.info("All cache entries invalidated")

load_cache async

load_cache()

Load persistent cache data from disk.

Source code in src/services/cache/orchestrator.py
async def load_cache(self) -> None:
    """Load persistent cache data from disk."""

save_cache async

save_cache()

Save cache data to disk for persistence.

Source code in src/services/cache/orchestrator.py
async def save_cache(self) -> None:
    """Save cache data to disk for persistence."""
    await self.save_all_to_disk()

get_last_run_timestamp async

get_last_run_timestamp()

Get the timestamp of the last cache run.

Returns:

Type Description
datetime

Last run timestamp, or epoch (1970-01-01) if never run

Source code in src/services/cache/orchestrator.py
async def get_last_run_timestamp(self) -> datetime:
    """Get the timestamp of the last cache run.

    Returns:
        Last run timestamp, or epoch (1970-01-01) if never run
    """
    tracker = IncrementalRunTracker(self.config)
    timestamp = await tracker.get_last_run_timestamp()
    # Protocol requires datetime, return epoch if None (never run)
    return timestamp or datetime(1970, 1, 1, tzinfo=UTC)

get_album_year_from_cache async

get_album_year_from_cache(artist, album)

Get cached album year for an artist/album pair.

Source code in src/services/cache/orchestrator.py
async def get_album_year_from_cache(self, artist: str, album: str) -> str | None:
    """Get cached album year for an artist/album pair."""
    return await self.get_album_year(artist, album)

get_album_year_entry_from_cache async

get_album_year_entry_from_cache(artist, album)

Get full album cache entry for an artist/album pair.

Source code in src/services/cache/orchestrator.py
async def get_album_year_entry_from_cache(self, artist: str, album: str) -> AlbumCacheEntry | None:
    """Get full album cache entry for an artist/album pair."""
    return await self.album_service.get_album_year_entry(artist, album)

store_album_year_in_cache async

store_album_year_in_cache(
    artist, album, year, confidence=0
)

Store album year in persistent cache.

Source code in src/services/cache/orchestrator.py
async def store_album_year_in_cache(self, artist: str, album: str, year: str, confidence: int = 0) -> None:
    """Store album year in persistent cache."""
    await self.store_album_year(artist, album, year, confidence)

invalidate_album_cache async

invalidate_album_cache(artist, album)

Invalidate cached data for a specific album.

Source code in src/services/cache/orchestrator.py
async def invalidate_album_cache(self, artist: str, album: str) -> None:
    """Invalidate cached data for a specific album."""
    await self.album_service.invalidate_album(artist, album)

invalidate_all_albums async

invalidate_all_albums()

Invalidate all album cache entries.

Source code in src/services/cache/orchestrator.py
async def invalidate_all_albums(self) -> None:
    """Invalidate all album cache entries."""
    await self.album_service.invalidate_all()

sync_cache async

sync_cache()

Synchronize cache to persistent storage.

Source code in src/services/cache/orchestrator.py
async def sync_cache(self) -> None:
    """Synchronize cache to persistent storage."""
    await self.save_all_to_disk()

get_cached_api_result async

get_cached_api_result(artist, album, source)

Get cached API result for an artist/album from a specific source.

Source code in src/services/cache/orchestrator.py
async def get_cached_api_result(
    self,
    artist: str,
    album: str,
    source: str,
) -> CachedApiResult | None:
    """Get cached API result for an artist/album from a specific source."""
    return await self.api_service.get_cached_result(artist, album, source)

set_cached_api_result async

set_cached_api_result(
    artist,
    album,
    source,
    year,
    *,
    metadata=None,
    is_negative=False
)

Cache an API result for an artist/album from a specific source.

Source code in src/services/cache/orchestrator.py
async def set_cached_api_result(
    self,
    artist: str,
    album: str,
    source: str,
    year: str | None,
    *,
    metadata: dict[str, Any] | None = None,
    is_negative: bool = False,
) -> None:
    """Cache an API result for an artist/album from a specific source."""
    success = year is not None and not is_negative
    data = {"year": year}
    if metadata:
        data |= metadata
    await self.api_service.set_cached_result(artist, album, source, success, data=data)

generate_album_key staticmethod

generate_album_key(artist, album)

Generate a unique key for an artist/album pair.

Source code in src/services/cache/orchestrator.py
@staticmethod
def generate_album_key(artist: str, album: str) -> str:
    """Generate a unique key for an artist/album pair."""
    # Use the same key generation as the album service
    hash_service = UnifiedHashService()
    return hash_service.hash_album_key(artist, album)

clear async

clear()

Clear all entries from all caches (generic, album, and API caches).

Source code in src/services/cache/orchestrator.py
async def clear(self) -> None:
    """Clear all entries from all caches (generic, album, and API caches)."""
    self.generic_service.invalidate_all()
    await self.album_service.invalidate_all()
    await self.api_service.invalidate_all()

shutdown async

shutdown()

Gracefully shutdown background tasks for cache services.

Shuts down: 1. ApiCacheService background tasks (cache invalidation, etc.) 2. GenericCacheService cleanup task

Source code in src/services/cache/orchestrator.py
async def shutdown(self) -> None:
    """Gracefully shutdown background tasks for cache services.

    Shuts down:
    1. ApiCacheService background tasks (cache invalidation, etc.)
    2. GenericCacheService cleanup task
    """
    await self.api_service.shutdown()
    await self.generic_service.stop_cleanup_task()