Skip to content

genre_manager

Genre management functionality for Music Genre Updater.

This module handles determining dominant genres for artists and updating track genres accordingly.

GenreManager

GenreManager(
    track_processor,
    console_logger,
    error_logger,
    analytics,
    config,
    dry_run=False,
)

Bases: BaseProcessor

Manages genre determination and updates for tracks.

Initialize the GenreManager.

Parameters:

Name Type Description Default
track_processor TrackProcessor

Track processor for updating tracks

required
console_logger Logger

Logger for console output

required
error_logger Logger

Logger for error messages

required
analytics AnalyticsProtocol

Service for performance tracking

required
config AppConfig

Typed application configuration

required
dry_run bool

Whether to run in dry-run mode

False
Source code in src/core/tracks/genre_manager.py
def __init__(
    self,
    track_processor: TrackProcessor,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    analytics: AnalyticsProtocol,
    config: AppConfig,
    dry_run: bool = False,
) -> None:
    """Initialize the GenreManager.

    Args:
        track_processor: Track processor for updating tracks
        console_logger: Logger for console output
        error_logger: Logger for error messages
        analytics: Service for performance tracking
        config: Typed application configuration
        dry_run: Whether to run in dry-run mode

    """
    super().__init__(console_logger, error_logger, analytics, config, dry_run)
    self.track_processor = track_processor

is_missing_or_unknown_genre staticmethod

is_missing_or_unknown_genre(track)

Check if track has missing or unknown genre.

Parameters:

Name Type Description Default
track TrackDict

Track to check

required

Returns:

Type Description
bool

True if genre is missing, empty, or 'unknown'

Source code in src/core/tracks/genre_manager.py
@staticmethod
def is_missing_or_unknown_genre(track: TrackDict) -> bool:
    """Check if track has missing or unknown genre.

    Args:
        track: Track to check

    Returns:
        True if genre is missing, empty, or 'unknown'

    """
    return _is_missing_or_unknown_genre(track)

parse_date_added staticmethod

parse_date_added(track)

Parse track's date_added field to datetime.

Parameters:

Name Type Description Default
track TrackDict

Track with date_added field

required

Returns:

Type Description
datetime | None

Parsed datetime with UTC timezone, or None if parsing fails

Source code in src/core/tracks/genre_manager.py
@staticmethod
def parse_date_added(track: TrackDict) -> datetime | None:
    """Parse track's date_added field to datetime.

    Args:
        track: Track with date_added field

    Returns:
        Parsed datetime with UTC timezone, or None if parsing fails

    """
    return _parse_track_date_added(track)

filter_tracks_for_incremental_update

filter_tracks_for_incremental_update(tracks, last_run_time)

Filter tracks to only include those added since the last run.

Parameters:

Name Type Description Default
tracks list[TrackDict]

All tracks

required
last_run_time datetime | None

Time of last incremental run

required

Returns:

Type Description
list[TrackDict]

Filtered list of tracks added since last run

Source code in src/core/tracks/genre_manager.py
def filter_tracks_for_incremental_update(
    self,
    tracks: list[TrackDict],
    last_run_time: datetime | None,
) -> list[TrackDict]:
    """Filter tracks to only include those added since the last run.

    Args:
        tracks: All tracks
        last_run_time: Time of last incremental run

    Returns:
        Filtered list of tracks added since last run

    """
    if last_run_time is None:
        self.console_logger.info("No last run time found, processing all %d tracks", len(tracks))
        return tracks

    new_tracks: list[TrackDict] = []
    missing_genre_tracks: list[TrackDict] = []

    for track in tracks:
        # Always include tracks with empty/unknown genre to repair metadata
        if self.is_missing_or_unknown_genre(track):
            missing_genre_tracks.append(track)

        # Include if added after last run
        date_added = self.parse_date_added(track)
        if date_added and date_added > last_run_time:
            new_tracks.append(track)

    # Deduplicate by track id, prioritizing new_tracks entries
    # Use itertools.chain to avoid memory overhead of list concatenation
    seen: set[str] = set()
    combined: list[TrackDict] = []
    for track in itertools.chain(new_tracks, missing_genre_tracks):
        track_id = str(track.id or "")
        # Check for missing or empty ID (but allow '0' which is falsy but valid)
        if not track_id or track_id in seen:
            continue
        seen.add(track_id)
        combined.append(track)

    self.console_logger.info(
        "Found %d new tracks since %s; including %d with missing/unknown genre (combined %d)",
        len(new_tracks),
        last_run_time.strftime("%Y-%m-%d %H:%M:%S"),
        len(missing_genre_tracks),
        len(combined),
    )
    return combined

