Skip to content

analytics

Performance tracking and analysis via decorators.

Measures execution time, success rates, and duration patterns for sync/async functions. Supports memory-safe event storage, HTML report generation, and multi-instance merging.

TimingInfo

TimingInfo(start, end, duration, overhead)

Container for timing-related data.

Source code in src/metrics/analytics.py
def __init__(self, start: float, end: float, duration: float, overhead: float) -> None:
    """Store the raw timing figures for one tracked call."""
    self.start, self.end = start, end
    self.duration, self.overhead = duration, overhead

CallInfo

CallInfo(func_name, event_type, success)

Container for function call metadata.

Source code in src/metrics/analytics.py
def __init__(self, func_name: str, event_type: str, success: bool) -> None:
    """Store metadata describing a single tracked function call."""
    self.func_name, self.event_type, self.success = func_name, event_type, success

LoggerContainer

LoggerContainer(
    console_logger, error_logger, analytics_logger
)

Container for all loggers used by Analytics.

Source code in src/metrics/analytics.py
def __init__(
    self,
    console_logger: logging.Logger,
    error_logger: logging.Logger,
    analytics_logger: logging.Logger,
) -> None:
    """Bundle the console, error, and analytics loggers under short attribute names."""
    self.console, self.error, self.analytics = (
        console_logger,
        error_logger,
        analytics_logger,
    )

Analytics

Analytics(config, loggers, max_events=None)

Tracks function performance, success rates, and execution patterns.

Initialize the Analytics instance.

Source code in src/metrics/analytics.py
def __init__(
    self,
    config: AppConfig,
    loggers: LoggerContainer,
    max_events: int | None = None,
) -> None:
    """Initialize the Analytics instance from config, loggers, and an optional event cap."""
    # Take the next id from the class-level instance counter.
    Analytics._instances += 1
    self.instance_id = Analytics._instances

    # Master switch for all tracking (default: True).
    self.enabled = config.analytics.enabled

    # Configuration and the three loggers this instance writes to.
    self.config = config
    self.console_logger = loggers.console
    self.error_logger = loggers.error
    self.analytics_logger = loggers.analytics

    # Mutable data stores populated as calls are tracked.
    self.events: list[dict[str, Any]] = []
    self.call_counts: dict[str, int] = {}
    self.success_counts: dict[str, int] = {}
    self.decorator_overhead: dict[str, float] = {}

    # Event cap: explicit argument wins, otherwise fall back to config.
    self.max_events = max_events if max_events is not None else config.analytics.max_events

    # Duration bucket boundaries (fast / medium / slow classification).
    limits = config.analytics.duration_thresholds
    self.short_max = limits.short_max
    self.medium_max = limits.medium_max
    self.long_max = limits.long_max

    # Timestamp formatting options.
    self.time_format = config.analytics.time_format
    self.compact_time = config.analytics.compact_time

    # Whether to run gc.collect() after large report generations.
    self.enable_gc_collect = config.analytics.enable_gc_collect

    # Batch-mode state: suppresses per-call console logging, shows spinner instead.
    self._suppress_console_logging = False
    self._batch_call_count = 0
    self._batch_total_duration = 0.0

    # Shared console for coordinated Rich output.
    self._console = get_shared_console()

    self.console_logger.debug("Analytics #%s initialized", self.instance_id)

track

track(event_type)

Preferred decorator API - tracks sync/async functions.

