Skip to content

crypto

Cryptography module for secure token and configuration management.

This module provides Fernet-based symmetric encryption for tokens and configuration data.

CryptographyManager

CryptographyManager(logger, key_file_path='encryption.key')

Manages Fernet-based encryption for tokens and configuration data.

Initialize CryptographyManager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `logger` | `Logger` | Logger instance for error reporting | required |
| `key_file_path` | `str` | Path to encryption key file | `'encryption.key'` |

Source code in src/app/features/crypto/encryption.py
def __init__(self, logger: logging.Logger, key_file_path: str = "encryption.key") -> None:
    """Set up the manager without reading or creating the key file.

    The constructor only records its collaborators; the cached Fernet
    instance and the raw key both start out as ``None``.

    Args:
        logger: Logger used to report encryption problems
        key_file_path: Location of the encryption key file

    """
    # Cached crypto state starts empty.
    self._encryption_key: bytes | None = None
    self._fernet: Fernet | None = None

    # Collaborators supplied by the caller.
    self.key_file_path = Path(key_file_path)
    self.logger = logger

is_token_encrypted staticmethod

is_token_encrypted(token)

Check if a token is encrypted (Fernet format).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `token` | `str` | Token to check | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if token appears to be Fernet-encrypted |

Source code in src/app/features/crypto/encryption.py
@staticmethod
def is_token_encrypted(token: str) -> bool:
    """Check if a token is encrypted (Fernet format).

    Two encodings are recognised: a Fernet token wrapped in one extra layer
    of URL-safe base64 (the "double-base64" form), and a bare Fernet token
    (single base64 layer). Each layer is inspected independently, so a
    lenient decode of the wrong layer cannot mask a valid token.

    This is a heuristic on the token's shape only; no key is involved and
    nothing is decrypted.

    Args:
        token: Token to check

    Returns:
        True if token appears to be Fernet-encrypted

    """

    def _looks_like_fernet(payload: bytes) -> bool:
        # Fernet payloads have a minimum length and start with the version byte.
        return len(payload) >= FERNET_TOKEN_MIN_LENGTH and payload[0] == FERNET_VERSION_BYTE

    try:
        # Empty or short strings cannot be an encoded Fernet token.
        if not token or len(token) < FERNET_TOKEN_MIN_ENCODED_LENGTH:
            return False

        # Strip the first base64 layer; for a bare Fernet token this already
        # yields the binary payload.
        outer_decoded = base64.urlsafe_b64decode(token.encode())
        if _looks_like_fernet(outer_decoded):
            return True

        # Otherwise the token may be double-encoded: strip the second layer.
        # urlsafe_b64decode discards non-alphabet bytes, so this call can
        # "succeed" on arbitrary binary input; that is why the single-layer
        # check above must not depend on this decode raising.
        return _looks_like_fernet(base64.urlsafe_b64decode(outer_decoded))

    except (binascii.Error, ValueError, TypeError, AttributeError):
        # Not valid base64, or not a string at all: treat as not encrypted.
        return False

encrypt_token

encrypt_token(token, key=None, passphrase=None)

Encrypt a token using Fernet symmetric encryption.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `token` | `str` | Token to encrypt | required |
| `key` | `str \| None` | Optional base64-encoded encryption key | `None` |
| `passphrase` | `str \| None` | Optional passphrase for key derivation | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | Base64-encoded encrypted token |

Raises:

| Type | Description |
| --- | --- |
| `EncryptionError` | If encryption fails |

