Skip to content

generic_cache

General-purpose generic cache with TTL and disk persistence.

GenericCacheService

GenericCacheService(config, logger=None)

Generic in-memory cache service with per-entry TTL, LRU eviction, automatic cleanup of expired entries, and persistence to disk.

Initialize generic cache service.

Parameters:

Name Type Description Default
config AppConfig

Typed application configuration

required
logger Logger | None

Optional logger instance

None
Source code in src/services/cache/generic_cache.py
def __init__(self, config: AppConfig, logger: logging.Logger | None = None) -> None:
    """Create an empty cache and resolve its settings from the configuration.

    Args:
        config: Typed application configuration
        logger: Logger to use; a module-level logger is created when omitted
    """
    self.config = config
    self.logger = logger if logger else logging.getLogger(__name__)
    self.cache_config = SmartCacheConfig(config)

    # Entries live in an OrderedDict so that position doubles as recency:
    # the first item is the least recently used one. Each hash key maps to a
    # (value, expires_at_timestamp) tuple.
    self.cache: OrderedDict[str, tuple[CacheableValue, float]] = OrderedDict()

    # Capacity consulted by set() and enforce_size_limits() for LRU eviction.
    self.max_size: int = config.max_generic_entries

    # Handle to the periodic cleanup task; None until one has been started.
    self._cleanup_task: asyncio.Task[None] | None = None

    # TTL applied by set() when the caller does not pass one.
    self.default_ttl = self._resolve_default_ttl()

    # Location of the on-disk copy of the cache.
    cache_path = get_full_log_path(config, "generic_cache_file", "cache/generic_cache.json")
    self.cache_file = Path(cache_path)

initialize async

initialize()

Initialize generic cache service and start cleanup task.

Source code in src/services/cache/generic_cache.py
async def initialize(self) -> None:
    """Run the disk-load step and then start the periodic cleanup task.

    Progress is reported through two info-level log messages, one before the
    work begins and one after the cleanup task has been started.
    """
    self.logger.info("Initializing %s...", LogFormat.entity("GenericCacheService"))

    # The disk-load step is awaited to completion before the cleanup task is started.
    await self._load_from_disk()
    self._start_cleanup_task()

    self.logger.info(
        "%s initialized with %ds default TTL",
        LogFormat.entity("GenericCacheService"),
        self.default_ttl,
    )

get

get(key_data)

Get value from generic cache.

Accessing an entry updates its LRU position (moves to end).

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key data

required

Returns:

Type Description
CacheableValue | None

Cached value if found and valid, None otherwise

Source code in src/services/cache/generic_cache.py
def get(self, key_data: CacheableKey) -> CacheableValue | None:
    """Look up a value in the generic cache.

    A successful lookup moves the entry to the end of the ordering, marking
    it as the most recently used. An entry found to be expired is removed
    and reported as absent.

    Args:
        key_data: Cache key data

    Returns:
        Cached value if found and valid, None otherwise
    """
    key = UnifiedHashService.hash_generic_key(key_data)
    # Only a 16-character prefix of the hash is written to the log.
    short_key = key[:16]

    try:
        value, expires_at = self.cache[key]
    except KeyError:
        self.logger.debug("Generic cache miss: %s", short_key)
        return None

    if GenericCacheService._is_expired(expires_at):
        # Drop the stale entry eagerly instead of waiting for the cleanup pass.
        self.logger.debug("Generic cache expired: %s", short_key)
        del self.cache[key]
        return None

    # LRU bookkeeping: the end of the OrderedDict is the most recently used slot.
    self.cache.move_to_end(key)

    self.logger.debug("Generic cache hit: %s", short_key)
    return value

set

set(key_data, value, ttl=None)

Store value in generic cache with LRU eviction.

If the cache is at capacity and the key is new, the least recently used entry is evicted before adding the new entry.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key data

required
value CacheableValue

Value to cache

required
ttl int | None

Time to live in seconds (uses default if not specified)