process_batch_results staticmethod

process_batch_results(
    batch_results, updated_tracks, change_logs
)

Process batch results and update collections.

Parameters:

Name Type Description Default
batch_results list[Any]

Results from batch processing

required
updated_tracks list[TrackDict]

List to append updated tracks to

required
change_logs list[ChangeLogEntry]

List to append change logs to

required
Source code in src/core/tracks/genre_manager.py
@staticmethod
def process_batch_results(batch_results: list[Any], updated_tracks: list[TrackDict], change_logs: list[ChangeLogEntry]) -> None:
    """Process batch results and update collections.

    Args:
        batch_results: Results from batch processing
        updated_tracks: List to append updated tracks to
        change_logs: List to append change logs to

    """
    for result in batch_results:
        if result and result[0]:  # (updated_track, change_log)
            updated_tracks.append(result[0])
            if result[1]:
                change_logs.append(result[1])

update_genres_by_artist_async async

update_genres_by_artist_async(
    tracks, last_run_time=None, force=False, fresh=False
)

Update genres for all tracks, grouped by artist.

Uses two-level concurrency control: 1. Artist-level: concurrent_limit (default 5) - how many artists process in parallel 2. AppleScript-level: apple_script_concurrency (default 2) - global limit on concurrent AppleScript operations across ALL artists to prevent overwhelming Music.app

Parameters:

Name Type Description Default
tracks list[TrackDict]

All tracks to process

required
last_run_time datetime | None

Time of last run for incremental updates

None
force bool

Force update all tracks (bypass incremental filtering)

False
fresh bool

Fresh mode - implies force=True. For genres, equivalent to force since there's no cache to invalidate (included for API consistency)

False

Returns:

Type Description
tuple[list[TrackDict], list[ChangeLogEntry]]

Tuple of (all_updated_tracks, all_change_logs)

Source code in src/core/tracks/genre_manager.py
async def update_genres_by_artist_async(
    self,
    tracks: list[TrackDict],
    last_run_time: datetime | None = None,
    force: bool = False,
    fresh: bool = False,
) -> tuple[list[TrackDict], list[ChangeLogEntry]]:
    """Update genres for all tracks, grouped by artist.

    Uses two-level concurrency control:
    1. Artist-level: concurrent_limit (default 5) - how many artists process in parallel
    2. AppleScript-level: apple_script_concurrency (default 2) - global limit on
       concurrent AppleScript operations across ALL artists to prevent overwhelming
       Music.app

    Args:
        tracks: All tracks to process
        last_run_time: Time of last run for incremental updates
        force: Force update all tracks (bypass incremental filtering)
        fresh: Fresh mode - implies force=True. For genres, equivalent to force
            since there's no cache to invalidate (included for API consistency)

    Returns:
        Tuple of (all_updated_tracks, all_change_logs)

    """
    # fresh implies force (no cache to invalidate for genres)
    if fresh:
        force = True
        self.console_logger.info("Fresh mode: forcing genre recalculation for all tracks")

    # Group all tracks by artist (we will compute per-artist dominant on full set,
    # and then choose per-track updates incrementally)
    grouped_tracks = group_tracks_by_artist(tracks)

    if not grouped_tracks:
        self.console_logger.info("No tracks to process for genre updates")
        return [], []

    self.console_logger.info(
        "Processing genres for %d artists with %d total tracks",
        len(grouped_tracks),
        len(tracks),
    )

    # Process each artist
    all_updated_tracks: list[TrackDict] = []
    all_change_logs: list[ChangeLogEntry] = []

    # Two-level concurrency control:
    # 1. Artist-level semaphore: limits concurrent artist processing
    concurrent_limit = self.config.genre_update.concurrent_limit
    artist_semaphore = asyncio.Semaphore(concurrent_limit)

    # 2. AppleScript-level semaphore: GLOBAL limit on concurrent AppleScript calls
    #    Shared across all artists to prevent overwhelming Music.app
    applescript_concurrency = self.config.apple_script_concurrency
    applescript_semaphore = asyncio.Semaphore(applescript_concurrency)

    # Create tasks for all artists via a thin wrapper to reduce complexity here
    artist_tasks: list[Any] = []
    for artist_name, artist_tracks in grouped_tracks.items():
        task = asyncio.create_task(
            self._process_single_artist_wrapper(
                artist_name=artist_name,
                artist_tracks=artist_tracks,
                last_run=last_run_time,
                force=force,
                artist_semaphore=artist_semaphore,
                applescript_semaphore=applescript_semaphore,
            )
        )
        artist_tasks.append(task)

    # Process all artists
    results = await self._gather_with_error_handling(artist_tasks, "Artist processing")

    # Aggregate results
    for updated_tracks, change_logs in results:
        all_updated_tracks.extend(updated_tracks)
        all_change_logs.extend(change_logs)

    # Summary
    self.console_logger.info(
        "Genre update complete: %d tracks updated across %d artists",
        len(all_updated_tracks),
        len(grouped_tracks),
    )

    return all_updated_tracks, all_change_logs