Source code in src/app/features/crypto/encryption.py
def encrypt_token(self, token: str, key: str | None = None, passphrase: str | None = None) -> str:
    """Encrypt a token with Fernet and wrap the result in URL-safe base64.

    Args:
        token: Plaintext token; must be non-empty
        key: Optional base64-encoded encryption key, forwarded to the Fernet lookup
        passphrase: Optional passphrase for key derivation, forwarded to the Fernet lookup

    Returns:
        The Fernet output, URL-safe base64-encoded and decoded to ``str``

    Raises:
        EncryptionError: If the token is empty or encryption fails

    """
    try:
        if not token:
            raise EncryptionError("Token cannot be empty")

        cipher = self._get_fernet(key, passphrase)
        ciphertext = cipher.encrypt(token.encode())
        # Add one more base64 layer on top of what Fernet returned.
        wrapped = base64.urlsafe_b64encode(ciphertext)

        self.logger.debug("Token encrypted successfully")
        return wrapped.decode()

    except (InvalidKeyError, KeyGenerationError):
        # Key problems keep their own exception type for the caller.
        raise
    except (TypeError, ValueError, binascii.Error) as e:
        failure = f"Token encryption failed: {e!s}"
        self.logger.exception(failure)
        raise EncryptionError(failure, {"original_error": str(e)}) from e

decrypt_token

decrypt_token(encrypted_token, key=None, passphrase=None)

Decrypt a token using Fernet symmetric encryption.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `encrypted_token` | `str` | Base64-encoded encrypted token | required |
| `key` | `str \| None` | Optional base64-encoded encryption key | `None` |
| `passphrase` | `str \| None` | Optional passphrase for key derivation | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | Decrypted token |

Raises:

| Type | Description |
| --- | --- |
| `DecryptionError` | If decryption fails |
| `InvalidTokenError` | If token format is invalid |

Source code in src/app/features/crypto/encryption.py
def decrypt_token(self, encrypted_token: str, key: str | None = None, passphrase: str | None = None) -> str:
    """Unwrap the outer base64 layer of a token and decrypt it with Fernet.

    Args:
        encrypted_token: Base64-encoded encrypted token; must be non-empty
        key: Optional base64-encoded encryption key, forwarded to the Fernet lookup
        passphrase: Optional passphrase for key derivation, forwarded to the Fernet lookup

    Returns:
        The decrypted token as text

    Raises:
        DecryptionError: If Fernet rejects the token or key, or decoding the result fails
        InvalidTokenError: If the token is empty or is not valid base64

    """
    try:
        if not encrypted_token:
            raise InvalidTokenError("Encrypted token cannot be empty")

        # Remove the outer base64 layer to get the payload handed to Fernet.
        try:
            fernet_payload = base64.urlsafe_b64decode(encrypted_token.encode())
        except (binascii.Error, ValueError, TypeError) as decode_error:
            raise InvalidTokenError("Invalid token format - not valid base64") from decode_error

        cipher = self._get_fernet(key, passphrase)
        plaintext: str = cipher.decrypt(fernet_payload).decode()

        self.logger.debug("Token decrypted successfully")
        return plaintext

    except InvalidToken as e:
        # Fernet could not authenticate/decrypt: wrong key or tampered token.
        failure = "Token decryption failed - invalid token or key"
        self.logger.exception(failure)
        raise DecryptionError(failure) from e
    except (InvalidKeyError, KeyGenerationError, InvalidTokenError):
        # Already specific enough; pass through unchanged.
        raise
    except (TypeError, ValueError, UnicodeDecodeError, binascii.Error) as e:
        failure = f"Token decryption failed: {e!s}"
        self.logger.exception(failure)
        raise DecryptionError(failure, {"original_error": str(e)}) from e

rotate_key

rotate_key(new_passphrase=None, backup_old_key=True)

Rotate the encryption key to a new one.

WARNING: This method only rotates the encryption key file itself. Any data encrypted with the old key will become inaccessible after rotation unless you:

1. Decrypt all existing data with the old key BEFORE calling this method
2. Re-encrypt the data with the new key AFTER rotation completes

For automatic token migration, use the orchestrator's rotate_keys command which handles the complete re-encryption workflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `new_passphrase` | `str \| None` | Passphrase for new key derivation | `None` |
| `backup_old_key` | `bool` | Whether to backup the old key | `True` |

Raises:

| Type | Description |
| --- | --- |
| `KeyGenerationError` | If key rotation fails |

