Skip to content

applescript_executor

AppleScript subprocess execution module.

This module handles the low-level subprocess execution for AppleScript commands, including timeout handling and process cleanup.

AppleScriptExecutionError

AppleScriptExecutionError(message, label, errno_code=None)

Bases: OSError

Exception raised when AppleScript execution fails.

This exception is used to signal transient errors that may be retried by the DatabaseRetryHandler. It extends OSError to leverage the retry handler's transient error detection based on errno codes.

Initialize the execution error.

Parameters:

Name Type Description Default
message str

Error description

required
label str

Script label for context

required
errno_code int | None

Optional errno code for transient error detection

None
Source code in src/services/apple/applescript_executor.py
def __init__(self, message: str, label: str, errno_code: int | None = None) -> None:
    """Initialize the execution error.

    Args:
        message: Error description
        label: Script label for context
        errno_code: Optional errno code for transient error detection
    """
    super().__init__(errno_code, message)
    self.label = label

AppleScriptExecutor

AppleScriptExecutor(
    semaphore,
    apple_scripts_directory,
    console_logger,
    error_logger,
    retry_handler=None,
    rate_limiter=None,
)

Handles subprocess execution for AppleScript commands.

This class manages the execution lifecycle including: - Running osascript subprocesses - Handling timeouts and cancellation - Process cleanup - Temporary file execution for complex scripts

Initialize the executor.

Parameters:

Name Type Description Default
semaphore Semaphore | None

Semaphore for concurrency control (can be None initially)

required
apple_scripts_directory str | None

Directory for temporary script files

required
console_logger Logger

Logger for debug/info messages

required
error_logger Logger

Logger for error messages

required
retry_handler DatabaseRetryHandler | None

Optional retry handler for transient error recovery

None
rate_limiter AppleScriptRateLimiter | None

Optional rate limiter for enhanced throughput control

None
Source code in src/services/apple/applescript_executor.py
def __init__(
    self,
    semaphore: asyncio.Semaphore | None,
    apple_scripts_directory: str | None,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    retry_handler: DatabaseRetryHandler | None = None,
    rate_limiter: AppleScriptRateLimiter | None = None,
) -> None:
    """Initialize the executor.

    Args:
        semaphore: Semaphore for concurrency control (can be None initially)
        apple_scripts_directory: Directory for temporary script files
        console_logger: Logger for debug/info messages
        error_logger: Logger for error messages
        retry_handler: Optional retry handler for transient error recovery
        rate_limiter: Optional rate limiter for enhanced throughput control
    """
    self.semaphore = semaphore
    self.apple_scripts_directory = apple_scripts_directory
    self.console_logger = console_logger
    self.error_logger = error_logger
    self.retry_handler = retry_handler
    self.rate_limiter = rate_limiter

update_semaphore

update_semaphore(semaphore)

Update the semaphore after async initialization.

Parameters:

Name Type Description Default
semaphore Semaphore

The initialized semaphore

required
Source code in src/services/apple/applescript_executor.py
def update_semaphore(self, semaphore: asyncio.Semaphore) -> None:
    """Update the semaphore after async initialization.

    Args:
        semaphore: The initialized semaphore
    """
    self.semaphore = semaphore

update_rate_limiter

update_rate_limiter(rate_limiter)

Update the rate limiter after async initialization.

When a rate limiter is set, it takes precedence over the semaphore for concurrency control, providing both rate limiting and concurrency.

Parameters:

Name Type Description Default
rate_limiter AppleScriptRateLimiter

The initialized rate limiter

required
Source code in src/services/apple/applescript_executor.py
def update_rate_limiter(self, rate_limiter: AppleScriptRateLimiter) -> None:
    """Update the rate limiter after async initialization.

    When a rate limiter is set, it takes precedence over the semaphore
    for concurrency control, providing both rate limiting and concurrency.

    Args:
        rate_limiter: The initialized rate limiter
    """
    self.rate_limiter = rate_limiter

log_script_success

log_script_success(label, script_result, elapsed)

Log successful script execution with appropriate formatting.

Parameters:

Name Type Description Default
label str

Script label for logging

required
script_result str

Script output

required
elapsed float

Execution time in seconds

