Skip to content

year_retriever

Year retrieval functionality for Music Genre Updater.

This module provides a facade for album year retrieval and updates. The actual logic is delegated to specialized components:

- YearDeterminator: Core year determination logic
- YearBatchProcessor: Batch processing with concurrency control
- YearConsistencyChecker: Year consistency analysis
- YearFallbackHandler: Fallback logic for uncertain updates

YearRetriever

YearRetriever(
    track_processor,
    cache_service,
    external_api,
    pending_verification,
    retry_handler,
    console_logger,
    error_logger,
    analytics,
    config,
    dry_run=False,
)

Facade for album year retrieval and updates.

This class coordinates the year retrieval subsystem, delegating actual work to specialized components while maintaining backward compatibility with existing code.

Initialize the YearRetriever.

Parameters:

Name Type Description Default
track_processor TrackProcessor

Track processor for updating tracks

required
cache_service CacheServiceProtocol

Cache service for storing years

required
external_api ExternalApiServiceProtocol

External API service for fetching years

required
pending_verification PendingVerificationServiceProtocol

Service for managing pending verifications

required
retry_handler DatabaseRetryHandler

Retry handler for transient error recovery

required
console_logger Logger

Logger for console output

required
error_logger Logger

Logger for error messages

required
analytics AnalyticsProtocol

Service for performance tracking

required
config AppConfig

Typed application configuration

required
dry_run bool

Whether to run in dry-run mode

False
Source code in src/core/tracks/year_retriever.py
def __init__(
    self,
    track_processor: TrackProcessor,
    cache_service: CacheServiceProtocol,
    external_api: ExternalApiServiceProtocol,
    pending_verification: PendingVerificationServiceProtocol,
    retry_handler: DatabaseRetryHandler,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    analytics: AnalyticsProtocol,
    config: AppConfig,
    dry_run: bool = False,
) -> None:
    """Wire up the year-retrieval facade and its collaborating components.

    Keeps references to the injected services, mirrors the fallback and logic
    thresholds from ``config.year_retrieval`` onto the instance, and then
    builds the consistency checker, the fallback handler, the year
    determinator and the batch processor, in that order.

    Args:
        track_processor: Track processor for updating tracks
        cache_service: Cache service for storing years
        external_api: External API service for fetching years
        pending_verification: Service for managing pending verifications
        retry_handler: Retry handler for transient error recovery; it is
            handed to the batch processor and not stored on this object
        console_logger: Logger for console output
        error_logger: Logger for error messages
        analytics: Service for performance tracking
        config: Typed application configuration
        dry_run: Whether to run in dry-run mode

    """
    # Injected collaborators
    self.track_processor = track_processor
    self.cache_service = cache_service
    self.external_api = external_api
    self.pending_verification = pending_verification
    self.console_logger = console_logger
    self.error_logger = error_logger
    self.analytics = analytics
    self.config = config
    self.dry_run = dry_run

    # Per-instance bookkeeping, both start out empty
    self._dry_run_actions: list[dict[str, object]] = []
    self._last_updated_tracks: list[TrackDict] = []

    # Thresholds are mirrored as public attributes (backward compatibility)
    fallback_settings = config.year_retrieval.fallback
    logic_settings = config.year_retrieval.logic

    self.fallback_enabled = fallback_settings.enabled
    self.year_difference_threshold = fallback_settings.year_difference_threshold
    self.absurd_year_threshold = logic_settings.absurd_year_threshold
    self.suspicion_threshold_years = logic_settings.suspicion_threshold_years
    self.min_confidence_for_new_year = int(logic_settings.min_confidence_for_new_year)

    # 1) Consistency checker: tuned by class-level constants plus the
    #    suspicion threshold read from config above.
    consistency_checker = YearConsistencyChecker(
        console_logger=console_logger,
        top_years_count=self.TOP_YEARS_COUNT,
        parity_threshold=self.PARITY_THRESHOLD,
        dominance_min_share=self.DOMINANCE_MIN_SHARE,
        suspicion_threshold_years=self.suspicion_threshold_years,
    )
    self.year_consistency_checker = consistency_checker

    # 2) Fallback handler: receives the external API as its orchestrator.
    fallback_handler = YearFallbackHandler(
        console_logger=console_logger,
        pending_verification=pending_verification,
        fallback_enabled=self.fallback_enabled,
        absurd_year_threshold=self.absurd_year_threshold,
        year_difference_threshold=self.year_difference_threshold,
        min_confidence_for_new_year=self.min_confidence_for_new_year,
        api_orchestrator=external_api,
    )
    self.year_fallback_handler = fallback_handler

    # 3) Year determinator: combines cache, API and the two helpers above.
    determinator = YearDeterminator(
        cache_service=cache_service,
        external_api=external_api,
        pending_verification=pending_verification,
        consistency_checker=consistency_checker,
        fallback_handler=fallback_handler,
        console_logger=console_logger,
        error_logger=error_logger,
        config=config,
    )
    self._year_determinator = determinator

    # 4) Batch processor: the only component that gets the retry handler.
    self._batch_processor = YearBatchProcessor(
        year_determinator=determinator,
        track_processor=track_processor,
        retry_handler=retry_handler,
        console_logger=console_logger,
        error_logger=error_logger,
        config=config,
        analytics=analytics,
        dry_run=dry_run,
    )