Source code in src/app/features/crypto/encryption.py
def rotate_key(self, new_passphrase: str | None = None, backup_old_key: bool = True) -> None:
    """Rotate the encryption key to a new one.

    **WARNING**: This method only rotates the encryption key file itself.
    Any data encrypted with the old key will become inaccessible after rotation
    unless you:
    1. Decrypt all existing data with the old key BEFORE calling this method
    2. Re-encrypt the data with the new key AFTER rotation completes

    For automatic token migration, use the orchestrator's `rotate_keys` command
    which handles the complete re-encryption workflow.

    Args:
        new_passphrase: Passphrase for new key derivation
        backup_old_key: Whether to backup the old key

    Raises:
        KeyGenerationError: If key rotation fails

    """
    try:
        # Backup old key if requested (with timestamp to preserve history)
        if backup_old_key and self.key_file_path.exists():
            timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
            backup_path = self.key_file_path.with_suffix(f".key.backup.{timestamp}")
            backup_path.write_bytes(self.key_file_path.read_bytes())
            self.logger.info("Backed up old key to %s", backup_path)

        # Generate new key
        new_key = self._generate_key_from_passphrase(new_passphrase) if new_passphrase else Fernet.generate_key()

        # Save new key
        self.key_file_path.write_bytes(new_key)
        self.key_file_path.chmod(0o600)

        # Reset cached instances
        self._fernet = None
        self._encryption_key = new_key

        self.logger.info("Encryption key rotated successfully")

    except (OSError, ValueError, TypeError) as e:
        self._raise_key_error("Key rotation failed: ", e)

get_secure_config_status

get_secure_config_status()

Get security configuration status.

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | Status dictionary with current configuration |

Source code in src/app/features/crypto/encryption.py
def get_secure_config_status(self) -> dict[str, Any]:
    """Get security configuration status.

    The key file is inspected with a single ``stat()`` call so that
    ``key_file_exists`` and ``key_file_permissions`` always describe the
    same moment, and a file removed mid-call cannot raise.

    Returns:
        Status dictionary with current configuration:
        ``key_file_path`` (str), ``encryption_initialized`` (whether a
        Fernet instance is cached), ``password_configured`` (whether a key
        is held in memory), ``key_file_exists`` (bool) and
        ``key_file_permissions`` (last three octal digits of the file mode,
        or ``None`` when the file cannot be stat'ed)

    """
    try:
        key_file_mode = self.key_file_path.stat().st_mode
    except (OSError, ValueError):
        # Missing or otherwise un-stat-able path: report it as absent.
        key_file_mode = None

    return {
        "key_file_path": str(self.key_file_path),
        "encryption_initialized": self._fernet is not None,
        "password_configured": self._encryption_key is not None,
        "key_file_exists": key_file_mode is not None,
        "key_file_permissions": None if key_file_mode is None else oct(key_file_mode)[-3:],
    }

CryptographyError

CryptographyError(message, details=None)

Bases: Exception

Base exception for cryptography operations.

Initialize CryptographyError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `message` | `str` | Error description | required |
| `details` | `dict[str, Any] \| None` | Additional error context | `None` |

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}

DecryptionError

DecryptionError(message, details=None)

Bases: CryptographyError

Exception raised when decryption fails.

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}

EncryptionError

EncryptionError(message, details=None)

Bases: CryptographyError

Exception raised when encryption fails.

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}

InvalidKeyError

InvalidKeyError(message, details=None)

Bases: CryptographyError

Exception raised when encryption key is invalid.

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}

InvalidTokenError

InvalidTokenError(message, details=None)

Bases: CryptographyError

Exception raised when token format is invalid.

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}

KeyGenerationError

KeyGenerationError(message, details=None)

Bases: CryptographyError

Exception raised when key generation fails.

Source code in src/app/features/crypto/exceptions.py
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
    """Create the exception with a message and optional context.

    Args:
        message: Human-readable description of the failure
        details: Extra context about the error; a falsy value is stored
            as an empty dict

    """
    super().__init__(message)
    self.message = message
    # Never expose None: callers can always treat details as a mapping.
    self.details = details if details else {}