deduplicate_tracks_by_id staticmethod

deduplicate_tracks_by_id(tracks)

Remove duplicate tracks based on track ID.

Parameters:

Name Type Description Default
tracks list[TrackDict]

List of tracks that may contain duplicates

required

Returns:

Type Description
list[TrackDict]

List of unique tracks without duplicates

Source code in src/core/tracks/genre_manager.py
@staticmethod
def deduplicate_tracks_by_id(tracks: list[TrackDict]) -> list[TrackDict]:
    """Remove duplicate tracks based on track ID.

    Args:
        tracks: List of tracks that may contain duplicates

    Returns:
        List of unique tracks without duplicates
    """
    seen_ids: set[str] = set()
    unique: list[TrackDict] = []
    for track in tracks:
        track_id = track.id or ""
        if not track_id or track_id in seen_ids:
            continue
        seen_ids.add(track_id)
        unique.append(track)
    return unique

get_dry_run_actions

get_dry_run_actions()

Get the list of dry-run actions recorded.

Returns:

Type Description
list[dict[str, Any]]

List of dry-run action dictionaries

Source code in src/core/tracks/genre_manager.py
def get_dry_run_actions(self) -> list[dict[str, Any]]:
    """Get the list of dry-run actions recorded.

    Returns:
        List of dry-run action dictionaries

    """
    return self._dry_run_actions

test_update_track_genre async

test_update_track_genre(track, new_genre, force_update)

Test-only access to _update_track_genre method.

Source code in src/core/tracks/genre_manager.py
async def test_update_track_genre(
    self,
    track: TrackDict,
    new_genre: str,
    force_update: bool,
) -> tuple[TrackDict | None, ChangeLogEntry | None]:
    """Test-only access to _update_track_genre method."""
    return await self._update_track_genre(track, new_genre, force_update)

test_gather_with_error_handling async

test_gather_with_error_handling(tasks, operation_name)

Test-only access to _gather_with_error_handling method.

Source code in src/core/tracks/genre_manager.py
async def test_gather_with_error_handling(
    self,
    tasks: list[Any],
    operation_name: str,
) -> list[Any]:
    """Test-only access to _gather_with_error_handling method."""
    return await self._gather_with_error_handling(tasks, operation_name)

test_filter_tracks_for_update

test_filter_tracks_for_update(
    artist_tracks, last_run, force_flag, dominant_genre
)

Test-only access to _filter_tracks_for_update method.

Source code in src/core/tracks/genre_manager.py
def test_filter_tracks_for_update(
    self,
    artist_tracks: list[TrackDict],
    last_run: datetime | None,
    force_flag: bool,
    dominant_genre: str | None,
) -> list[TrackDict]:
    """Test-only access to _filter_tracks_for_update method."""
    return self._filter_tracks_for_update(artist_tracks, last_run, force_flag, dominant_genre)

test_select_tracks_to_update_for_artist

test_select_tracks_to_update_for_artist(
    artist_tracks, last_run, force_flag, dominant_genre
)

