Skip to content

pipeline_snapshot

Pipeline Snapshot Manager.

Manages track snapshots during pipeline execution for efficient delta processing.

PipelineSnapshotManager

PipelineSnapshotManager(track_processor, console_logger)

Manages track snapshots during pipeline execution.

Maintains an in-memory cache of tracks for efficient updates during the pipeline run, avoiding repeated fetches from Music.app.

Initialize the snapshot manager.

Parameters:

Name Type Description Default
track_processor TrackProcessor

Processor for fetching tracks by ID.

required
console_logger Logger

Logger for console output.

required
Source code in src/app/pipeline_snapshot.py
def __init__(
    self,
    track_processor: TrackProcessor,
    console_logger: logging.Logger,
) -> None:
    """Set up an empty snapshot manager.

    Args:
        track_processor: Used to fetch tracks by ID during delta merges.
        console_logger: Destination for console log output.
    """
    # Injected collaborators.
    self._track_processor = track_processor
    self._console_logger = console_logger
    # Cached pipeline state; populated later by set_snapshot().
    self._tracks_snapshot: list[TrackDict] | None = None
    self._tracks_index: dict[str, TrackDict] = {}
    self._captured_library_mtime: datetime | None = None

reset

reset()

Reset cached pipeline tracks before a fresh run.

Source code in src/app/pipeline_snapshot.py
def reset(self) -> None:
    """Reset cached pipeline tracks before a fresh run.

    Delegates to the private _reset_state helper with empty values so the
    next run starts from a clean cache (snapshot, index, captured mtime).
    """
    self._reset_state(None, None)

set_snapshot

set_snapshot(tracks, *, library_mtime=None)

Store the current pipeline track snapshot for downstream reuse.

Parameters:

Name Type Description Default
tracks list[TrackDict]

List of tracks to cache.

required
library_mtime datetime | None

Library modification time captured BEFORE fetching tracks. This prevents race conditions where new tracks added during fetch would be missed but metadata would incorrectly show library as unchanged.

None
Source code in src/app/pipeline_snapshot.py
def set_snapshot(
    self,
    tracks: list[TrackDict],
    *,
    library_mtime: datetime | None = None,
) -> None:
    """Store the current pipeline track snapshot for downstream reuse.

    Args:
        tracks: List of tracks to cache.
        library_mtime: Library modification time captured BEFORE fetching tracks.
            This prevents race conditions where new tracks added during fetch
            would be missed but metadata would incorrectly show library as unchanged.
    """
    self._reset_state(tracks, library_mtime)
    for track in tracks:
        # Normalize the id before stringifying: a stored ``id=None`` must not
        # become the literal string "None", which would collide every id-less
        # track under a single bogus index key.
        raw_id = track.get("id", "")
        track_id = str(raw_id) if raw_id is not None else ""
        if track_id:
            self._tracks_index[track_id] = track

update_tracks

update_tracks(updated_tracks)

Apply field updates from processed tracks to the cached snapshot.

Source code in src/app/pipeline_snapshot.py
def update_tracks(self, updated_tracks: Iterable[TrackDict]) -> None:
    """Apply field updates from processed tracks to the cached snapshot."""
    if not self._tracks_index:
        # Nothing cached yet, so there is no snapshot to patch.
        return

    for updated in updated_tracks:
        key = str(updated.get("id", ""))
        target = self._tracks_index.get(key) if key else None
        if target is None:
            # Track is missing an id or is not part of the cached snapshot.
            continue

        # Copy every field from the processed track onto the cached one.
        for name, value in updated.model_dump().items():
            try:
                setattr(target, name, value)
            except (AttributeError, TypeError, ValueError):
                # Fall back to bypassing __setattr__; presumably TrackDict
                # restricts or validates assignment — verify against the model.
                object.__setattr__(target, name, value)

get_snapshot

get_snapshot()

Return the currently cached pipeline track snapshot.

Source code in src/app/pipeline_snapshot.py
def get_snapshot(self) -> list[TrackDict] | None:
    """Return the currently cached pipeline track snapshot.

    Returns:
        The list previously stored via set_snapshot(), or None when no
        snapshot is cached.
    """
    return self._tracks_snapshot

clear

clear()

Release cached pipeline track data after finishing the run.

Source code in src/app/pipeline_snapshot.py
def clear(self) -> None:
    """Release cached pipeline track data after finishing the run.

    Same underlying operation as reset(); kept as a separate entry point
    to make end-of-run cleanup explicit at call sites.
    """
    self._reset_state(None, None)