process_album_years async

process_album_years(tracks, force=False, fresh=False)

Process and update album years for given tracks.

Parameters:

Name Type Description Default
tracks list[TrackDict]

Tracks to process

required
force bool

Force update even if year exists (bypasses cache/skip checks)

False
fresh bool

Fresh mode - invalidate cache before processing, implies force

False

Returns:

Type Description
bool

True if successful, False otherwise

Source code in src/core/tracks/year_retriever.py
async def process_album_years(
    self,
    tracks: list[TrackDict],
    force: bool = False,
    fresh: bool = False,
) -> bool:
    """Process and update album years for given tracks.

    Runs the year update logic, stores the updated tracks so that
    ``get_last_updated_tracks`` can return them, logs a per-album summary and
    asks the pending-verification service for its problematic-albums report.

    Args:
        tracks: Tracks to process
        force: Force update even if year exists (bypasses cache/skip checks)
        fresh: Fresh mode - invalidate cache before processing, implies force

    Returns:
        True if processing completed, or if year retrieval is disabled in
        config. False if an ``OSError``, ``ValueError`` or ``RuntimeError``
        was raised; the exception is logged through the error logger.

    """
    if not self.config.year_retrieval.enabled:
        self.console_logger.info("Year retrieval is disabled in config")
        return True

    # fresh implies force
    if fresh:
        force = True

    try:
        self.console_logger.info("Starting album year updates (force=%s, fresh=%s)", force, fresh)
        # Reset first so a failed run never leaves the previous run's tracks behind
        self._last_updated_tracks = []

        # FRESH mode: invalidate album years cache before processing
        if fresh:
            self.console_logger.info("Fresh mode: invalidating album years cache")
            await self.cache_service.invalidate_all_albums()

        # Initialize external API service if not already initialized
        # Note: initialize() is idempotent - safe to call multiple times
        if not getattr(self.external_api, "_initialized", False):
            await self.external_api.initialize()

        # Run the update logic with force flag
        updated_tracks, _changes_log = await self._update_album_years_logic(tracks, force=force)
        self._last_updated_tracks = updated_tracks

        # Summary statistics are counted per distinct album, not per track:
        # an album with many year-less tracks must be reported once, so both
        # figures in the log line use the same unit. Albums are keyed by an
        # (artist, album) tuple so a separator inside a name cannot merge two
        # different albums into one key.
        albums_seen: set[tuple[str, str]] = set()
        albums_with_empty_year: set[tuple[str, str]] = set()
        for track in tracks:
            if not track.get("album"):
                # Tracks without an album name are not part of the album statistics
                continue
            album_key = (str(track.get("artist", "")), str(track.get("album", "")))
            albums_seen.add(album_key)
            if is_empty_year(track.get("year")):
                albums_with_empty_year.add(album_key)

        self.console_logger.info(
            "Album year update complete: %d tracks updated from %d albums processed (%d had empty years)",
            len(updated_tracks),
            len(albums_seen),
            len(albums_with_empty_year),
        )

        if not updated_tracks and albums_with_empty_year:
            self.console_logger.warning(
                "No album years were updated despite %d albums having empty years. "
                "This likely means APIs could not find release information for these albums.",
                len(albums_with_empty_year),
            )

        # Generate report for problematic albums
        min_attempts = int(self.config.reporting.min_attempts_for_report)
        problematic_count = await self.pending_verification.generate_problematic_albums_report(min_attempts=min_attempts)
        if problematic_count > 0:
            self.console_logger.warning(
                "Found %d albums that failed to get year after %d+ attempts",
                problematic_count,
                min_attempts,
            )

    except (OSError, ValueError, RuntimeError):
        self.error_logger.exception("Error in the album year processing")
        return False

    return True

get_album_years_with_logs async

get_album_years_with_logs(tracks, force=False)

Get album year updates with change logs.

This is the public API for pipeline integration.

Parameters:

Name Type Description Default
tracks list[TrackDict]

Tracks to process

required
force bool

If True, bypass skip checks and re-query API for all albums