None
Source code in src/services/cache/generic_cache.py
def set(self, key_data: CacheableKey, value: CacheableValue, ttl: int | None = None) -> None:
    """Store value in generic cache with LRU eviction.

    If the cache is at capacity and the key is new, the least recently used
    entry is evicted before adding the new entry. Updating an existing key
    never evicts anything. When the cache is empty there is nothing to evict,
    so the eviction step is skipped even if ``max_size`` is zero or negative.

    Args:
        key_data: Cache key data
        value: Value to cache
        ttl: Time to live in seconds (uses default if not specified)
    """
    key = UnifiedHashService.hash_generic_key(key_data)

    # Evict the LRU entry if at capacity and this is a new key. The emptiness
    # check comes first because popitem() raises KeyError on an empty
    # OrderedDict, which the capacity comparison alone allows when
    # max_size <= 0.
    if self.cache and len(self.cache) >= self.max_size and key not in self.cache:
        # Remove oldest entry (first item in OrderedDict = LRU)
        evicted_key, _ = self.cache.popitem(last=False)
        self.logger.debug("LRU eviction: removed %s to make room", evicted_key[:16])

    # Use provided TTL or default; an explicit 0 is honoured rather than
    # replaced by the default.
    actual_ttl = ttl if ttl is not None else self.default_ttl
    expires_at = time.time() + actual_ttl

    self.cache[key] = (value, expires_at)

    # Move to end to mark as recently used (handles both new and updated keys)
    self.cache.move_to_end(key)

    self.logger.debug("Stored in generic cache: %s (TTL: %ds)", key[:16], actual_ttl)

invalidate

invalidate(key_data)

Invalidate specific cache entry.

Parameters:

Name Type Description Default
key_data CacheableKey

Cache key data to invalidate

required

Returns:

Type Description
bool

True if entry was found and removed, False otherwise

Source code in src/services/cache/generic_cache.py
def invalidate(self, key_data: CacheableKey) -> bool:
    """Remove a single entry from the cache, if it is present.

    Args:
        key_data: Cache key data to invalidate

    Returns:
        True if entry was found and removed, False otherwise
    """
    key = UnifiedHashService.hash_generic_key(key_data)

    # Nothing stored under this key: report that no removal took place.
    if key not in self.cache:
        return False

    del self.cache[key]
    self.logger.debug("Invalidated generic cache entry: %s", key[:16])
    return True

invalidate_all

invalidate_all()

Clear all generic cache entries.

Source code in src/services/cache/generic_cache.py
def invalidate_all(self) -> None:
    """Drop every entry from the cache and log how many were removed."""
    # Capture the size first; it is zero once the cache has been cleared.
    entries_dropped = len(self.cache)
    self.cache.clear()
    self.logger.info("Cleared all generic cache entries (%d items)", entries_dropped)

cleanup_expired

cleanup_expired()

Remove expired entries from cache.

Returns:

Type Description
int

Number of entries removed

Source code in src/services/cache/generic_cache.py
def cleanup_expired(self) -> int:
    """Delete every entry whose expiry timestamp has been reached.

    An entry counts as expired when its stored timestamp is less than or
    equal to the current time, sampled once at the start of the pass.

    Returns:
        Number of entries removed
    """
    now = time.time()

    # Collect the keys first: deleting while iterating the dict is not allowed.
    stale: list[str] = []
    for cache_key, (_, expires_at) in self.cache.items():
        if expires_at <= now:
            stale.append(cache_key)

    for cache_key in stale:
        del self.cache[cache_key]

    removed = len(stale)
    if removed:
        self.logger.debug("Cleaned up %d expired generic cache entries", removed)

    return removed

enforce_size_limits

enforce_size_limits()

Enforce cache size limits by removing LRU (least recently used) entries.

Uses OrderedDict iteration order where first = oldest (LRU).

Returns:

Type Description
int

Number of entries removed

Source code in src/services/cache/generic_cache.py
def enforce_size_limits(self) -> int:
    """Enforce cache size limits by removing LRU (least recently used) entries.

    Uses OrderedDict iteration order where first = oldest (LRU). A negative
    ``max_size`` is treated as zero, so at most every entry is removed.

    Returns:
        Number of entries removed
    """
    # Clamp the limit at zero: with a negative max_size the raw difference
    # would exceed the number of entries and popitem() would raise KeyError
    # once the cache had been emptied.
    limit = max(self.max_size, 0)
    overflow = len(self.cache) - limit
    if overflow <= 0:
        return 0

    for _ in range(overflow):
        self.cache.popitem(last=False)  # Remove LRU (oldest in OrderedDict)

    self.logger.info("Enforced size limit: removed %d LRU entries", overflow)
    return overflow

stop_cleanup_task async

stop_cleanup_task()

Stop the periodic cleanup task.

Source code in src/services/cache/generic_cache.py
async def stop_cleanup_task(self) -> None:
    """Cancel the periodic cleanup task and wait for it to finish.

    Does nothing when no task has been started or the task is already done.
    """
    task = self._cleanup_task
    if not task or task.done():
        return

    task.cancel()
    # Awaiting the cancelled task re-raises CancelledError here; that is the
    # expected outcome of the cancellation, so it is swallowed.
    with contextlib.suppress(asyncio.CancelledError):
        await task
    self.logger.debug("Stopped cleanup task")

__aenter__ async

__aenter__()

Async context manager entry.

