Skip to content

applescript_client

AppleScript Client Module.

This module provides an abstraction for executing AppleScript commands asynchronously. It centralizes the logic for interacting with AppleScript via the osascript command, handles errors, applies concurrency limits via semaphore-based control, and ensures non-blocking execution.

AppleScriptClient

AppleScriptClient(
    config,
    analytics,
    console_logger=None,
    error_logger=None,
    retry_handler=None,
)

Bases: AppleScriptClientProtocol

A client to run AppleScript commands asynchronously using the osascript command.

Semaphore initialization is done in the async initialize method.

Initialize the AppleScript client.

Source code in src/services/apple/applescript_client.py
def __init__(
    self,
    config: AppConfig,
    analytics: Analytics,
    console_logger: logging.Logger | None = None,
    error_logger: logging.Logger | None = None,
    retry_handler: DatabaseRetryHandler | None = None,
) -> None:
    """Initialize the AppleScript client."""
    self.config = config
    self.analytics = analytics
    self.console_logger = console_logger if console_logger is not None else logging.getLogger(__name__)
    self.error_logger = error_logger if error_logger is not None else self.console_logger

    self.apple_scripts_dir = config.apple_scripts_dir
    if not self.apple_scripts_dir:
        self.error_logger.critical("Configuration error: 'apple_scripts_dir' is empty or not set.")

    # Semaphore and rate limiter are initialized in the async initialize method
    self.semaphore: asyncio.Semaphore | None = None
    self.rate_limiter: AppleScriptRateLimiter | None = None

    # Initialize the security sanitizer
    self.sanitizer = AppleScriptSanitizer(self.console_logger)

    # Initialize the file validator
    self.file_validator = AppleScriptFileValidator(
        self.apple_scripts_dir,
        self.error_logger,
        self.console_logger,
    )

    # Initialize the executor (semaphore will be set in initialize())
    self.executor = AppleScriptExecutor(
        semaphore=None,
        apple_scripts_directory=self.apple_scripts_dir,
        console_logger=self.console_logger,
        error_logger=self.error_logger,
        retry_handler=retry_handler,
    )

initialize async

initialize()

Asynchronously initializes the AppleScriptClient by creating the semaphore.

Must be called within an active event loop.

Source code in src/services/apple/applescript_client.py
async def initialize(self) -> None:
    """Asynchronously initializes the AppleScriptClient by creating the semaphore.

    Must be called within an active event loop.
    """
    if self.apple_scripts_dir is None:
        error_msg = "AppleScript directory is not set"
        self.error_logger.critical(error_msg)
        raise ValueError(error_msg)

    self.console_logger.debug("AppleScript directory: %s", self.apple_scripts_dir)

    # Check if the directory exists and is accessible
    if not Path(self.apple_scripts_dir).is_dir():
        error_msg = f"AppleScript directory not accessible: {self.apple_scripts_dir}"
        self.error_logger.critical(error_msg)
        raise FileNotFoundError(error_msg)

    # Count and validate scripts
    try:
        scripts_path = Path(self.apple_scripts_dir)
        scripts: list[str] = [f.name for f in scripts_path.iterdir() if f.name.endswith((".applescript", ".scpt"))]

        # Check for required scripts
        required_scripts: list[str] = [
            UPDATE_PROPERTY,
            FETCH_TRACKS,
        ]
        if missing_scripts := [script for script in required_scripts if not (Path(self.apple_scripts_dir) / script).exists()]:
            self.error_logger.warning("Missing required AppleScripts: %s", "', '".join(missing_scripts))

    except OSError as e:
        self.console_logger.warning("Could not list AppleScript directory: %s", e)
        scripts = []

    if self.semaphore is None:
        try:
            concurrent_limit = self.config.apple_script_concurrency
            if concurrent_limit <= 0:
                error_msg = f"Invalid concurrency limit: {concurrent_limit}. Must be positive."
                self.error_logger.critical(error_msg)
                raise ValueError(error_msg)

            # Check if rate limiting is enabled (provides better throughput stability)
            rate_limit_cfg = self.config.apple_script_rate_limit
            if rate_limit_cfg.enabled:
                # Use enhanced rate limiter (rate limiting + concurrency control)
                requests_per_window = rate_limit_cfg.requests_per_window
                window_size = rate_limit_cfg.window_size_seconds

                self.rate_limiter = AppleScriptRateLimiter(
                    requests_per_window=requests_per_window,
                    window_seconds=window_size,
                    max_concurrent=concurrent_limit,
                    logger=self.console_logger,
                )
                await self.rate_limiter.initialize()
                self.executor.update_rate_limiter(self.rate_limiter)
                self.console_logger.info(
                    "%s initialized (%d scripts, concurrency: %d, rate: %d/%ss)",
                    LogFormat.entity("AppleScriptClient"),
                    len(scripts),
                    concurrent_limit,
                    requests_per_window,
                    window_size,
                )
            else:
                # Use semaphore-only concurrency control (legacy behavior)
                self.console_logger.debug("Creating semaphore with concurrency limit: %d", concurrent_limit)
                self.semaphore = asyncio.Semaphore(concurrent_limit)
                self.executor.update_semaphore(self.semaphore)
                self.console_logger.info(
                    "%s initialized (%d scripts, concurrency: %d)",
                    LogFormat.entity("AppleScriptClient"),
                    len(scripts),
                    concurrent_limit,
                )
        except (ValueError, TypeError, RuntimeError, asyncio.InvalidStateError) as e:
            self.error_logger.exception("Error initializing AppleScriptClient: %s", e)
            raise
    else:
        self.console_logger.debug("Semaphore already initialized")