False

Returns:

Type Description
tuple[list[TrackDict], list[ChangeLogEntry]]

Tuple of (updated_tracks, change_logs)

Source code in src/core/tracks/year_retriever.py
async def get_album_years_with_logs(
    self,
    tracks: list[TrackDict],
    force: bool = False,
) -> tuple[list[TrackDict], list[ChangeLogEntry]]:
    """Compute album year updates and return them with their change logs.

    Public entry point for pipeline integration; the work itself is done by
    ``_update_album_years_logic`` and its result is returned unchanged.

    Args:
        tracks: Tracks to process
        force: If True, bypass skip checks and re-query API for all albums

    Returns:
        Tuple of (updated_tracks, change_logs)

    """
    updates_and_logs = await self._update_album_years_logic(tracks, force=force)
    return updates_and_logs

update_years_from_discogs async

update_years_from_discogs(tracks, force=False)

Update years specifically from Discogs API.

Parameters:

Name Type Description Default
tracks list[TrackDict]

Tracks to process

required
force bool

If True, bypass skip checks and re-query API for all albums

False

Returns:

Type Description
tuple[list[TrackDict], list[ChangeLogEntry]]

Tuple of (updated_tracks, change_logs)

Source code in src/core/tracks/year_retriever.py
@track_instance_method("year_discogs_update")
async def update_years_from_discogs(
    self,
    tracks: list[TrackDict],
    force: bool = False,
) -> tuple[list[TrackDict], list[ChangeLogEntry]]:
    """Update album years through the Discogs-named entry point.

    NOTE(review): the body forwards to the shared
    ``_update_album_years_logic`` exactly like ``get_album_years_with_logs``;
    no Discogs-specific source selection is visible in this method - confirm
    whether that is handled elsewhere.

    Args:
        tracks: Tracks to process
        force: If True, bypass skip checks and re-query API for all albums

    Returns:
        Tuple of (updated_tracks, change_logs)

    """
    updates_and_logs = await self._update_album_years_logic(tracks, force=force)
    return updates_and_logs

get_dry_run_actions

get_dry_run_actions()

Get a list of dry-run actions that would have been performed.

Source code in src/core/tracks/year_retriever.py
def get_dry_run_actions(self) -> list[dict[str, Any]]:
    """Return the dry-run actions recorded by the batch processor.

    Returns:
        Whatever the batch processor's ``get_dry_run_actions()`` returns,
        passed through unchanged.

    """
    batch_processor = self._batch_processor
    return batch_processor.get_dry_run_actions()

get_last_updated_tracks

get_last_updated_tracks()

Get the list of tracks updated in the last run.

Source code in src/core/tracks/year_retriever.py
def get_last_updated_tracks(self) -> list[TrackDict]:
    """Return the tracks stored as updated by the most recent run.

    Returns:
        The stored list itself (not a copy).

    """
    return self._last_updated_tracks

set_last_updated_tracks

set_last_updated_tracks(tracks)

Set the list of last updated tracks.

Parameters:

Name Type Description Default
tracks list[TrackDict]

List of updated tracks

required
Source code in src/core/tracks/year_retriever.py
def set_last_updated_tracks(self, tracks: list[TrackDict]) -> None:
    """Replace the stored list of last updated tracks.

    Args:
        tracks: Updated tracks to remember; stored by reference, not copied

    """
    self._last_updated_tracks = tracks

update_album_tracks_bulk_async async

update_album_tracks_bulk_async(tracks, year, artist, album)

Update year for multiple tracks. Delegates to YearBatchProcessor.

Parameters:

Name Type Description Default
tracks list[TrackDict]

List of tracks to update.

required
year str

Year value to set.

required
artist str

Artist name for contextual logging.

required
album str

Album name for contextual logging.

required

Returns:

Type Description
tuple[int, int]

Tuple of (successful_count, failed_count).

Source code in src/core/tracks/year_retriever.py
async def update_album_tracks_bulk_async(
    self,
    tracks: list[TrackDict],
    year: str,
    artist: str,
    album: str,
) -> tuple[int, int]:
    """Set one year on several tracks via the batch processor.

    Thin pass-through: all arguments are forwarded by keyword to
    ``YearBatchProcessor.update_album_tracks_bulk_async``.

    Args:
        tracks: List of tracks to update.
        year: Year value to set.
        artist: Artist name for contextual logging.
        album: Album name for contextual logging.

    Returns:
        Tuple of (successful_count, failed_count).
    """
    batch_processor = self._batch_processor
    counts = await batch_processor.update_album_tracks_bulk_async(
        tracks=tracks,
        year=year,
        artist=artist,
        album=album,
    )
    return counts