Skip to content

music_updater

Refactored Music Updater core class.

This is a streamlined version that uses the new modular components.

MusicUpdater

MusicUpdater(deps)

Orchestrates music library updates using modular components.

Initialize MusicUpdater with dependency injection.

Parameters:

Name Type Description Default
deps DependencyContainer

Dependency container with all required services

required
Source code in src/app/music_updater.py
def __init__(self, deps: DependencyContainer) -> None:
    """Initialize MusicUpdater with dependency injection.

    Args:
        deps: Dependency container with all required services

    """
    self.deps = deps
    self.app_config = deps.app_config
    self.console_logger = deps.console_logger
    self.error_logger = deps.error_logger
    self.analytics = deps.analytics

    # Initialize components
    self.track_processor = TrackProcessor(
        ap_client=deps.ap_client,
        cache_service=deps.cache_service,
        library_snapshot_service=deps.library_snapshot_service,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        config=deps.app_config,
        analytics=deps.analytics,
        dry_run=deps.dry_run,
    )

    rename_config_path = self._resolve_artist_rename_config_path(deps)
    self.artist_renamer = ArtistRenamer(
        track_processor=self.track_processor,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        config_path=rename_config_path,
    )
    self.track_processor.set_artist_renamer(self.artist_renamer)

    self.genre_manager = GenreManager(
        track_processor=self.track_processor,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        analytics=deps.analytics,
        config=deps.app_config,
        dry_run=deps.dry_run,
    )

    self.year_retriever = YearRetriever(
        track_processor=self.track_processor,
        cache_service=deps.cache_service,
        external_api=deps.external_api_service,
        pending_verification=deps.pending_verification_service,
        retry_handler=deps.retry_handler,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        analytics=deps.analytics,
        config=deps.app_config,
        dry_run=deps.dry_run,
    )

    self.database_verifier = DatabaseVerifier(
        ap_client=deps.ap_client,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        db_verify_logger=deps.db_verify_logger,
        analytics=deps.analytics,
        config=deps.app_config,
        dry_run=deps.dry_run,
    )

    self.incremental_filter = IncrementalFilterService(
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        analytics=deps.analytics,
        config=deps.app_config,
        dry_run=deps.dry_run,
        track_list_loader=load_track_list,
    )

    # Pipeline snapshot manager
    self.snapshot_manager = PipelineSnapshotManager(
        track_processor=self.track_processor,
        console_logger=deps.console_logger,
    )

    # Track cleaning service
    self.cleaning_service = TrackCleaningService(
        track_processor=self.track_processor,
        config=deps.app_config,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
    )

    # Year update service
    self.year_service = YearUpdateService(
        track_processor=self.track_processor,
        year_retriever=self.year_retriever,
        snapshot_manager=self.snapshot_manager,
        config=deps.app_config,
        console_logger=deps.console_logger,
        error_logger=deps.error_logger,
        cleaning_service=self.cleaning_service,
        artist_renamer=self.artist_renamer,
    )

    # Genre update service
    self.genre_service = GenreUpdateService(
        track_processor=self.track_processor,
        genre_manager=self.genre_manager,
        config=deps.app_config,
        console_logger=self.console_logger,
        error_logger=self.error_logger,
        cleaning_service=self.cleaning_service,
        artist_renamer=self.artist_renamer,
    )

    # Dry run context
    self.dry_run_mode = ""
    self.dry_run_test_artists: set[str] = set()

set_dry_run_context

set_dry_run_context(mode, test_artists)

Set the dry-run context for the updater.

Parameters:

Name Type Description Default
mode str

The dry-run mode

required
test_artists set[str]

Set of test artists for filtering

required
Source code in src/app/music_updater.py
def set_dry_run_context(self, mode: str, test_artists: set[str]) -> None:
    """Set the dry-run context for the updater.

    Args:
        mode: The dry-run mode
        test_artists: Set of test artists for filtering

    """
    self.dry_run_mode = mode
    self.dry_run_test_artists = test_artists
    # Also set context on track_processor
    self.track_processor.set_dry_run_context(mode, test_artists)
    # Propagate test artists to year_service for filtering
    self.year_service.set_test_artists(test_artists)
    # Propagate test artists to genre_service for filtering
    self.genre_service.set_test_artists(test_artists)

run_clean_artist async

run_clean_artist(artist)

Clean track names for a specific artist.

Parameters:

Name Type Description Default
artist str

Artist name to process