run_script async

run_script(
    script_name,
    arguments=None,
    timeout=None,
    context_artist=None,
    context_album=None,
    context_track=None,
    label=None,
)

Execute an AppleScript file and return its output.

Requires initialize() to be called first to set up the AppleScript directory path.

Parameters:

Name Type Description Default
script_name str

Name of the AppleScript file to execute.

required
arguments list[str] | None

Optional list of arguments to pass to the script.

None
timeout float | None

Optional timeout in seconds. Uses default from config if not specified.

None
context_artist str | None

Artist name for contextual logging.

None
context_album str | None

Album name for contextual logging.

None
context_track str | None

Track name for contextual logging.

None
label str | None

Custom label for logging (defaults to script_name).

None

Returns:

Type Description
str | None

The stdout output from the AppleScript execution, or None if an error occurred.

Raises:

Type Description
TimeoutError

If script execution times out.

OSError

If script execution fails.

Source code in src/services/apple/applescript_client.py
@track_instance_method("applescript_run_script")
async def run_script(
    self,
    script_name: str,
    arguments: list[str] | None = None,
    timeout: float | None = None,
    context_artist: str | None = None,
    context_album: str | None = None,
    context_track: str | None = None,
    label: str | None = None,
) -> str | None:
    """Execute an AppleScript file and return its output.

    Requires initialize() to be called first to set up the AppleScript
    directory path.

    Args:
        script_name: Name of the AppleScript file to execute.
        arguments: Optional list of arguments to pass to the script.
        timeout: Optional timeout in seconds. Uses default from config if not specified.
        context_artist: Artist name for contextual logging.
        context_album: Album name for contextual logging.
        context_track: Track name for contextual logging.
        label: Custom label for logging (defaults to script_name).

    Returns:
        The stdout output from the AppleScript execution, or None if an error occurred.

    Raises:
        TimeoutError: If script execution times out.
        OSError: If script execution fails.

    """
    self.console_logger.debug("run_script called: script='%s'", script_name)

    if self.apple_scripts_dir is None:
        error_msg = "AppleScript directory is not set. Cannot run script."
        self.error_logger.error(error_msg)
        return None

    script_path = str(Path(self.apple_scripts_dir) / script_name)
    self.console_logger.debug("Script path: %s", script_path)

    # Validate the script path is within the allowed directory
    if not self.file_validator.validate_script_path(script_path):
        self.error_logger.error("Invalid script path (security check failed): %s", script_path)
        return None

    # Validate file access
    if not self.file_validator.validate_script_file_access(script_path):
        return None

    # Build command with validated arguments
    cmd = self._build_command_with_args(script_path, arguments)
    if cmd is None:
        return None

    # Convert timeout to float, using configured default if not specified
    if timeout is None:
        timeout = self.config.applescript_timeouts.default or self.config.applescript_timeout_seconds
    timeout_float = float(timeout) if timeout is not None else DEFAULT_SCRIPT_TIMEOUT_SECONDS

    # Build contextual information
    context_parts: list[str] = []
    if context_artist:
        context_parts.append(f"Artist: {context_artist}")
    if context_album:
        context_parts.append(f"Album: {context_album}")
    if context_track:
        context_parts.append(f"Track: {context_track}")

    if context_parts:
        context_str = f" ({' | '.join(context_parts)})"
        self.console_logger.debug(
            "Executing AppleScript: %s%s [timeout: %ss]",
            script_name,
            context_str,
            timeout_float,
        )
    else:
        self.console_logger.debug(
            "Executing AppleScript: %s [timeout: %ss]",
            script_name,
            timeout_float,
        )

    try:
        result = await self.executor.run_osascript(cmd, label or script_name, timeout_float)
        self._log_script_result(result)
        return result
    except TimeoutError:
        error_msg = f"AppleScript execution timed out after {timeout_float} seconds"
        self.error_logger.exception(error_msg)
        raise
    except (OSError, subprocess.SubprocessError, asyncio.CancelledError) as e:
        error_msg = f"Error in AppleScript execution: {e}"
        self.error_logger.exception(error_msg)
        raise

