Skip to content

track_processor

Track processing functionality for Music Genre Updater.

This module handles fetching tracks from Music.app, caching, and updating track properties.

TrackProcessor

TrackProcessor(
    ap_client,
    cache_service,
    *,
    library_snapshot_service=None,
    console_logger,
    error_logger,
    config,
    analytics,
    dry_run=False,
    security_validator=None
)

Handles track fetching and updating operations.

Initialize the TrackProcessor.

Parameters:

Name Type Description Default
ap_client AppleScriptClientProtocol

AppleScript client for Music.app communication

required
cache_service CacheServiceProtocol

Cache service for storing track data

required
library_snapshot_service LibrarySnapshotServiceProtocol | None

Optional library snapshot service for cached snapshots

None
console_logger Logger

Logger for console output

required
error_logger Logger

Logger for error messages

required
config AppConfig

Typed application configuration

required
analytics AnalyticsProtocol

Service for tracking method calls

required
dry_run bool

Whether to run in dry-run mode

False
security_validator SecurityValidator | None

Optional security validator for input sanitization

None
Source code in src/core/tracks/track_processor.py
def __init__(
    self,
    ap_client: AppleScriptClientProtocol,
    cache_service: CacheServiceProtocol,
    *,
    library_snapshot_service: LibrarySnapshotServiceProtocol | None = None,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    config: AppConfig,
    analytics: AnalyticsProtocol,
    dry_run: bool = False,
    security_validator: SecurityValidator | None = None,
) -> None:
    """Initialize the TrackProcessor.

    Args:
        ap_client: AppleScript client for Music.app communication
        cache_service: Cache service for storing track data
        library_snapshot_service: Optional library snapshot service for cached snapshots
        console_logger: Logger for console output
        error_logger: Logger for error messages
        config: Typed application configuration
        analytics: Service for tracking method calls
        dry_run: Whether to run in dry-run mode
        security_validator: Optional security validator for input sanitization

    """
    self.ap_client = ap_client
    self.cache_service = cache_service
    self.snapshot_service = library_snapshot_service
    self.console_logger = console_logger
    self.error_logger = error_logger
    self.config = config
    self.analytics = analytics
    self.dry_run = dry_run
    self._dry_run_actions: list[dict[str, Any]] = []
    # Use the provided validator or create a default one for backward compatibility
    self.security_validator = security_validator or SecurityValidator(error_logger)
    # Dry run context from MusicUpdater
    self.dry_run_mode: str = ""
    self.dry_run_test_artists: set[str] = set()
    self.artist_renamer: ArtistRenamer | None = None

    # Initialize cache manager for snapshot/cache operations
    self.cache_manager = TrackCacheManager(
        cache_service=cache_service,
        snapshot_service=library_snapshot_service,
        console_logger=console_logger,
        current_time_func=self._current_time,
    )

    # Initialize update executor for track update operations
    self.update_executor: TrackUpdateExecutor = TrackUpdateExecutor(
        ap_client=ap_client,
        cache_service=cache_service,
        security_validator=self.security_validator,
        config=config,
        console_logger=console_logger,
        error_logger=error_logger,
        analytics=analytics,
        dry_run=dry_run,
    )

    # Initialize batch fetcher for large library processing
    self.batch_fetcher: BatchTrackFetcher = BatchTrackFetcher(
        ap_client=ap_client,
        cache_service=cache_service,
        console_logger=console_logger,
        error_logger=error_logger,
        config=config,
        track_validator=self._validate_tracks_security,
        artist_processor=self._apply_artist_renames,
        snapshot_loader=self._load_tracks_from_snapshot,
        snapshot_persister=self._update_snapshot,
        can_use_snapshot=self._can_use_snapshot,
        dry_run=dry_run,
        analytics=analytics,
    )

set_dry_run_context

set_dry_run_context(mode, test_artists)

Set dry run context for test mode filtering.

Parameters:

Name Type Description Default
mode str

Dry run mode ('test' or other)

required
test_artists set[str]

Set of test artists for filtering

required
Source code in src/core/tracks/track_processor.py
def set_dry_run_context(self, mode: str, test_artists: set[str]) -> None:
    """Set dry run context for test mode filtering.

    Args:
        mode: Dry run mode ('test' or other)
        test_artists: Set of test artists for filtering

    """
    self.dry_run_mode = mode
    self.dry_run_test_artists = test_artists

set_artist_renamer

set_artist_renamer(renamer)

Attach artist renamer service for automatic post-fetch processing.

Source code in src/core/tracks/track_processor.py
def set_artist_renamer(self, renamer: ArtistRenamer) -> None:
    """Attach artist renamer service for automatic post-fetch processing."""
    self.artist_renamer = renamer