Test-only access to _select_tracks_to_update_for_artist method.

Source code in src/core/tracks/genre_manager.py
def test_select_tracks_to_update_for_artist(
    self,
    artist_tracks: list[TrackDict],
    last_run: datetime | None,
    force_flag: bool,
    dominant_genre: str | None,
) -> list[TrackDict]:
    """Test-only access to _select_tracks_to_update_for_artist method."""
    return self._select_tracks_to_update_for_artist(artist_tracks, last_run, force_flag, dominant_genre)

test_process_artist_genres async

test_process_artist_genres(
    artist_name,
    all_artist_tracks,
    force_update,
    applescript_semaphore,
    tracks_to_update=None,
)

Test-only access to _process_artist_genres method.

Parameters:

Name Type Description Default
artist_name str

Name of the artist.

required
all_artist_tracks list[TrackDict]

All tracks by this artist.

required
force_update bool

Whether to force update all tracks.

required
applescript_semaphore Semaphore

Semaphore for AppleScript concurrency control.

required
tracks_to_update list[TrackDict] | None

Specific tracks to update.

None

Returns:

Type Description
tuple[list[TrackDict], list[ChangeLogEntry]]

Tuple of (updated_tracks, change_logs).

Source code in src/core/tracks/genre_manager.py
async def test_process_artist_genres(
    self,
    artist_name: str,
    all_artist_tracks: list[TrackDict],
    force_update: bool,
    applescript_semaphore: asyncio.Semaphore,
    tracks_to_update: list[TrackDict] | None = None,
) -> tuple[list[TrackDict], list[ChangeLogEntry]]:
    """Test-only access to _process_artist_genres method.

    Args:
        artist_name: Name of the artist.
        all_artist_tracks: All tracks by this artist.
        force_update: Whether to force update all tracks.
        applescript_semaphore: Semaphore for AppleScript concurrency control.
        tracks_to_update: Specific tracks to update.

    Returns:
        Tuple of (updated_tracks, change_logs).
    """
    return await self._process_artist_genres(artist_name, all_artist_tracks, force_update, applescript_semaphore, tracks_to_update)

test_process_single_artist_wrapper async

test_process_single_artist_wrapper(
    artist_name,
    artist_tracks,
    last_run,
    force,
    artist_semaphore,
    applescript_semaphore=None,
)

Test-only access to _process_single_artist_wrapper method.

Parameters:

Name Type Description Default
artist_name str

The artist name.

required
artist_tracks list[TrackDict]

All tracks for the artist.

required
last_run datetime | None

Last run timestamp for incremental logic.

required
force bool

Force updates regardless of filters.

required
artist_semaphore Semaphore

Concurrency guard for artist-level processing.

required
applescript_semaphore Semaphore | None

Global semaphore for AppleScript concurrency control. If None, creates a default semaphore for backward compatibility.

None

Returns:

Type Description
tuple[list[TrackDict], list[ChangeLogEntry]]

Tuple of (updated_tracks, change_logs) for this artist.

Source code in src/core/tracks/genre_manager.py
async def test_process_single_artist_wrapper(
    self,
    artist_name: str,
    artist_tracks: list[TrackDict],
    last_run: datetime | None,
    force: bool,
    artist_semaphore: asyncio.Semaphore,
    applescript_semaphore: asyncio.Semaphore | None = None,
) -> tuple[list[TrackDict], list[ChangeLogEntry]]:
    """Test-only access to _process_single_artist_wrapper method.

    Args:
        artist_name: The artist name.
        artist_tracks: All tracks for the artist.
        last_run: Last run timestamp for incremental logic.
        force: Force updates regardless of filters.
        artist_semaphore: Concurrency guard for artist-level processing.
        applescript_semaphore: Global semaphore for AppleScript concurrency control.
            If None, creates a default semaphore for backward compatibility.

    Returns:
        Tuple of (updated_tracks, change_logs) for this artist.
    """
    if applescript_semaphore is None:
        applescript_semaphore = asyncio.Semaphore(self.config.apple_script_concurrency)
    return await self._process_single_artist_wrapper(artist_name, artist_tracks, last_run, force, artist_semaphore, applescript_semaphore)