fetch_tracks_by_ids async

fetch_tracks_by_ids(
    track_ids, batch_size=1000, timeout=None
)

Fetch tracks by their IDs using fetch_tracks_by_ids.applescript.

Parameters:

Name Type Description Default
track_ids list[str]

List of track IDs to fetch

required
batch_size int

Maximum number of IDs per batch (default: 1000)

1000
timeout float | None

Timeout in seconds for script execution

None

Returns:

Type Description
list[dict[str, str]]

List of track dictionaries with metadata

Source code in src/services/apple/applescript_client.py
@track_instance_method("applescript_fetch_by_ids")
async def fetch_tracks_by_ids(
    self,
    track_ids: list[str],
    batch_size: int = 1000,
    timeout: float | None = None,
) -> list[dict[str, str]]:
    """Fetch tracks by their IDs using fetch_tracks_by_ids.applescript.

    Args:
        track_ids: List of track IDs to fetch
        batch_size: Maximum number of IDs per batch (default: 1000)
        timeout: Timeout in seconds for script execution

    Returns:
        List of track dictionaries with metadata

    """
    if not track_ids:
        return []

    if batch_size <= 0:
        msg = f"Invalid batch_size: {batch_size}, must be positive"
        raise ValueError(msg)

    if timeout is None:
        timeout = self.config.applescript_timeouts.default or self.config.applescript_timeout_seconds

    timeout_float = float(timeout) if timeout is not None else DEFAULT_SCRIPT_TIMEOUT_SECONDS

    all_tracks: list[dict[str, str]] = []
    total_batches = (len(track_ids) + batch_size - 1) // batch_size

    # Use analytics batch_mode to suppress per-call console logging
    async with self.analytics.batch_mode("Fetching tracks by ID...") as status:
        for i in range(0, len(track_ids), batch_size):
            batch = track_ids[i : i + batch_size]
            ids_csv = ",".join(batch)
            batch_num = i // batch_size + 1

            status.update(f"[cyan]Fetching tracks by ID... ({batch_num}/{total_batches})[/cyan]")

            batch_label = f"{FETCH_TRACKS_BY_IDS} [{batch_num}/{total_batches}]"

            raw_output = await self.run_script(
                FETCH_TRACKS_BY_IDS,
                [ids_csv],
                timeout=timeout_float,
                label=batch_label,
            )

            if not raw_output or raw_output == NO_TRACKS_FOUND:
                continue

            # Parse output using same format as fetch_tracks.applescript
            batch_tracks = self._parse_track_output(raw_output)
            all_tracks.extend(batch_tracks)

    self.console_logger.info("Fetched %d tracks (requested: %d)", len(all_tracks), len(track_ids))
    return all_tracks

fetch_all_track_ids async

fetch_all_track_ids(timeout=None)

Fetch just track IDs from Music.app (lightweight operation).

This is used by Smart Delta to detect new/removed tracks without fetching full metadata. Much faster than fetching all track data.

Parameters:

Name Type Description Default
timeout float | None

Timeout in seconds for script execution

None

Returns:

Type Description
list[str]

List of track ID strings

Source code in src/services/apple/applescript_client.py
@track_instance_method("applescript_fetch_all_ids")
async def fetch_all_track_ids(self, timeout: float | None = None) -> list[str]:
    """Fetch just track IDs from Music.app (lightweight operation).

    This is used by Smart Delta to detect new/removed tracks without
    fetching full metadata. Much faster than fetching all track data.

    Args:
        timeout: Timeout in seconds for script execution

    Returns:
        List of track ID strings

    """
    if timeout is None:
        timeout = self.config.applescript_timeouts.default or self.config.applescript_timeout_seconds

    timeout_float = float(timeout) if timeout is not None else DEFAULT_FETCH_IDS_TIMEOUT_SECONDS

    async with spinner("Fetching all track IDs from Music.app..."):
        raw_output = await self.run_script(
            FETCH_TRACK_IDS,
            timeout=timeout_float,
        )

    if not raw_output:
        self.console_logger.warning("No track IDs returned from Music.app")
        return []

    # Check for error from AppleScript
    if raw_output.startswith("ERROR:"):
        self.error_logger.error("AppleScript error: %s", raw_output[6:])
        return []

    # Parse comma-separated IDs
    track_ids = [id_str.strip() for id_str in raw_output.split(",") if id_str.strip()]

    self.console_logger.info("Fetched %d track IDs", len(track_ids))
    return track_ids