fetch_tracks_by_ids async

fetch_tracks_by_ids(track_ids)

Fetch detailed track metadata for the provided track IDs.

Source code in src/core/tracks/track_processor.py
@track_instance_method("track_fetch_by_ids")
async def fetch_tracks_by_ids(self, track_ids: list[str]) -> list[TrackDict]:
    """Fetch detailed track metadata for the provided track IDs."""

    if not track_ids:
        return []

    batch_size = min(max(self.config.batch_processing.ids_batch_size, 1), 1000)

    # Use dedicated timeout for ID-based batch fetch (default 120s = 2 min per batch)
    # This is much shorter than full_library_fetch because we're fetching by specific IDs
    timeout = self.config.applescript_timeouts.ids_batch_fetch

    collected: list[TrackDict] = []
    total_batches = (len(track_ids) + batch_size - 1) // batch_size

    for i in range(0, len(track_ids), batch_size):
        batch = track_ids[i : i + batch_size]
        batch_num = i // batch_size + 1
        ids_param = ",".join(batch)

        raw_output = await self.ap_client.run_script(
            FETCH_TRACKS_BY_IDS,
            [ids_param],
            timeout=timeout,
            label=f"{FETCH_TRACKS_BY_IDS} [{batch_num}/{total_batches}]",
        )

        if not raw_output:
            continue

        parsed_tracks = parse_tracks(raw_output, self.error_logger)
        validated_tracks = self._validate_tracks_security(parsed_tracks)
        await self._apply_artist_renames(validated_tracks)
        collected.extend(validated_tracks)

    return collected

fetch_tracks_async async

fetch_tracks_async(
    artist=None,
    force_refresh=False,
    dry_run_test_tracks=None,
    ignore_test_filter=False,
)

Fetch tracks from cache or Music.app with caching.

Parameters:

Name Type Description Default
artist str | None

Optional artist filter

None
force_refresh bool

Force refresh from Music.app

False
dry_run_test_tracks list[TrackDict] | None

Test tracks for dry-run mode

None
ignore_test_filter bool

Whether to ignore test_artists configuration

False

Returns:

Type Description
list[TrackDict]

List of track dictionaries

Source code in src/core/tracks/track_processor.py
@track_instance_method("track_fetch_all")
async def fetch_tracks_async(
    self,
    artist: str | None = None,
    force_refresh: bool = False,
    dry_run_test_tracks: list[TrackDict] | None = None,
    ignore_test_filter: bool = False,
) -> list[TrackDict]:
    """Fetch tracks from cache or Music.app with caching.

    Args:
        artist: Optional artist filter
        force_refresh: Force refresh from Music.app
        dry_run_test_tracks: Test tracks for dry-run mode
        ignore_test_filter: Whether to ignore test_artists configuration

    Returns:
        List of track dictionaries

    """
    # Handle dry-run test mode
    if dry_run_test_tracks is not None:
        self.console_logger.info("DRY RUN: Using %d test tracks", len(dry_run_test_tracks))
        return dry_run_test_tracks

    # Generate cache key
    cache_key = f"tracks_{artist}" if artist else "tracks_all"

    use_snapshot = self._can_use_snapshot(artist)
    result: list[TrackDict] | None = None

    if not force_refresh:
        result = await self._materialize_cached_tracks(cache_key, artist)

    if result is None:
        result = await self._try_fetch_test_tracks(force_refresh, ignore_test_filter, artist)

    if result is None:
        result = await self._try_fetch_snapshot_tracks(cache_key, use_snapshot, force_refresh)

    if result is None:
        tracks = await self._fetch_tracks_from_applescript(artist=artist)

        if use_snapshot and tracks and not self.dry_run:
            await self._update_snapshot(tracks, [track.id for track in tracks])

        if tracks:
            await self.cache_service.set_async(cache_key, tracks)
            self.console_logger.info("Cached %d tracks for key: %s", len(tracks), cache_key)

        result = tracks

    if not result:
        self.console_logger.warning(
            "No tracks fetched for key: %s. This may indicate an empty library or an upstream issue.",
            cache_key,
        )

    return result or []

fetch_tracks_in_batches async

fetch_tracks_in_batches(
    batch_size=500, *, skip_snapshot_check=False
)

Fetch all tracks from Music.app in batches to avoid timeout.

Delegates to BatchTrackFetcher for batch-based fetching with snapshot support.

Parameters:

Name Type Description Default
batch_size int

Number of tracks to fetch per batch

500
skip_snapshot_check bool

Skip snapshot validation (used when already validated upstream)

False

Returns:

Type Description
list[TrackDict]

List of all track dictionaries