persist_to_disk async

persist_to_disk()

Persist the current in-memory snapshot to disk.

This should be called at the end of the pipeline to ensure that genre/year changes are reflected in the disk snapshot.

Returns:

Type Description
bool

True if snapshot was persisted, False if no snapshot available.

Source code in src/app/pipeline_snapshot.py
async def persist_to_disk(self) -> bool:
    """Persist the current in-memory snapshot to disk.

    Intended to run at the end of the pipeline so that genre/year changes
    applied in memory are reflected in the on-disk snapshot.

    Returns:
        True if snapshot was persisted, False if no snapshot available.
    """
    snapshot = self._tracks_snapshot
    if snapshot is None:
        self._console_logger.debug("No snapshot to persist")
        return False

    try:
        # Only tracks with a truthy id are reported as processed.
        processed_ids = [str(track.id) for track in snapshot if track.id]
        await self._track_processor.cache_manager.update_snapshot(
            snapshot,
            processed_track_ids=processed_ids,
            library_mtime_override=self._captured_library_mtime,
        )
        self._console_logger.info(
            "Persisted pipeline snapshot to disk (%d tracks)",
            len(snapshot),
        )
    except (OSError, TypeError, ValueError) as exc:
        # Best-effort persistence: log and report failure instead of raising.
        self._console_logger.warning("Failed to persist snapshot: %s", exc)
        return False
    return True

merge_smart_delta async

merge_smart_delta(snapshot_tracks, delta)

Merge snapshot with Smart Delta changes.

Parameters:

Name Type Description Default
snapshot_tracks list[TrackDict]

Current snapshot tracks.

required
delta TrackDelta

Delta containing new, updated, and removed track IDs.

required

Returns:

Type Description
list[TrackDict] | None

Updated list of tracks if successful, otherwise None to indicate fallback should be used.

Source code in src/app/pipeline_snapshot.py
async def merge_smart_delta(
    self,
    snapshot_tracks: list[TrackDict],
    delta: TrackDelta,
) -> list[TrackDict] | None:
    """Merge snapshot with Smart Delta changes.

    Args:
        snapshot_tracks: Current snapshot tracks.
        delta: Delta containing new, updated, and removed track IDs.

    Returns:
        Updated list of tracks if successful, otherwise None to indicate
        fallback should be used.
    """
    removed_ids = {str(tid) for tid in delta.removed_ids if tid}
    updated_ids = [str(tid) for tid in delta.updated_ids if tid]
    new_ids = [str(tid) for tid in delta.new_ids if tid]

    # Updated ids come first: they exist in both snapshot and current library,
    # so a fetch miss means the delta is stale. New ids may legitimately have
    # been deleted since the ID fetch. dict.fromkeys preserves order while
    # dropping duplicates.
    ids_to_fetch: list[str] = list(dict.fromkeys(updated_ids + new_ids))

    fetched_by_id: dict[str, TrackDict] = {}
    if ids_to_fetch:
        fetched = await self._track_processor.fetch_tracks_by_ids(ids_to_fetch)
        fetched_by_id = {str(track.id): track for track in fetched}

        missing_updated = [tid for tid in updated_ids if tid not in fetched_by_id]
        if missing_updated:
            # A missing "updated" track invalidates the delta entirely.
            self._console_logger.warning(
                "Smart Delta missing %d updated tracks (%s); falling back to batch scan",
                len(missing_updated),
                ", ".join(missing_updated[:5]),
            )
            return None

        missing_new = [tid for tid in new_ids if tid not in fetched_by_id]
        if missing_new:
            self._console_logger.info(
                "Smart Delta: %d new tracks no longer exist (deleted?), skipping them",
                len(missing_new),
            )

    merged: list[TrackDict] = []
    seen: set[str] = set()

    # Walk the snapshot in order: drop removed tracks, substitute freshly
    # fetched versions where available, keep the rest untouched.
    for track in snapshot_tracks:
        tid = str(track.id)
        if tid in removed_ids:
            continue
        merged.append(fetched_by_id.get(tid, track))
        seen.add(tid)

    # Append brand-new tracks that were not already present in the snapshot.
    for tid, track in fetched_by_id.items():
        if tid not in seen:
            merged.append(track)
            seen.add(tid)

    self._console_logger.info(
        "Smart Delta merged: %d updated, %d new, %d removed tracks",
        len(updated_ids),
        len(new_ids),
        len(removed_ids),
    )

    return merged