required
Source code in src/app/music_updater.py
async def run_clean_artist(self, artist: str) -> None:
    """Clean track names for a specific artist.

    Args:
        artist: Artist name to process

    """
    self.console_logger.info("Starting clean operation for artist: %s", artist)

    # Check if Music app is running
    if not is_music_app_running(self.error_logger):
        self.error_logger.error(
            "Music.app is not running - cannot perform clean_artist for '%s'. Please start Music.app before running this script.",
            artist,
        )
        return

    # Fetch tracks for artist
    tracks = await self.track_processor.fetch_tracks_async(artist=artist)
    if not tracks:
        self.console_logger.warning("No tracks found for artist: %s", artist)
        return

    self.console_logger.info("Found %d tracks for artist %s", len(tracks), artist)

    # Process tracks and collect results
    updated_tracks, changes_log = await self.cleaning_service.process_all_tracks(tracks, artist)

    # Save results if any tracks were updated
    if updated_tracks:
        await self._save_clean_results(changes_log)

    self.console_logger.info(
        "Clean operation complete. Updated %d tracks for artist %s",
        len(updated_tracks),
        artist,
    )

run_revert_years async

run_revert_years(artist, album, backup_csv=None)

Revert year updates for an artist (optionally per album).

Uses backup CSV if provided; otherwise uses the latest changes_report.csv.

Source code in src/app/music_updater.py
async def run_revert_years(self, artist: str, album: str | None, backup_csv: str | None = None) -> None:
    """Revert year updates for an artist (optionally per album).

    Uses backup CSV if provided; otherwise uses the latest changes_report.csv.
    """
    await self.year_service.run_revert_years(artist, album, backup_csv)

run_restore_release_years async

run_restore_release_years(
    artist=None, album=None, threshold=5
)

Restore year from Apple Music's release_year field.

Finds albums where 'year' differs dramatically from 'release_year' and updates year to match release_year.

Parameters:

Name Type Description Default
artist str | None

Optional artist name filter.

None
album str | None

Optional album name filter (requires artist).

None
threshold int

Year difference threshold (default: 5 years).

5
Source code in src/app/music_updater.py
async def run_restore_release_years(
    self,
    artist: str | None = None,
    album: str | None = None,
    threshold: int = 5,
) -> None:
    """Restore year from Apple Music's release_year field.

    Finds albums where 'year' differs dramatically from 'release_year'
    and updates year to match release_year.

    Args:
        artist: Optional artist name filter.
        album: Optional album name filter (requires artist).
        threshold: Year difference threshold (default: 5 years).

    """
    await self.year_service.run_restore_release_years(artist, album, threshold)

run_update_years async

run_update_years(artist, force, fresh=False)

Update album years for all or specific artist.

Parameters:

Name Type Description Default
artist str | None

Optional artist filter

required
force bool

Force update even if year exists

required
fresh bool

Fresh mode - invalidate cache before processing, implies force

False
Source code in src/app/music_updater.py
async def run_update_years(self, artist: str | None, force: bool, fresh: bool = False) -> None:
    """Update album years for all or specific artist.

    Args:
        artist: Optional artist filter
        force: Force update even if year exists
        fresh: Fresh mode - invalidate cache before processing, implies force
    """
    await self.year_service.run_update_years(artist, force, fresh)

run_update_genres async

run_update_genres(artist, force)

Update genres for all or specific artist.

Parameters:

Name Type Description Default
artist str | None

Optional artist filter.

required
force bool

Force update even if genre exists.

required
Source code in src/app/music_updater.py
async def run_update_genres(self, artist: str | None, force: bool) -> None:
    """Update genres for all or specific artist.

    Args:
        artist: Optional artist filter.
        force: Force update even if genre exists.
    """
    await self.genre_service.run_update_genres(artist, force)

run_verify_pending async

run_verify_pending(_force=False)

Re-verify albums that are pending year verification.

Parameters:

Name Type Description Default
_force bool

Force verification even if recently done (currently unused)

False
Source code in src/app/music_updater.py
async def run_verify_pending(self, _force: bool = False) -> None:
    """Re-verify albums that are pending year verification.

    Args:
        _force: Force verification even if recently done (currently unused)

    """
    start_time = time.time()
    pending_albums = await self.deps.pending_verification_service.get_all_pending_albums()

    if not pending_albums:
        self.console_logger.info(
            "%s %s | no albums pending",
            LogFormat.label("PENDING"),
            LogFormat.dim("SKIP"),
        )
        return

    # Filter albums that need verification (interval elapsed)
    albums_to_verify: list[PendingAlbumEntry] = [
        entry for entry in pending_albums if await self.deps.pending_verification_service.is_verification_needed(entry.artist, entry.album)
    ]
    skipped_count = len(pending_albums) - len(albums_to_verify)

    if not albums_to_verify:
        self.console_logger.info(
            "%s %s | %s pending, none due yet",
            LogFormat.label("PENDING"),
            LogFormat.dim("SKIP"),
            LogFormat.number(len(pending_albums)),
        )
        return

    self.console_logger.info(
        "%s %s | due: %s (skipped: %s)",
        LogFormat.label("PENDING"),
        LogFormat.success("START"),
        LogFormat.number(len(albums_to_verify)),
        LogFormat.dim(str(skipped_count)),
    )

    verified_count = 0
    failed_count = 0
    for entry in albums_to_verify:
        year_str, _, _, _ = await self.deps.external_api_service.get_album_year(entry.artist, entry.album)
        if not year_str:
            failed_count += 1
            continue

        if await self._verify_single_pending_album(entry.artist, entry.album, year_str):
            verified_count += 1
        else:
            failed_count += 1

    # Update last verification timestamp
    await self.deps.pending_verification_service.update_verification_timestamp()

    duration = time.time() - start_time
    if verified_count > 0:
        self.console_logger.info(
            "%s %s | verified: %s failed: %s %s",
            LogFormat.label("PENDING"),
            LogFormat.success("DONE"),
            LogFormat.success(str(verified_count)),
            LogFormat.dim(str(failed_count)),
            LogFormat.duration(duration),
        )
    else:
        self.console_logger.info(
            "%s %s | no years found %s",
            LogFormat.label("PENDING"),
            LogFormat.warning("DONE"),
            LogFormat.duration(duration),
        )