Source code in src/services/cache/generic_cache.py
async def __aenter__(self) -> Self:
    """Enter the async context: run ``initialize()`` and return this instance.

    Returns:
        This service instance, bound by ``async with ... as``
    """
    # initialize() is awaited to completion before the instance is handed back.
    await self.initialize()
    return self

__aexit__ async

__aexit__(_exc_type, _exc, _tb)

Async context manager exit - ensures cleanup task is stopped.

Source code in src/services/cache/generic_cache.py
async def __aexit__(self, _exc_type: type[BaseException] | None, _exc: BaseException | None, _tb: object) -> None:
    """Exit the async context by stopping the periodic cleanup task.

    The exception arguments are accepted but not used. The method returns
    None, so an exception raised inside the ``async with`` body is not
    suppressed.

    Args:
        _exc_type: Exception type raised in the context body, if any (unused)
        _exc: Exception instance raised in the context body, if any (unused)
        _tb: Traceback associated with the exception, if any (unused)
    """
    await self.stop_cleanup_task()

get_stats

get_stats()

Get generic cache statistics.

Returns:

Type Description
dict[str, Any]

Dictionary containing cache statistics

Source code in src/services/cache/generic_cache.py
def get_stats(self) -> dict[str, Any]:
    """Build a snapshot of the cache's current state and configuration.

    Entries are split into valid and expired by comparing each stored expiry
    timestamp against the current time; nothing is removed from the cache.

    Returns:
        Dictionary containing cache statistics
    """
    now = time.time()

    # An entry is valid only while its expiry lies strictly in the future;
    # every other entry counts as expired.
    live = sum(1 for _, expires_at in self.cache.values() if expires_at > now)
    stale = len(self.cache) - live

    policy = self.cache_config.get_policy(CacheContentType.GENERIC)

    stats: dict[str, Any] = {
        "total_entries": len(self.cache),
        "valid_entries": live,
        "expired_entries": stale,
        "content_type": CacheContentType.GENERIC.value,
        "default_ttl": self.default_ttl,
        "ttl_policy": policy.ttl_seconds,
        "invalidation_strategy": policy.invalidation_strategy.value,
        "max_entries": self.max_size,
        # True only while a cleanup task exists and has not finished.
        "cleanup_running": self._cleanup_task is not None and not self._cleanup_task.done(),
    }
    return stats

save_to_disk async

save_to_disk()

Persist cache contents to disk.

Source code in src/services/cache/generic_cache.py
async def save_to_disk(self) -> None:
    """Persist cache contents to disk.

    When the cache is empty, an existing cache file is deleted instead of
    being rewritten. Otherwise the entries are serialised to JSON in a worker
    thread: the data is written to a temporary file in the target directory
    and then moved over the cache file, so the cache file is only replaced
    once the new content has been fully written.

    OSError raised while saving is logged and not propagated; other
    exceptions (for example a value that cannot be serialised to JSON)
    propagate to the caller.
    """
    if not self.cache:
        if self.cache_file.exists():
            try:
                self.cache_file.unlink()
                self.logger.info("Deleted empty generic cache file: [cyan]%s[/cyan]", self.cache_file.name)
            except OSError as e:
                self.logger.warning("Failed to remove generic cache file %s: %s", self.cache_file, e)
        return

    # Snapshot the entries on the calling (event-loop) thread. The worker
    # thread must not iterate the live OrderedDict: get()/set() can run on the
    # event loop while the save is in progress, and mutating a dict during
    # iteration raises RuntimeError.
    snapshot = list(self.cache.items())

    def blocking_save() -> None:
        """Write the snapshotted cache entries to disk within a worker thread."""
        ensure_directory(str(self.cache_file.parent))
        payload = {
            key: {"value": self._prepare_value_for_disk(value), "expires_at": expires_at}
            for key, (value, expires_at) in snapshot
        }
        temp_path: Path | None = None
        try:
            # delete=False so the file survives the `with` block and can be
            # moved into place afterwards.
            with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=str(self.cache_file.parent), delete=False) as tmp_file:
                temp_path = Path(tmp_file.name)
                json.dump(payload, tmp_file, ensure_ascii=False, indent=2)
            temp_path.replace(self.cache_file)
        except Exception:
            # Do not leave an orphaned temporary file behind when serialising
            # or moving fails; the original error is re-raised unchanged.
            if temp_path is not None:
                with contextlib.suppress(OSError):
                    temp_path.unlink()
            raise

    try:
        await asyncio.to_thread(blocking_save)
        # Report the number of entries actually written, not the live size.
        self.logger.info("Generic cache saved to [cyan]%s[/cyan] (%d entries)", self.cache_file.name, len(snapshot))
    except OSError as e:
        self.logger.exception("Failed to save generic cache to %s: %s", self.cache_file, e)