Skip to content

year_batch

Batch processing for album year updates.

This module handles batch processing of album year updates, including concurrency control and progress tracking.

Track-level update operations are delegated to TrackUpdater, and prerelease handling is delegated to PrereleaseHandler.

YearBatchProcessor

YearBatchProcessor(
    *,
    year_determinator,
    track_processor,
    retry_handler,
    console_logger,
    error_logger,
    config,
    analytics,
    dry_run=False
)

Handles batch processing of album years.

Responsibilities: - Batch album processing with rate limiting - Sequential and concurrent processing modes - Progress tracking and reporting - Orchestration of prerelease handling and track updates

Initialize the YearBatchProcessor.

Parameters:

Name Type Description Default
year_determinator YearDeterminator

Component for determining album years

required
track_processor TrackProcessor

Processor for track updates

required
retry_handler DatabaseRetryHandler

Retry handler for transient error recovery

required
console_logger Logger

Logger for console output

required
error_logger Logger

Logger for error messages

required
config AppConfig

Typed application configuration

required
analytics AnalyticsProtocol

Service for performance tracking

required
dry_run bool

Whether to run in dry-run mode

False
Source code in src/core/tracks/year_batch.py
def __init__(
    self,
    *,
    year_determinator: YearDeterminator,
    track_processor: TrackProcessor,
    retry_handler: DatabaseRetryHandler,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    config: AppConfig,
    analytics: AnalyticsProtocol,
    dry_run: bool = False,
) -> None:
    """Initialize the YearBatchProcessor.

    Args:
        year_determinator: Component for determining album years
        track_processor: Processor for track updates
        retry_handler: Retry handler for transient error recovery
        console_logger: Logger for console output
        error_logger: Logger for error messages
        config: Typed application configuration
        analytics: Service for performance tracking
        dry_run: Whether to run in dry-run mode

    """
    self.year_determinator = year_determinator
    self.console_logger = console_logger
    self.error_logger = error_logger
    self.config = config
    self.analytics = analytics
    self.dry_run = dry_run
    self._dry_run_actions: list[dict[str, Any]] = []

    self._prerelease_handler = PrereleaseHandler(
        console_logger=console_logger,
        config=config,
        pending_verification=year_determinator.pending_verification,
        prerelease_recheck_days=year_determinator.prerelease_recheck_days,
    )

    self._track_updater = TrackUpdater(
        track_processor=track_processor,
        retry_handler=retry_handler,
        console_logger=console_logger,
        error_logger=error_logger,
        config=config,
    )

process_albums_in_batches async

process_albums_in_batches(
    grouped_albums, updated_tracks, changes_log, force=False
)

Process albums in batches with rate limiting.

Parameters:

Name Type Description Default
grouped_albums dict[tuple[str, str], list[TrackDict]]

Dictionary mapping (artist, album) tuples to lists of tracks.

required
updated_tracks list[TrackDict]

List to append updated tracks to.

required
changes_log list[ChangeLogEntry]

List to append change entries to.

required
force bool

If True, bypass skip checks and re-query API for all albums.

False
Source code in src/core/tracks/year_batch.py
async def process_albums_in_batches(
    self,
    grouped_albums: dict[tuple[str, str], list[TrackDict]],
    updated_tracks: list[TrackDict],
    changes_log: list[ChangeLogEntry],
    force: bool = False,
) -> None:
    """Process albums in batches with rate limiting.

    Args:
        grouped_albums: Dictionary mapping (artist, album) tuples to lists of tracks.
        updated_tracks: List to append updated tracks to.
        changes_log: List to append change entries to.
        force: If True, bypass skip checks and re-query API for all albums.

    """
    processing = self.config.year_retrieval.processing

    batch_size = processing.batch_size
    delay_between_batches = int(processing.delay_between_batches)
    adaptive_delay = processing.adaptive_delay

    album_items = list(grouped_albums.items())
    total_albums = len(album_items)
    if total_albums == 0:
        return

    concurrency_limit = self._determine_concurrency_limit()

    if self._should_use_sequential_processing(adaptive_delay, concurrency_limit):
        await self._process_batches_sequentially(
            album_items,
            batch_size,
            delay_between_batches,
            total_albums,
            updated_tracks,
            changes_log,
            force=force,
        )
        return

    await self._process_batches_concurrently(
        album_items,
        batch_size,
        total_albums,
        concurrency_limit,
        updated_tracks,
        changes_log,
        force=force,
    )

group_tracks_by_album staticmethod

group_tracks_by_album(tracks)

Group tracks by album (album_artist, album) key.

Uses album_artist instead of artist to properly handle collaboration tracks where multiple artists appear on the same album.

Parameters:

Name Type Description Default
tracks list[TrackDict]

List of tracks to group

required

Returns:

Type Description
dict[tuple[str, str], list[TrackDict]]

Dictionary mapping (album_artist, album) tuples to lists of tracks

Source code in src/core/tracks/year_batch.py
@staticmethod
def group_tracks_by_album(
    tracks: list[TrackDict],
) -> dict[tuple[str, str], list[TrackDict]]:
    """Group tracks by album (album_artist, album) key.

    Uses album_artist instead of artist to properly handle collaboration tracks
    where multiple artists appear on the same album.

    Args:
        tracks: List of tracks to group

    Returns:
        Dictionary mapping (album_artist, album) tuples to lists of tracks

    """
    albums: dict[tuple[str, str], list[TrackDict]] = defaultdict(list)
    for track in tracks:
        album_artist = str(track.get("album_artist", ""))
        album = str(track.get("album", ""))

        # Fallback to normalized artist if album_artist is empty
        if not album_artist or not album_artist.strip():
            raw_artist = str(track.get("artist", ""))
            album_artist = normalize_collaboration_artist(raw_artist)

        album_key = (album_artist, album)
        albums[album_key].append(track)

    return albums

get_dry_run_actions

get_dry_run_actions()

Get a list of dry-run actions that would have been performed.

Source code in src/core/tracks/year_batch.py
def get_dry_run_actions(self) -> list[dict[str, Any]]:
    """Get a list of dry-run actions that would have been performed."""
    return self._dry_run_actions

update_album_tracks_bulk_async async

update_album_tracks_bulk_async(tracks, year, artist, album)

Update year for multiple tracks. Delegates to TrackUpdater.

Source code in src/core/tracks/year_batch.py
async def update_album_tracks_bulk_async(
    self,
    tracks: list[TrackDict],
    year: str,
    artist: str,
    album: str,
) -> tuple[int, int]:
    """Update year for multiple tracks. Delegates to TrackUpdater."""
    return await self._track_updater.update_album_tracks_bulk_async(
        tracks=tracks,
        year=year,
        artist=artist,
        album=album,
    )