run_verify_database async

run_verify_database(force=False)

Verify track database against Music.app.

Parameters:

Name Type Description Default
force bool

Force verification even if recently done

False
Source code in src/app/music_updater.py
async def run_verify_database(self, force: bool = False) -> None:
    """Verify track database against Music.app.

    Args:
        force: Force verification even if recently done

    """
    self.console_logger.info("Starting database verification")

    removed_count = await self.database_verifier.verify_and_clean_track_database(
        force=force,
        apply_test_filter=self._should_apply_test_filter(),
    )

    self.console_logger.info(
        "Database verification complete. Removed %d invalid tracks",
        removed_count,
    )

run_main_pipeline async

run_main_pipeline(force=False, fresh=False)

Run the main update pipeline: clean names, update genres, update years.

Parameters:

Name Type Description Default
force bool

Force all operations (bypass incremental filtering)

False
fresh bool

Fresh mode - implies force, also invalidates caches

False
Source code in src/app/music_updater.py
async def run_main_pipeline(self, force: bool = False, fresh: bool = False) -> None:
    """Run the main update pipeline: clean names, update genres, update years.

    Args:
        force: Force all operations (bypass incremental filtering)
        fresh: Fresh mode - implies force, also invalidates caches

    """
    # fresh implies force
    if fresh:
        force = True
    self.console_logger.info("Starting main update pipeline")
    self.snapshot_manager.reset()

    # Fetch tracks based on mode (test or normal)
    tracks = await self._fetch_tracks_for_pipeline_mode(force=force)
    if not tracks:
        self.console_logger.warning("No tracks found in Music.app (force=%s)", force)
        return

    self.console_logger.info("Found %d tracks in Music.app", len(tracks))

    # Compute incremental scope - filter tracks that need processing
    incremental_tracks, should_skip_pipeline = await self._compute_incremental_scope(tracks, force)

    if should_skip_pipeline:
        self.console_logger.info("No new tracks to process, skipping pipeline")
        return

    self.console_logger.info("Processing %d tracks (%s mode)", len(incremental_tracks), "full" if force else "incremental")

    # Get last run time for incremental updates
    last_run_time = await self._get_last_run_time(force)

    # Execute the main steps with incremental scope and collect changes
    all_changes: list[ChangeLogEntry] = []

    # Step 1: Clean metadata
    cleaning_changes = await self.cleaning_service.clean_all_metadata_with_logs(incremental_tracks)
    all_changes.extend(cleaning_changes)

    # Step 2: Rename artists (if configured)
    if self.artist_renamer.has_mapping:
        self.console_logger.info("Step 2/4: Renaming artists based on configuration")
        renamed_tracks = await self.artist_renamer.rename_tracks(incremental_tracks)
        if renamed_tracks:
            self.console_logger.info("Renamed artists for %d tracks", len(renamed_tracks))
            # Artist renames create their own change log entries via track_processor
    else:
        self.console_logger.debug("No artist rename mappings configured, skipping rename step")

    # Step 3: Update genres (use ALL tracks - GenreManager handles incremental logic internally)
    genre_changes = await self._update_all_genres(tracks, last_run_time, force, fresh)
    all_changes.extend(genre_changes)

    # Step 4: Update years (use ALL tracks - YearBatchProcessor handles internal skip logic)
    year_changes = await self._update_all_years_with_logs(tracks, force, fresh)
    all_changes.extend(year_changes)

    # Save combined results including all changes
    await self._save_pipeline_results(all_changes)

    # Update last run timestamp if pipeline completed successfully
    if self._should_update_run_timestamp(force, incremental_tracks):
        await self.database_verifier.update_last_incremental_run()

    # Persist updated snapshot to disk (prevents stale data on next run)
    if not self.deps.dry_run:
        await self.snapshot_manager.persist_to_disk()

    self.snapshot_manager.clear()
    self.console_logger.info("Main update pipeline completed successfully")