required
Source code in src/services/apple/applescript_executor.py
def log_script_success(self, label: str, script_result: str, elapsed: float) -> None:
    """Log successful script execution with appropriate formatting.

    Args:
        label: Script label for logging
        script_result: Script output
        elapsed: Execution time in seconds
    """
    # Skip verbose logging for update_property - higher-level logs are more informative
    if label.startswith(UPDATE_PROPERTY):
        self.console_logger.debug("◁ %s completed in %.1fs", label, elapsed)
        return

    if label.startswith(TRACK_DATA_SCRIPTS):
        # Count tracks by counting line separators (ASCII 29)
        track_count = script_result.count(LINE_SEPARATOR)
        size_kb = len(script_result.encode()) / 1024

        self.console_logger.info(
            "◁ %s: %d tracks (%.1fKB, %.1fs)",
            label,
            track_count,
            size_kb,
            elapsed,
        )
    elif label.startswith(FETCH_TRACK_IDS):
        # Just show count of IDs fetched - no preview needed
        id_count = script_result.count(",") + 1 if script_result.strip() else 0
        size_kb = len(script_result.encode()) / 1024
        self.console_logger.info(
            "◁ %s: %d IDs (%.1fKB, %.1fs)",
            label,
            id_count,
            size_kb,
            elapsed,
        )
    elif LINE_SEPARATOR in script_result or FIELD_SEPARATOR in script_result:
        # Other scripts with field/record separators - show count only
        record_count = script_result.count(LINE_SEPARATOR) or script_result.count(FIELD_SEPARATOR)
        size_kb = len(script_result.encode()) / 1024
        self.console_logger.info(
            "◁ %s: %d records (%.1fKB, %.1fs)",
            label,
            record_count,
            size_kb,
            elapsed,
        )
    else:
        # Create a preview for logging - this can be stripped
        preview_text = script_result.strip()
        preview = f"{preview_text[:RESULT_PREVIEW_LENGTH]}..." if len(preview_text) > RESULT_PREVIEW_LENGTH else preview_text
        # Log at appropriate level based on result content
        log_level = self.console_logger.debug if "No Change" in preview else self.console_logger.info
        log_level(
            "◁ %s (%dB, %.1fs) %s",
            label,
            len(script_result.encode()),
            elapsed,
            preview,
        )

cleanup_process async

cleanup_process(proc, label)

Clean up process resources.

Parameters:

Name Type Description Default
proc Process

Process to clean up

required
label str

Label for logging

required
Source code in src/services/apple/applescript_executor.py
async def cleanup_process(self, proc: asyncio.subprocess.Process, label: str) -> None:
    """Clean up process resources.

    Args:
        proc: Process to clean up
        label: Label for logging
    """
    try:
        # Wait briefly for process to exit naturally
        async with asyncio.timeout(PROCESS_EXIT_WAIT_SECONDS):
            await proc.wait()
        self.console_logger.debug("Process for %s exited naturally and cleaned up", label)
    except TimeoutError:
        # If still running, kill and wait for cleanup
        try:
            proc.kill()
            async with asyncio.timeout(PROCESS_KILL_WAIT_SECONDS):
                await proc.wait()
            self.console_logger.debug("Process for %s killed and cleaned up", label)
        except (TimeoutError, ProcessLookupError) as e:
            self.console_logger.warning(
                "Could not kill or wait for process %s during cleanup: %s",
                label,
                str(e),
            )

handle_subprocess_execution async

handle_subprocess_execution(cmd, label, timeout_seconds)

Handle subprocess execution with timeout, error handling, and optional retry.

If a retry_handler is configured, transient errors will be automatically retried with exponential backoff.

Parameters:

Name Type Description Default
cmd list[str]

Command to execute as a list of strings

required
label str

Label for logging

required
timeout_seconds float

Timeout in seconds

required

Returns:

Type Description
str | None

Command output if successful, None otherwise

Source code in src/services/apple/applescript_executor.py
async def handle_subprocess_execution(
    self,
    cmd: list[str],
    label: str,
    timeout_seconds: float,
) -> str | None:
    """Handle subprocess execution with timeout, error handling, and optional retry.

    If a retry_handler is configured, transient errors will be automatically
    retried with exponential backoff.

    Args:
        cmd: Command to execute as a list of strings
        label: Label for logging
        timeout_seconds: Timeout in seconds

    Returns:
        Command output if successful, None otherwise
    """
    try:
        if not self.retry_handler:
            return await self._execute_subprocess(cmd, label, timeout_seconds)
        return await self.retry_handler.execute_with_retry(
            lambda: self._execute_subprocess(cmd, label, timeout_seconds),
            f"applescript:{label}",
        )
    except OSError:
        # All retries exhausted, return None for backward compatibility
        return None

run_osascript async

run_osascript(cmd, label, timeout_seconds)

Run an osascript command and return output.

Uses rate limiter if configured (provides both rate limiting and concurrency), otherwise falls back to semaphore-only concurrency control.

Parameters:

Name Type Description Default
cmd list[str]

Command to execute as a list of strings

required
label str

Label for logging

required
timeout_seconds float

Timeout in seconds

required

Returns:

Type Description
str | None

Command output if successful, None otherwise

Source code in src/services/apple/applescript_executor.py
async def run_osascript(
    self,
    cmd: list[str],
    label: str,
    timeout_seconds: float,
) -> str | None:
    """Run an osascript command and return output.

    Uses rate limiter if configured (provides both rate limiting and concurrency),
    otherwise falls back to semaphore-only concurrency control.

    Args:
        cmd: Command to execute as a list of strings
        label: Label for logging
        timeout_seconds: Timeout in seconds

    Returns:
        Command output if successful, None otherwise
    """
    # Use rate limiter if available (provides both rate limiting + concurrency)
    if self.rate_limiter is not None:
        try:
            await self.rate_limiter.acquire()
            return await self.handle_subprocess_execution(cmd, label, timeout_seconds)
        finally:
            self.rate_limiter.release()

    # Fall back to semaphore-only concurrency control
    if self.semaphore is None:
        self.error_logger.error("AppleScriptExecutor semaphore not initialized.")
        return None

    async with self.semaphore:
        return await self.handle_subprocess_execution(cmd, label, timeout_seconds)