Source code in src/core/tracks/track_processor.py
@track_instance_method("track_fetch_batches")
async def fetch_tracks_in_batches(
    self,
    batch_size: int = 500,
    *,
    skip_snapshot_check: bool = False,
) -> list[TrackDict]:
    """Fetch all tracks from Music.app in batches to avoid timeout.

    Delegates to BatchTrackFetcher for batch-based fetching with snapshot support.

    Args:
        batch_size: Number of tracks to fetch per batch
        skip_snapshot_check: Skip snapshot validation (used when already validated upstream)

    Returns:
        List of all track dictionaries
    """
    return await self.batch_fetcher.fetch_all_tracks(
        batch_size,
        skip_snapshot_check=skip_snapshot_check,
    )

update_track_async async

update_track_async(
    track_id,
    new_track_name=None,
    new_album_name=None,
    new_genre=None,
    new_year=None,
    track_status=None,
    original_artist=None,
    original_album=None,
    original_track=None,
)

Update multiple properties of a track.

Delegates to TrackUpdateExecutor.

Parameters:

Name Type Description Default
track_id str

ID of the track to update

required
new_track_name str | None

New track name (optional)

None
new_album_name str | None

New album name (optional)

None
new_genre str | None

New genre (optional)

None
new_year str | None

New year (optional)

None
track_status str | None

Track status to check for prerelease (optional)

None
original_artist str | None

Original artist name for contextual logging (optional)

None
original_album str | None

Original album name for contextual logging (optional)

None
original_track str | None

Original track name for contextual logging (optional)

None

Returns:

Type Description
bool

True if all updates are successful, False if any failed

Source code in src/core/tracks/track_processor.py
async def update_track_async(
    self,
    track_id: str,
    new_track_name: str | None = None,
    new_album_name: str | None = None,
    new_genre: str | None = None,
    new_year: str | None = None,
    track_status: str | None = None,
    original_artist: str | None = None,
    original_album: str | None = None,
    original_track: str | None = None,
) -> bool:
    """Update multiple properties of a track.

    Delegates to TrackUpdateExecutor.

    Args:
        track_id: ID of the track to update
        new_track_name: New track name (optional)
        new_album_name: New album name (optional)
        new_genre: New genre (optional)
        new_year: New year (optional)
        track_status: Track status to check for prerelease (optional)
        original_artist: Original artist name for contextual logging (optional)
        original_album: Original album name for contextual logging (optional)
        original_track: Original track name for contextual logging (optional)

    Returns:
        True if all updates are successful, False if any failed
    """
    result: bool = await self.update_executor.update_track_async(
        track_id=track_id,
        new_track_name=new_track_name,
        new_album_name=new_album_name,
        new_genre=new_genre,
        new_year=new_year,
        track_status=track_status,
        original_artist=original_artist,
        original_album=original_album,
        original_track=original_track,
    )
    return result

update_artist_async async

update_artist_async(
    track,
    new_artist_name,
    *,
    original_artist=None,
    update_album_artist=False
)

Update the artist name for a track.

Delegates to TrackUpdateExecutor.

Parameters:

Name Type Description Default
track TrackDict

Track dictionary representing the target track

required
new_artist_name str

Artist name to apply

required
original_artist str | None

Original artist for logging context (optional)

None
update_album_artist bool

If True, also update album_artist field

False

Returns:

Type Description
bool

True if update succeeded, False otherwise

Source code in src/core/tracks/track_processor.py
async def update_artist_async(
    self,
    track: TrackDict,
    new_artist_name: str,
    *,
    original_artist: str | None = None,
    update_album_artist: bool = False,
) -> bool:
    """Update the artist name for a track.

    Delegates to TrackUpdateExecutor.

    Args:
        track: Track dictionary representing the target track
        new_artist_name: Artist name to apply
        original_artist: Original artist for logging context (optional)
        update_album_artist: If True, also update album_artist field

    Returns:
        True if update succeeded, False otherwise
    """
    result: bool = await self.update_executor.update_artist_async(
        track=track,
        new_artist_name=new_artist_name,
        original_artist=original_artist,
        update_album_artist=update_album_artist,
    )
    return result

get_dry_run_actions

get_dry_run_actions()

Get the list of dry-run actions recorded.

Delegates to TrackUpdateExecutor.

Returns:

Type Description
list[dict[str, Any]]

List of dry-run action dictionaries

Source code in src/core/tracks/track_processor.py
def get_dry_run_actions(self) -> list[dict[str, Any]]:
    """Get the list of dry-run actions recorded.

    Delegates to TrackUpdateExecutor.

    Returns:
        List of dry-run action dictionaries
    """
    return self.update_executor.get_dry_run_actions()