Source code in src/metrics/analytics.py
def track(self, event_type: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Preferred decorator API - tracks sync/async functions."""
    # Thin public alias over the internal decorator factory.
    decorator = self._decorator(event_type)
    return decorator

execute_async_wrapped_call async

execute_async_wrapped_call(
    func, event_type, *args, **kwargs
)

Execute an async wrapped function call with analytics tracking.

Parameters:

Name Type Description Default
func Callable[..., Any]

Async function to execute

required
event_type str

Type of event for tracking

required
*args Any

Function arguments

()
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
Any

Function result

Source code in src/metrics/analytics.py
async def execute_async_wrapped_call(
    self,
    func: Callable[..., Any],
    event_type: str,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run an async function through the analytics wrapper and return its result.

    Args:
        func: Async function to execute
        event_type: Type of event for tracking
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    """
    # Delegate to the shared wrapper with is_async=True.
    result = await self._wrapped_call(func, event_type, True, *args, **kwargs)
    return result

execute_sync_wrapped_call

execute_sync_wrapped_call(
    func, event_type, *args, **kwargs
)

Execute a sync wrapped function call with analytics tracking.

Parameters:

Name Type Description Default
func Callable[..., Any]

Sync function to execute

required
event_type str

Type of event for tracking

required
*args Any

Function arguments

()
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
Any

Function result

Note

Cannot be called from within an active event loop. If you need to call a sync function from async context, call it directly or use an async wrapper.

Source code in src/metrics/analytics.py
def execute_sync_wrapped_call(
    self,
    func: Callable[..., Any],
    event_type: str,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Execute a sync wrapped function call with analytics tracking.

    Args:
        func: Sync function to execute
        event_type: Type of event for tracking
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Note:
        Cannot be called from within an active event loop. If you need to call
        a sync function from async context, call it directly or use an async wrapper.

    """
    # Probe for an already-running event loop explicitly instead of
    # string-matching the RuntimeError raised by asyncio.run(): the message
    # text is not a stable API and may change between Python versions.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: safe to drive the tracked call with asyncio.run().
        # Any RuntimeError raised by the call itself propagates unchanged.
        return asyncio.run(self._wrapped_call(func, event_type, False, *args, **kwargs))

    # A loop is running: asyncio.run() would fail, so execute the function
    # directly without tracking to avoid blocking the loop.
    func_name = _get_func_name(func)
    self.console_logger.warning("Cannot track %s with asyncio.run() from within event loop; executing without tracking", func_name)
    return func(*args, **kwargs)

execute_wrapped_call async

execute_wrapped_call(
    func, event_type, is_async, *args, **kwargs
)

Execute a wrapped function call with analytics tracking.

Parameters:

Name Type Description Default
func Callable[..., Any]

Function to execute

required
event_type str

Type of event for tracking

required
is_async bool

Whether the function is async

required
*args Any

Function arguments

()
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
Any

Function result

Source code in src/metrics/analytics.py
async def execute_wrapped_call(
    self,
    func: Callable[..., Any],
    event_type: str,
    is_async: bool,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Dispatch a tracked call to the sync or async execution path.

    Args:
        func: Function to execute
        event_type: Type of event for tracking
        is_async: Whether the function is async
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    """
    # Route based on the caller-declared coroutine-ness of the target.
    if not is_async:
        return self.execute_sync_wrapped_call(func, event_type, *args, **kwargs)
    return await self.execute_async_wrapped_call(func, event_type, *args, **kwargs)

batch_mode async

batch_mode(message='Processing...', console=None)

Context manager that suppresses console logging and shows a spinner instead.

Use this when executing many tracked operations in a loop to avoid flooding the console with individual timing logs.

Parameters:

Name Type Description Default
message str

Message to display next to the spinner

'Processing...'
console Console | None

Optional Rich Console instance

None

Yields:

Type Description
AsyncIterator[Status]

Rich Status object that can be updated with progress info

Example

async with analytics.batch_mode("Fetching tracks...") as status:
    for i, batch in enumerate(batches):
        status.update(f"[cyan]Fetching batch {i+1}/{len(batches)}...[/cyan]")
        await fetch_batch(batch)

Source code in src/metrics/analytics.py
@asynccontextmanager
async def batch_mode(
    self,
    message: str = "Processing...",
    console: Console | None = None,
) -> AsyncIterator[Status]:
    """Context manager that suppresses console logging and shows a spinner instead.

    Use this when executing many tracked operations in a loop to avoid
    flooding the console with individual timing logs.

    Args:
        message: Message to display next to the spinner
        console: Optional Rich Console instance

    Yields:
        Rich Status object that can be updated with progress info

    Example:
        async with analytics.batch_mode("Fetching tracks...") as status:
            for i, batch in enumerate(batches):
                status.update(f"[cyan]Fetching batch {i+1}/{len(batches)}...[/cyan]")
                await fetch_batch(batch)
    """
    _console = console or self._console
    self._suppress_console_logging = True
    self._batch_call_count = 0
    self._batch_total_duration = 0.0
    # perf_counter() is a monotonic clock: unlike time.time(), the elapsed
    # figure cannot go negative or jump if the system clock is adjusted
    # (e.g. by NTP) while the batch is running.
    start_time = time.perf_counter()

    try:
        with _console.status(f"[cyan]{message}[/cyan]") as status:
            yield status
    finally:
        # Always restore normal logging, even if the body raised.
        self._suppress_console_logging = False
        elapsed = time.perf_counter() - start_time

        # Log summary after batch completes
        if self._batch_call_count > 0:
            avg_duration = self._batch_total_duration / self._batch_call_count
            self.console_logger.info(
                "✅ Batch completed: %d calls in %.1fs (avg %.1fs/call)",
                self._batch_call_count,
                elapsed,
                avg_duration,
            )

        # Reset batch stats
        self._batch_call_count = 0
        self._batch_total_duration = 0.0

get_stats

get_stats(function_filter=None)

Get statistics for analytics data.

Source code in src/metrics/analytics.py
def get_stats(self, function_filter: str | list[str] | None = None) -> dict[str, Any]:
    """Get statistics for analytics data."""
    if function_filter:
        names = {function_filter} if isinstance(function_filter, str) else set(function_filter)
        events = [e for e in self.events if e["Function"] in names]
    else:
        names = set(self.call_counts.keys())
        events = self.events

    total_calls = sum(self.call_counts.get(fn, 0) for fn in names)
    total_success = sum(self.success_counts.get(fn, 0) for fn in names)
    total_time = sum(e[self._DURATION_FIELD] for e in events)
    success_rate = (total_success / total_calls * 100) if total_calls else 0
    avg_duration = (total_time / len(events)) if events else 0

    slowest = max(events, key=lambda e: e[self._DURATION_FIELD], default=None)
    fastest = min(events, key=lambda e: e[self._DURATION_FIELD], default=None)

    duration_counts = {
        "fast": len([e for e in events if e[self._DURATION_FIELD] <= self.short_max]),
        "medium": len([e for e in events if self.short_max < e[self._DURATION_FIELD] <= self.medium_max]),
        "slow": len([e for e in events if e[self._DURATION_FIELD] > self.medium_max]),
    }

    return {
        "total_calls": total_calls,
        "total_success": total_success,
        "success_rate": success_rate,
        "total_time": total_time,
        "avg_duration": avg_duration,
        "slowest": slowest,
        "fastest": fastest,
        "duration_counts": duration_counts,
        "function_count": len(names),
        "event_count": len(events),
    }

log_summary

log_summary()

Log a summary of analytics data.

Source code in src/metrics/analytics.py
def log_summary(self) -> None:
    """Write a two-line analytics summary to the console logger."""
    if not self.enabled:
        return

    stats = self.get_stats()
    self.console_logger.info(
        "Analytics Summary: %s calls | %.1f%% success | avg %.3fs",
        stats["total_calls"],
        stats["success_rate"],
        stats["avg_duration"],
    )

    buckets = stats["duration_counts"]
    # Guard against a zero denominator when there are no events at all.
    denominator = sum(buckets.values()) or 1

    def pct(bucket: str) -> float:
        return buckets[bucket] / denominator * 100

    self.console_logger.info(
        "Performance: %s %.0f%% | %s %.0f%% | %s %.0f%%",
        self._FAST,
        pct("fast"),
        self._MEDIUM,
        pct("medium"),
        self._SLOW,
        pct("slow"),
    )

clear_old_events

clear_old_events(days=7)

Clear old events from the analytics log.

Parameters:

Name Type Description Default
days int

Number of days to keep (only applies when compact_time=False)

7

Returns:

Type Description
int

Number of events removed

Note

When compact_time=True, uses count-based pruning (removes oldest half or 1000 events) instead of age-based pruning, since timestamps don't include dates.

Source code in src/metrics/analytics.py
def clear_old_events(self, days: int = 7) -> int:
    """Clear old events from the analytics log.

    Args:
        days: Number of days to keep (only applies when compact_time=False)

    Returns:
        Number of events removed

    Note:
        When compact_time=True, uses count-based pruning (removes oldest half or 1000 events)
        instead of age-based pruning, since timestamps don't include dates.

    """
    if not self.events:
        return 0

    if self.compact_time:
        self.console_logger.warning("Age-based pruning not supported in compact_time mode; using count-based pruning")
        prune = min(len(self.events) // 2, 1_000)
        self.events = self.events[prune:]
        return prune

    cutoff = datetime.now(UTC) - timedelta(days=days)
    original = len(self.events)
    self.events = [e for e in self.events if datetime.strptime(e["Start Time"], self.time_format).replace(tzinfo=UTC) >= cutoff]
    return original - len(self.events)

merge_with

merge_with(other)

Merge analytics data from another Analytics instance.

Source code in src/metrics/analytics.py
def merge_with(self, other: Analytics) -> None:
    """Absorb all analytics data from *other* into this instance, draining *other*."""
    if other is self:
        return

    # Adopt the other instance's events while honoring the event cap:
    # keep the newest events that still fit, warn about the remainder.
    incoming = other.events
    if self.max_events > 0:
        room = max(0, self.max_events - len(self.events))
        kept = min(room, len(incoming))
        accepted = incoming[-kept:] if kept > 0 else []
        dropped = len(incoming) - kept
        if dropped > 0:
            self.console_logger.warning("Dropped %s events during merge due to max_events=%s limit", dropped, self.max_events)
    else:
        accepted = incoming
    self.events.extend(accepted)

    # Accumulate per-function call counters.
    for name, count in other.call_counts.items():
        self.call_counts[name] = self.call_counts.get(name, 0) + count

    # Accumulate per-function success counters.
    for name, count in other.success_counts.items():
        self.success_counts[name] = self.success_counts.get(name, 0) + count

    # Accumulate per-function decorator overhead totals.
    for name, overhead in other.decorator_overhead.items():
        self.decorator_overhead[name] = self.decorator_overhead.get(name, 0.0) + float(overhead)

    # Drain the source instance so its data cannot be double-counted later.
    other.events.clear()
    other.call_counts.clear()
    other.success_counts.clear()
    other.decorator_overhead.clear()

generate_reports

generate_reports(force_mode=False)

Generate analytics reports.

Parameters:

Name Type Description Default
force_mode bool

Force report generation even if criteria not met

False
Note

Garbage collection is triggered after report generation if enabled via config (analytics.enable_gc_collect) and event count exceeds threshold.

Source code in src/metrics/analytics.py
def generate_reports(self, force_mode: bool = False) -> None:
    """Log a summary and write the HTML analytics report.

    Args:
        force_mode: Force report generation even if criteria not met

    Note:
        Garbage collection is triggered after report generation if enabled
        via config (analytics.enable_gc_collect) and event count exceeds threshold.

    """
    # Respect the global analytics switch.
    if not self.enabled:
        return

    # Nothing has been recorded: warn once and bail out.
    if not (self.events or self.call_counts):
        self.console_logger.warning("No analytics data; skipping report")
        return

    self.log_summary()

    save_html_report(
        self.events,
        self.call_counts,
        self.success_counts,
        self.decorator_overhead,
        self.config,
        self.console_logger,
        self.error_logger,
        group_successful_short_calls=True,
        force_mode=force_mode,
    )

    # Optionally reclaim memory after large report runs.
    if self.enable_gc_collect and len(self.events) > self.GC_COLLECTION_THRESHOLD:
        gc.collect()