Skip to content

Core Modules Reference

Reference documentation for Apiary's core modules.

Authentication

Authentication Module

core.auth.authentication

Authentication utilities for API key validation.

AuthenticatedUser

AuthenticatedUser(api_key: str)

Represents an authenticated user.

Initialize authenticated user.

Parameters:

  • api_key (str) –

    The validated API key

Source code in core/auth/authentication.py
def __init__(self, api_key: str):
    """Build the user object for a request identified by an API key.

    The constructor performs no validation itself; it records the key and
    always marks the instance as authenticated.

    Args:
        api_key: The validated API key
    """
    self.is_authenticated = True
    self.api_key = api_key

get_api_key_from_header async

get_api_key_from_header(
    x_api_key: str | None = Header(None, alias="X-API-Key"),
) -> str | None

Extract API key from request header.

Parameters:

  • x_api_key (str | None, default: Header(None, alias='X-API-Key') ) –

    API key from X-API-Key header

Returns:

  • str | None

    API key string or None if not provided

Source code in core/auth/authentication.py
async def get_api_key_from_header(
    x_api_key: str | None = Header(None, alias="X-API-Key"),
) -> str | None:
    """Return the API key carried by the ``X-API-Key`` request header.

    The value is passed through untouched; no validation happens here.

    Args:
        x_api_key: Value bound from the ``X-API-Key`` header (None when the
            header was not sent)

    Returns:
        The header value exactly as received, or None if it was not provided
    """
    return x_api_key

validate_api_key

validate_api_key(
    api_key: str, settings: Settings, endpoint_keys: str | None = None
) -> bool

Validate an API key against configured keys.

Parameters:

  • api_key (str) –

    The API key to validate

  • settings (Settings) –

    Application settings containing global API keys

  • endpoint_keys (str | None, default: None ) –

    Optional endpoint-specific API keys (overrides global keys)

Returns:

  • bool

    True if the API key is valid, False otherwise

Source code in core/auth/authentication.py
def validate_api_key(
    api_key: str, settings: Settings, endpoint_keys: str | None = None
) -> bool:
    """Check an API key against the keys that apply to the current endpoint.

    Args:
        api_key: The API key to validate
        settings: Application settings containing the global API keys
        endpoint_keys: Optional endpoint-specific API keys; when given they
            are used instead of the global keys, never in addition to them

    Returns:
        True if the key manager accepts the key for the selected key source,
        False otherwise (including when no keys are configured at all)
    """
    manager = get_api_key_manager()

    # Endpoint-specific keys, when present, replace the global keys entirely.
    key_source = endpoint_keys if endpoint_keys else settings.api_keys

    # Nothing configured anywhere: the key is rejected rather than accepted.
    if not key_source:
        return False

    return manager.validate_key(api_key, key_source)

create_api_key_validator

create_api_key_validator(endpoint_keys: str | None = None)

Create an API key validator dependency with optional endpoint-specific keys.

Parameters:

  • endpoint_keys (str | None, default: None ) –

    Optional endpoint-specific API keys

Returns:

  • A dependency function for API key validation

Source code in core/auth/authentication.py
def create_api_key_validator(endpoint_keys: str | None = None):
    """Build a dependency that enforces API key authentication.

    Args:
        endpoint_keys: Optional endpoint-specific API keys; captured by the
            returned dependency and forwarded to ``validate_api_key``

    Returns:
        A dependency function for API key validation
    """

    async def verify_api_key(
        api_key: str | None = Depends(get_api_key_from_header),
        settings: Settings = Depends(get_settings),
    ) -> str:
        """Verify the request's API key, rejecting missing or invalid keys.

        Args:
            api_key: API key from request header
            settings: Application settings

        Returns:
            The validated API key

        Raises:
            AuthenticationError: If the key is missing or fails validation
        """
        if api_key and validate_api_key(api_key, settings, endpoint_keys):
            return api_key

        # A missing key and a rejected key are reported with distinct messages.
        reason = "Invalid API key" if api_key else "API key required"
        raise AuthenticationError(
            reason,
            details={"header": API_KEY_HEADER},
        )

    return verify_api_key

verify_api_key async

verify_api_key(
    api_key: str | None = Depends(get_api_key_from_header),
    settings: Settings = Depends(get_settings),
) -> str

Default dependency to verify API key authentication using global keys.

Parameters:

  • api_key (str | None, default: Depends(get_api_key_from_header) ) –

    API key from request header

  • settings (Settings, default: Depends(get_settings) ) –

    Application settings

Returns:

  • str

    The validated API key

Raises:

  • AuthenticationError

    If authentication fails

Source code in core/auth/authentication.py
async def verify_api_key(
    api_key: str | None = Depends(get_api_key_from_header),
    settings: Settings = Depends(get_settings),
) -> str:
    """Default dependency that enforces API key authentication with global keys.

    Args:
        api_key: API key from request header
        settings: Application settings

    Returns:
        The validated API key

    Raises:
        AuthenticationError: If the key is missing or fails validation
    """
    if api_key and validate_api_key(api_key, settings):
        return api_key

    # A missing key and a rejected key are reported with distinct messages.
    reason = "Invalid API key" if api_key else "API key required"
    raise AuthenticationError(
        reason,
        details={"header": API_KEY_HEADER},
    )

create_optional_authenticator

create_optional_authenticator(endpoint_keys: str | None = None)

Create an optional authenticator with endpoint-specific keys.

Parameters:

  • endpoint_keys (str | None, default: None ) –

    Optional endpoint-specific API keys

Returns:

  • A dependency function for optional authentication

Source code in core/auth/authentication.py
def create_optional_authenticator(endpoint_keys: str | None = None):
    """Build a dependency that authenticates when possible but never rejects.

    Args:
        endpoint_keys: Optional endpoint-specific API keys; captured by the
            returned dependency and forwarded to ``validate_api_key``

    Returns:
        A dependency function for optional authentication
    """

    async def get_authenticated_user(
        api_key: str | None = Depends(get_api_key_from_header),
        settings: Settings = Depends(get_settings),
    ) -> AuthenticatedUser | None:
        """Resolve the caller to a user if a valid API key was supplied.

        No error is raised here for a missing or rejected key; the caller
        simply receives None in both cases.

        Args:
            api_key: API key from header (may be None)
            settings: Application settings

        Returns:
            AuthenticatedUser if authenticated, None otherwise
        """
        # Validation is skipped entirely when no key was sent.
        accepted = bool(api_key) and validate_api_key(
            api_key, settings, endpoint_keys
        )
        return AuthenticatedUser(api_key) if accepted else None

    return get_authenticated_user

get_authenticated_user async

get_authenticated_user(
    api_key: str | None = Depends(get_api_key_from_header),
    settings: Settings = Depends(get_settings),
) -> AuthenticatedUser | None

Get authenticated user from API key (optional) using global keys.

This function does not raise an error if authentication fails. It returns None if the API key is missing or invalid.

Parameters:

  • api_key (str | None, default: Depends(get_api_key_from_header) ) –

    API key from header (may be None)

  • settings (Settings, default: Depends(get_settings) ) –

    Application settings

Returns:

  • AuthenticatedUser | None

    AuthenticatedUser if authenticated, None otherwise

Source code in core/auth/authentication.py
async def get_authenticated_user(
    api_key: str | None = Depends(get_api_key_from_header),
    settings: Settings = Depends(get_settings),
) -> AuthenticatedUser | None:
    """Resolve the caller to a user if a valid global API key was supplied.

    No error is raised here for a missing or rejected key; the caller simply
    receives None in both cases.

    Args:
        api_key: API key from header (may be None)
        settings: Application settings

    Returns:
        AuthenticatedUser if authenticated, None otherwise
    """
    # Validation is skipped entirely when no key was sent.
    accepted = bool(api_key) and validate_api_key(api_key, settings)
    return AuthenticatedUser(api_key) if accepted else None

Authorization Module

core.auth.authorization

Authorization utilities for access control.

create_auth_dependency

create_auth_dependency(endpoint_keys: str | None = None)

Create an auth dependency with optional endpoint-specific keys.

Parameters:

  • endpoint_keys (str | None, default: None ) –

    Optional endpoint-specific API keys

Returns:

  • A dependency function that requires authentication

Source code in core/auth/authorization.py
def create_auth_dependency(endpoint_keys: str | None = None):
    """Build a dependency that requires authentication and yields the user.

    Args:
        endpoint_keys: Optional endpoint-specific API keys, forwarded to
            ``create_api_key_validator``

    Returns:
        A dependency function that requires authentication
    """
    key_validator = create_api_key_validator(endpoint_keys)

    async def require_auth_with_keys(
        api_key: str = Depends(key_validator),
    ) -> AuthenticatedUser:
        """Wrap the key produced by the validator in an AuthenticatedUser.

        Args:
            api_key: API key returned by the validator dependency

        Returns:
            AuthenticatedUser instance
        """
        return AuthenticatedUser(api_key)

    return require_auth_with_keys

require_auth async

require_auth(api_key: str = Depends(verify_api_key)) -> AuthenticatedUser

Dependency that requires authentication (using global keys).

This dependency will raise an AuthenticationError if the request is not authenticated.

Parameters:

  • api_key (str, default: Depends(verify_api_key) ) –

    Validated API key

Returns:

  • AuthenticatedUser

    AuthenticatedUser instance

Raises:

  • AuthenticationError

    If authentication fails

Source code in core/auth/authorization.py
async def require_auth(
    api_key: str = Depends(verify_api_key),
) -> AuthenticatedUser:
    """Dependency that requires authentication against the global keys.

    The key is obtained through the ``verify_api_key`` dependency, which
    raises before this function runs when the request is not authenticated.

    Args:
        api_key: API key returned by ``verify_api_key``

    Returns:
        AuthenticatedUser instance wrapping the key

    Raises:
        AuthenticationError: Raised by ``verify_api_key`` if authentication
            fails
    """
    return AuthenticatedUser(api_key)

Exceptions

core.exceptions

Custom exception classes for the API.

APIError

APIError(
    message: str,
    status_code: int = 500,
    details: dict[str, Any] | None = None,
    error_code: str | None = None,
)

Bases: Exception

Base exception for all API errors.

Initialize API exception.

Parameters:

  • message (str) –

    Human-readable error message

  • status_code (int, default: 500 ) –

    HTTP status code

  • details (dict[str, Any] | None, default: None ) –

    Additional error details

  • error_code (str | None, default: None ) –

    Optional error code for programmatic handling

Source code in core/exceptions.py
def __init__(
    self,
    message: str,
    status_code: int = 500,
    details: dict[str, Any] | None = None,
    error_code: str | None = None,
):
    """Set up the common fields shared by every API error.

    Args:
        message: Human-readable error message (also passed to ``Exception``)
        status_code: HTTP status code
        details: Additional error details; a falsy value becomes a new
            empty dict
        error_code: Optional error code for programmatic handling; a falsy
            value becomes ``ERR_<status_code>``
    """
    super().__init__(message)
    self.message = message
    self.status_code = status_code
    self.details = details if details else {}
    self.error_code = error_code if error_code else f"ERR_{status_code}"

ValidationError

ValidationError(
    message: str, status_code: int = 400, details: dict[str, Any] | None = None
)

Bases: APIError

Exception for validation errors.

Initialize validation error.

Parameters:

  • message (str) –

    Error message

  • status_code (int, default: 400 ) –

    HTTP status code (default: 400)

  • details (dict[str, Any] | None, default: None ) –

    Additional error details

Source code in core/exceptions.py
def __init__(
    self,
    message: str,
    status_code: int = 400,
    details: dict[str, Any] | None = None,
):
    """Create a validation error, defaulting to HTTP 400.

    No explicit error code is passed to the base class.

    Args:
        message: Error message
        status_code: HTTP status code (default: 400)
        details: Additional error details
    """
    super().__init__(message, status_code=status_code, details=details)

NotFoundError

NotFoundError(
    message: str = "Resource not found", details: dict[str, Any] | None = None
)

Bases: APIError

Exception for resource not found errors.

Initialize not found error.

Parameters:

  • message (str, default: 'Resource not found' ) –

    Error message

  • details (dict[str, Any] | None, default: None ) –

    Additional error details

Source code in core/exceptions.py
def __init__(
    self,
    message: str = "Resource not found",
    details: dict[str, Any] | None = None,
):
    """Create a not-found error with status 404 and code ``ERR_NOT_FOUND``.

    Args:
        message: Error message
        details: Additional error details
    """
    super().__init__(
        message,
        status_code=404,
        details=details,
        error_code="ERR_NOT_FOUND",
    )

AuthenticationError

AuthenticationError(
    message: str = "Authentication required",
    details: dict[str, Any] | None = None,
)

Bases: APIError

Exception for authentication errors.

Initialize authentication error.

Parameters:

  • message (str, default: 'Authentication required' ) –

    Error message

  • details (dict[str, Any] | None, default: None ) –

    Additional error details

Source code in core/exceptions.py
def __init__(
    self,
    message: str = "Authentication required",
    details: dict[str, Any] | None = None,
):
    """Create an authentication error with status 401 and code ``ERR_AUTH``.

    Args:
        message: Error message
        details: Additional error details
    """
    super().__init__(
        message,
        status_code=401,
        details=details,
        error_code="ERR_AUTH",
    )

AuthorizationError

AuthorizationError(
    message: str = "Insufficient permissions",
    details: dict[str, Any] | None = None,
)

Bases: APIError

Exception for authorization errors.

Initialize authorization error.

Parameters:

  • message (str, default: 'Insufficient permissions' ) –

    Error message

  • details (dict[str, Any] | None, default: None ) –

    Additional error details

Source code in core/exceptions.py
def __init__(
    self,
    message: str = "Insufficient permissions",
    details: dict[str, Any] | None = None,
):
    """Create an authorization error with status 403 and code ``ERR_AUTHORIZATION``.

    Args:
        message: Error message
        details: Additional error details
    """
    super().__init__(
        message,
        status_code=403,
        details=details,
        error_code="ERR_AUTHORIZATION",
    )

Services

Base Service

core.services.base

Base service interface for extensible services.

BaseService

BaseService(http_client: AsyncClient | None = None)

Bases: ABC

Base class for all services.

Initialize service.

Parameters:

  • http_client (AsyncClient | None, default: None ) –

    Optional HTTP client (will create one if not provided)

Source code in core/services/base.py
def __init__(self, http_client: httpx.AsyncClient | None = None):
    """Store the HTTP client and remember whether this service owns it.

    Args:
        http_client: Optional HTTP client; when omitted, no client is
            created here and the service is flagged as owning whichever
            client it holds later
    """
    # Ownership is decided once, from whether a client was injected.
    self._owns_client = http_client is None
    self._http_client = http_client

name property

name: str

Get service name.

call abstractmethod async

call(parameters: dict[str, Any] | None = None) -> dict[str, Any]

Call the service with given parameters.

Parameters:

  • parameters (dict[str, Any] | None, default: None ) –

    Service parameters

Returns:

  • dict[str, Any]

    Service response as dictionary

Raises:

  • Exception

    If service call fails

Source code in core/services/base.py
@abstractmethod
async def call(self, parameters: dict[str, Any] | None = None) -> dict[str, Any]:
    """Call the service with given parameters.

    Args:
        parameters: Service parameters

    Returns:
        Service response as dictionary

    Raises:
        Exception: If service call fails
    """
    pass

get_http_client async

get_http_client() -> httpx.AsyncClient

Get HTTP client (creates one if needed).

Returns:

  • AsyncClient

    httpx.AsyncClient instance

Source code in core/services/base.py
async def get_http_client(self) -> httpx.AsyncClient:
    """Return the service's HTTP client, creating it on first use.

    Returns:
        The stored httpx.AsyncClient, or a newly created one (30 second
        timeout) that is stored for subsequent calls
    """
    client = self._http_client
    if client is None:
        client = httpx.AsyncClient(timeout=30.0)
        self._http_client = client
    return client

cleanup async

cleanup()

Cleanup resources.

Source code in core/services/base.py
async def cleanup(self):
    """Close the HTTP client if this service owns one.

    A client that is not owned by the service is left open and kept.
    """
    if not self._owns_client or self._http_client is None:
        return

    await self._http_client.aclose()
    self._http_client = None

__del__

__del__() -> None

Cleanup on deletion.

Source code in core/services/base.py
def __del__(self) -> None:  # noqa: B027
    """Finalizer hook; intentionally does nothing."""
    # Fallback only: releasing the HTTP client needs an awaited call (see
    # cleanup()), so resources must be released explicitly by the caller.
    return None

API Key Management

API Key Manager

core.api_key_manager

API key management with file watching and caching.

APIKeyFileHandler

APIKeyFileHandler(manager: APIKeyManager, file_path: Path)

Bases: FileSystemEventHandler

File system event handler for API key files.

Initialize the handler.

Parameters:

  • manager (APIKeyManager) –

    The APIKeyManager instance to notify

  • file_path (Path) –

    Path to the API key file being watched

Source code in core/api_key_manager.py
def __init__(self, manager: "APIKeyManager", file_path: Path):
    """Remember which file is watched and which manager to notify.

    Args:
        manager: The APIKeyManager instance to notify
        file_path: Path to the API key file being watched
    """
    self.file_path = file_path
    self.manager = manager

on_modified

on_modified(event: FileSystemEvent)

Handle file modification events.

Parameters:

  • event (FileSystemEvent) –

    The file system event

Source code in core/api_key_manager.py
def on_modified(self, event: FileSystemEvent):
    """React to a modification event by reloading the watched key file.

    Events for directories or for any path other than the watched file are
    ignored.

    Args:
        event: The file system event
    """
    # The event path may arrive as bytes; normalise it to str first.
    raw_path = event.src_path
    changed = raw_path.decode() if isinstance(raw_path, bytes) else raw_path

    if event.is_directory or Path(changed) != self.file_path:
        return

    logger.info(f"API key file modified: {self.file_path}")
    self.manager.reload_file(str(self.file_path))

APIKeyManager

APIKeyManager()

Manages API keys from strings and files with automatic reloading.

Initialize the API key manager.

Source code in core/api_key_manager.py
def __init__(self):
    """Create an empty manager: no cached keys and no file watchers."""
    # Guards the cache and the watcher registry.
    self._lock = threading.Lock()
    # Cached key sets, keyed by source identifier (e.g. "file:<path>").
    self._keys_cache: dict[str, set[str]] = {}
    # Registered watcher objects, stopped and joined by shutdown().
    self._file_watchers: dict[str, Any] = {}

load_keys

load_keys(source: str, source_id: str = 'default') -> set[str]

Load API keys from a string or file.

Parameters:

  • source (str) –

    Either comma-separated API keys or a file path

  • source_id (str, default: 'default' ) –

    Identifier for this key source (for caching)

Returns:

  • set[str]

    Set of valid API keys

Raises:

  • ValueError

    If the source appears to be a file path but doesn't exist

Source code in core/api_key_manager.py
def load_keys(self, source: str, source_id: str = "default") -> set[str]:
    """Resolve a key source (inline keys or a key file) to a set of keys.

    Args:
        source: Either comma-separated API keys or a file path
        source_id: Identifier for this key source; used in log messages and
            passed on to the validator and the file loader

    Returns:
        Set of API keys; empty when ``source`` is empty or whitespace

    Raises:
        ValueError: If validation reports the source as invalid (for
            example a path-like source that does not exist)
    """
    if not (source and source.strip()):
        return set()

    report = validate_api_key_source(source, source_id)

    # Warnings never block loading; they are only surfaced in the log.
    for message in report["warnings"]:
        logger.warning(message)

    if not report["valid"]:
        error_msg = "; ".join(report["errors"])
        logger.error(f"Invalid API key source '{source_id}': {error_msg}")
        raise ValueError(f"Invalid API key source: {error_msg}")

    # Anything that is not a file is treated as a comma-separated string.
    if report["type"] != "file":
        return self._parse_key_string(source)
    return self._load_from_file(report["path"], source_id)

reload_file

reload_file(file_path: str)

Reload API keys from a file.

Parameters:

  • file_path (str) –

    Path to the file to reload

Source code in core/api_key_manager.py
def reload_file(self, file_path: str):
    """Refresh the cached keys of an already-cached key file.

    Files that have no cache entry are ignored.

    Args:
        file_path: Path to the file to reload
    """
    target = Path(file_path)
    cache_key = f"file:{target}"

    with self._lock:
        if cache_key not in self._keys_cache:
            return

        refreshed = self._read_keys_from_file(target)
        self._keys_cache[cache_key] = refreshed
        logger.info(f"Reloaded {len(refreshed)} API key(s) from {file_path}")

get_cached_keys

get_cached_keys(source: str) -> set[str] | None

Get cached keys for a source.

Parameters:

  • source (str) –

    The source string (file path or keys)

Returns:

  • set[str] | None

    Set of keys if cached, None otherwise

Source code in core/api_key_manager.py
def get_cached_keys(self, source: str) -> set[str] | None:
    """Get cached keys for a source.

    Args:
        source: The source string (file path or keys)

    Returns:
        Set of keys if cached, None otherwise
    """
    source_path = Path(source.strip())
    if source_path.exists() and source_path.is_file():
        cache_key = f"file:{source_path}"
        with self._lock:
            return self._keys_cache.get(cache_key)
    return None

validate_key

validate_key(api_key: str, *sources: str) -> bool

Validate an API key against one or more sources.

Parameters:

  • api_key (str) –

    The API key to validate

  • *sources (str, default: () ) –

    One or more sources (file paths or key strings)

Returns:

  • bool

    True if the key is valid in any source, False otherwise

Source code in core/api_key_manager.py
def validate_key(self, api_key: str, *sources: str) -> bool:
    """Check whether an API key appears in any of the given key sources.

    Sources are loaded in order through ``load_keys`` and checking stops at
    the first source that contains the key. Falsy sources are skipped.

    Args:
        api_key: The API key to validate
        *sources: One or more sources (file paths or key strings)

    Returns:
        True if the key is valid in any source, False otherwise (including
        when ``api_key`` is empty)
    """
    if not api_key:
        return False

    return any(
        api_key in self.load_keys(candidate)
        for candidate in sources
        if candidate
    )

shutdown

shutdown()

Stop all file watchers.

Source code in core/api_key_manager.py
def shutdown(self):
    """Stop every registered file watcher and forget them all.

    Each watcher is stopped and joined in turn while the manager lock is
    held; the registry is emptied afterwards.
    """
    with self._lock:
        watchers = self._file_watchers
        for watcher in watchers.values():
            watcher.stop()
            watcher.join()
        watchers.clear()
        logger.info("API key file watchers stopped")

get_api_key_manager

get_api_key_manager() -> APIKeyManager

Get the global API key manager instance.

Returns:

  • APIKeyManager

    The APIKeyManager instance

Source code in core/api_key_manager.py
def get_api_key_manager() -> APIKeyManager:
    """Return the module-wide API key manager, creating it on first use.

    Returns:
        The APIKeyManager instance
    """
    global _api_key_manager
    manager = _api_key_manager
    if manager is None:
        manager = _api_key_manager = APIKeyManager()
    return manager

API Key Validator

core.api_key_validator

API key configuration validation.

APIKeyValidationError

Bases: Exception

Raised when API key configuration is invalid.

validate_api_key_source

validate_api_key_source(source: str, source_name: str = 'api_keys') -> dict

Validate an API key source (string or file path).

Parameters:

  • source (str) –

    The API key source (comma-separated keys or file path)

  • source_name (str, default: 'api_keys' ) –

    Name of the source for error messages

Returns:

  • dict

    Dictionary with validation results:

    {
        "valid": bool,
        "type": "file" | "string",
        "path": Optional[Path],
        "warnings": list[str],
        "errors": list[str]
    }

Raises:

Source code in core/api_key_validator.py
def validate_api_key_source(source: str, source_name: str = "api_keys") -> dict:
    """Validate an API key source (string or file path).

    An invalid source is never reported by raising: every problem is
    recorded in the returned dictionary (``"valid": False`` plus one or more
    entries in ``"errors"``). Warning messages identify inline keys by their
    position only, so key material is not written to the log.

    Args:
        source: The API key source (comma-separated keys or file path)
        source_name: Name of the source for error messages

    Returns:
        Dictionary with validation results:
        {
            "valid": bool,
            "type": "file" | "string",
            "path": Optional[Path],
            "warnings": list[str],
            "errors": list[str]
        }
    """
    result: dict[str, Any] = {
        "valid": True,
        "type": "string",
        "path": None,
        "warnings": [],
        "errors": [],
    }

    if not source or not source.strip():
        result["errors"].append(f"{source_name}: Empty or whitespace-only value")
        result["valid"] = False
        return result

    source = source.strip()

    # Check if it looks like a file path
    looks_like_path = "/" in source or source.endswith((".txt", ".keys"))

    # Try to treat as file path
    try:
        path = Path(source)

        try:
            path_exists = path.exists()
        except OSError:
            # A long inline key list is not a usable path: the OS can reject
            # it (e.g. "File name too long") instead of reporting "not
            # found". Treat that the same as a path that does not exist.
            path_exists = False

        if path_exists:
            # If the path exists and is a file, it's definitely a file source
            if path.is_file():
                result["type"] = "file"
                result["path"] = path

                # Check file readability
                try:
                    with open(path) as f:
                        lines = f.readlines()

                    # Count valid keys: non-blank lines that are not comments
                    key_count = sum(
                        1
                        for line in lines
                        if line.strip() and not line.strip().startswith("#")
                    )

                    if key_count == 0:
                        result["warnings"].append(
                            f"{source_name}: File '{source}' contains no valid API keys"
                        )

                    logger.debug(
                        f"{source_name}: File '{source}' contains {key_count} key(s)"
                    )

                except Exception as e:
                    result["errors"].append(
                        f"{source_name}: Cannot read file '{source}': {e}"
                    )
                    result["valid"] = False

            else:
                result["errors"].append(
                    f"{source_name}: Path '{source}' exists but is not a file"
                )
                result["valid"] = False

        elif looks_like_path:
            # It looks like a file path but doesn't exist - this is an error
            result["errors"].append(
                f"{source_name}: File '{source}' does not exist. "
                f"If this is meant to be an API key, it looks like a file path."
            )
            result["valid"] = False

        else:
            # Treat as comma-separated string
            result["type"] = "string"
            keys = [k.strip() for k in source.split(",") if k.strip()]

            if not keys:
                result["errors"].append(
                    f"{source_name}: No valid keys found in string"
                )
                result["valid"] = False
            else:
                # Check for suspiciously short or long keys. Keys are
                # referred to by 1-based position, never by value, so that
                # secrets do not end up in warning logs.
                for position, key in enumerate(keys, start=1):
                    if len(key) < 8:
                        result["warnings"].append(
                            f"{source_name}: Key #{position} is very short "
                            "(<8 chars). Consider using stronger keys."
                        )
                    if len(key) > 200:
                        result["warnings"].append(
                            f"{source_name}: Key #{position} is very long "
                            f"({len(key)} chars). This might be a mistake."
                        )

                logger.debug(f"{source_name}: Contains {len(keys)} inline key(s)")

    except Exception as e:
        # Best-effort contract: unexpected failures become validation errors.
        result["errors"].append(f"{source_name}: Validation error: {e}")
        result["valid"] = False

    return result

validate_settings_api_keys

validate_settings_api_keys(settings_path: str = 'config/settings.json') -> bool

Validate API keys in settings file.

Parameters:

  • settings_path (str, default: 'config/settings.json' ) –

    Path to settings.json file

Returns:

  • bool

    True if valid, False otherwise

Source code in core/api_key_validator.py
def validate_settings_api_keys(settings_path: str = "config/settings.json") -> bool:
    """Validate API keys in settings file.

    The file is read as UTF-8 JSON and its top-level ``api_keys`` value is
    checked with ``validate_api_key_source``. Warnings and errors from that
    check are logged.

    Args:
        settings_path: Path to settings.json file

    Returns:
        True if valid, False otherwise. A missing file or an empty/absent
        ``api_keys`` entry counts as valid; any failure while reading or
        parsing the file is logged and counts as invalid.
    """
    import json

    try:
        path = Path(settings_path)
        if not path.exists():
            logger.warning(f"Settings file not found: {settings_path}")
            return True  # Not an error if file doesn't exist

        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        api_keys = data.get("api_keys", "")
        if not api_keys:
            logger.info("No API keys configured in settings (authentication disabled)")
            return True

        result = validate_api_key_source(api_keys, "settings.api_keys")

        # Log warnings
        for warning in result["warnings"]:
            logger.warning(warning)

        # Log errors
        for error in result["errors"]:
            logger.error(error)

        return result["valid"]

    except Exception as e:
        # Top-level boundary of the check: report the failure, never raise.
        logger.error(f"Failed to validate settings API keys: {e}")
        return False

validate_endpoints_api_keys

validate_endpoints_api_keys(
    endpoints_path: str = "config/endpoints.json",
) -> bool

Validate API keys in endpoints file.

Parameters:

  • endpoints_path (str, default: 'config/endpoints.json' ) –

    Path to endpoints.json file

Returns:

  • bool

    True if valid, False otherwise

Source code in core/api_key_validator.py
def validate_endpoints_api_keys(
    endpoints_path: str = "config/endpoints.json",
) -> bool:
    """Validate API keys in endpoints file.

    The file is read as UTF-8 JSON. Every entry of its top-level
    ``endpoints`` list that has a truthy ``api_keys`` value is checked with
    ``validate_api_key_source``; warnings and errors are logged. All
    endpoints are checked even after one of them fails.

    Args:
        endpoints_path: Path to endpoints.json file

    Returns:
        True if valid, False otherwise. A missing file counts as valid; any
        failure while reading or parsing the file is logged and counts as
        invalid.
    """
    import json

    try:
        path = Path(endpoints_path)
        if not path.exists():
            logger.warning(f"Endpoints file not found: {endpoints_path}")
            return True  # Not an error if file doesn't exist

        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(path, encoding="utf-8") as f:
            data = json.load(f)

        endpoints = data.get("endpoints", [])
        all_valid = True

        for i, endpoint in enumerate(endpoints):
            # Prefer the endpoint's path as its label; fall back to its index.
            endpoint_id = endpoint.get("path", f"endpoint[{i}]")
            api_keys = endpoint.get("api_keys")

            if api_keys:
                result = validate_api_key_source(
                    api_keys, f"endpoints[{endpoint_id}].api_keys"
                )

                # Log warnings
                for warning in result["warnings"]:
                    logger.warning(warning)

                # Log errors
                for error in result["errors"]:
                    logger.error(error)

                if not result["valid"]:
                    all_valid = False

        return all_valid

    except Exception as e:
        # Top-level boundary of the check: report the failure, never raise.
        logger.error(f"Failed to validate endpoints API keys: {e}")
        return False

validate_all_api_keys

validate_all_api_keys(
    settings_path: str = "config/settings.json",
    endpoints_path: str = "config/endpoints.json",
) -> bool

Validate all API key configurations.

Parameters:

  • settings_path (str, default: 'config/settings.json' ) –

    Path to settings.json file

  • endpoints_path (str, default: 'config/endpoints.json' ) –

    Path to endpoints.json file

Returns:

  • bool

    True if all configurations are valid, False otherwise

Source code in core/api_key_validator.py
def validate_all_api_keys(
    settings_path: str = "config/settings.json",
    endpoints_path: str = "config/endpoints.json",
) -> bool:
    """Run the settings check and the endpoints check and combine them.

    Both validators are always called, even when the first one fails.

    Args:
        settings_path: Location of the settings JSON file
        endpoints_path: Location of the endpoints JSON file

    Returns:
        True only when both checks report success, False otherwise
    """
    logger.info("Validating API key configurations...")

    # Evaluate both checks eagerly; do not short-circuit on the first failure.
    outcomes = (
        validate_settings_api_keys(settings_path),
        validate_endpoints_api_keys(endpoints_path),
    )

    if not all(outcomes):
        logger.error("❌ API key configuration validation failed")
        return False

    logger.info("✅ All API key configurations are valid")
    return True

Caching

core.cache

Caching utilities for endpoints.

SimpleCache

SimpleCache()

Simple in-memory cache with TTL.

Initialize cache.

Source code in core/cache.py
def __init__(self):
    """Create an empty cache and note the current time as the last cleanup."""
    # Timestamp of the most recent cleanup pass.
    self._last_cleanup = time.time()
    self._cleanup_interval = 60  # seconds between cleanup passes
    # Maps cache key -> (value, expiry timestamp).
    self._cache: dict[str, tuple[Any, float]] = {}

get

get(key: str) -> Any | None

Get value from cache.

Parameters:

  • key (str) –

    Cache key

Returns:

  • Any | None

    Cached value or None if not found or expired

Source code in core/cache.py
def get(self, key: str) -> Any | None:
    """Get value from cache.

    Args:
        key: Cache key

    Returns:
        Cached value or None if not found or expired
    """
    self._cleanup_expired()

    if key not in self._cache:
        return None

    value, expiry = self._cache[key]
    if time.time() > expiry:
        del self._cache[key]
        return None

    return value

set

set(key: str, value: Any, ttl: int = 60)

Set value in cache.

Parameters:

  • key (str) –

    Cache key

  • value (Any) –

    Value to cache

  • ttl (int, default: 60 ) –

    Time to live in seconds

Source code in core/cache.py
def set(self, key: str, value: Any, ttl: int = 60):
    """Store a value under ``key`` with an expiry ``ttl`` seconds from now.

    Args:
        key: Cache key
        value: Value to cache
        ttl: Time to live in seconds
    """
    self._cleanup_expired()
    # Entries are stored as (value, absolute expiry timestamp).
    self._cache[key] = (value, time.time() + ttl)

clear

clear()

Clear all cache entries.

Source code in core/cache.py
def clear(self):
    """Clear all cache entries.

    Empties the underlying dictionary in place, discarding every stored
    value regardless of its expiry time.
    """
    self._cache.clear()

cache_response

cache_response(ttl: int = 60, key_func: Callable | None = None)

Decorator to cache endpoint responses.

Parameters:

  • ttl (int, default: 60 ) –

    Time to live in seconds

  • key_func (Callable | None, default: None ) –

    Optional function to generate cache key from request

Returns:

  • Decorated function

Source code in core/cache.py
def cache_response(ttl: int = 60, key_func: Callable | None = None):
    """Build a decorator that caches the result of an async endpoint.

    Args:
        ttl: Time to live in seconds for each cached result
        key_func: Optional callable that receives the endpoint's arguments
            and returns the cache key; when omitted, the key is an MD5
            digest of the function name and the stringified arguments

    Returns:
        Decorator that wraps an async function with cache lookup/store
    """

    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if key_func:
                lookup_key = key_func(*args, **kwargs)
            else:
                # Fall back to a digest of the function name and call arguments.
                fingerprint = json.dumps(
                    {
                        "func": getattr(func, "__name__", "unknown"),
                        "args": str(args),
                        "kwargs": str(sorted(kwargs.items())),
                    },
                    sort_keys=True,
                )
                lookup_key = hashlib.md5(fingerprint.encode()).hexdigest()

            outcome = _cache.get(lookup_key)
            if outcome is None:
                # Miss (or a stored None): call the endpoint and keep its result.
                outcome = await func(*args, **kwargs)
                _cache.set(lookup_key, outcome, ttl)

            return outcome

        return wrapper

    return decorator

add_cache_headers

add_cache_headers(response: Response, ttl: int = 60, etag: str | None = None)

Add cache headers to response.

Parameters:

  • response (Response) –

    FastAPI response object

  • ttl (int, default: 60 ) –

    Time to live in seconds

  • etag (str | None, default: None ) –

    Optional ETag value

Source code in core/cache.py
def add_cache_headers(response: Response, ttl: int = 60, etag: str | None = None):
    """Set ``Cache-Control`` (and optionally ``ETag``) on a response.

    Args:
        response: FastAPI response object whose headers are modified
        ttl: Value used for ``max-age``, in seconds
        etag: Optional ETag value; the header is only set when truthy
    """
    headers = response.headers
    headers["Cache-Control"] = f"public, max-age={ttl}"
    if not etag:
        return
    headers["ETag"] = etag

Dependencies

core.dependencies

Shared dependencies for dependency injection.

HTTPClientDependency

HTTPClientDependency()

Dependency class for HTTP client with connection pooling.

Initialize the HTTP client dependency.

Source code in core/dependencies.py
def __init__(self):
    """Initialize the HTTP client dependency.

    No client is created here; ``_client`` starts out as ``None``.
    """
    # Slot for the client instance; None means no client is currently held.
    self._client: httpx.AsyncClient | None = None

__call__ async

__call__() -> httpx.AsyncClient

Get or create the HTTP client.

Source code in core/dependencies.py
async def __call__(self) -> httpx.AsyncClient:
    """Return the held HTTP client, creating it on first use."""
    if self._client is not None:
        return self._client

    # First call: build the client with its connection limits and keep it.
    pool_limits = httpx.Limits(max_keepalive_connections=10, max_connections=20)
    self._client = httpx.AsyncClient(
        timeout=30.0,
        follow_redirects=True,
        limits=pool_limits,
    )
    return self._client

close async

close()

Close the HTTP client.

Source code in core/dependencies.py
async def close(self):
    """Close the held HTTP client, if any, and forget it."""
    if self._client is None:
        # Nothing was ever created (or it was already closed).
        return

    await self._client.aclose()
    self._client = None

get_settings_cached cached

get_settings_cached() -> Settings

Get cached settings instance.

Returns:

  • Settings

    Settings instance (cached).

Source code in core/dependencies.py
@lru_cache
def get_settings_cached() -> Settings:
    """Return the application settings, memoised through ``lru_cache``.

    Returns:
        The result of ``get_settings()``; the decorator reuses the first
        result on later calls.
    """
    settings = get_settings()
    return settings

get_http_client

get_http_client() -> httpx.AsyncClient

Get an HTTP client instance.

Note: This creates a new client each time. For connection pooling, consider using a shared client instance managed by the application.

Returns:

  • AsyncClient

    httpx.AsyncClient instance.

Source code in core/dependencies.py
def get_http_client() -> httpx.AsyncClient:
    """Create a fresh HTTP client.

    Note: Every call builds a new client, so connections are not pooled
    across calls. For connection pooling, consider using a shared client
    instance managed by the application.

    Returns:
        A new httpx.AsyncClient with a 30 second timeout that follows
        redirects.
    """
    client_options = {"timeout": 30.0, "follow_redirects": True}
    return httpx.AsyncClient(**client_options)

Rate Limiting

core.rate_limiter

Rate limiting middleware and utilities.

RateLimiter

RateLimiter()

Simple in-memory rate limiter.

Note: State is per-process. When running multiple Gunicorn workers, each worker maintains its own rate limit counters, effectively multiplying the allowed rate by the number of workers. Use a shared store (e.g. Redis) if accurate cross-worker rate limiting is needed.

Initialize rate limiter.

Source code in core/rate_limiter.py
def __init__(self):
    """Create a rate limiter with no recorded requests."""
    # Timestamp of the most recent cleanup pass.
    self._last_cleanup = time.time()
    self._cleanup_interval = 60  # seconds between cleanups of old entries
    # Maps identifier -> timestamps of the requests recorded for it.
    self._requests: dict[str, list[float]] = defaultdict(list)

check_rate_limit

check_rate_limit(
    identifier: str, limit: int, window_seconds: int = 60
) -> tuple[bool, int, int]

Check if request is within rate limit.

Parameters:

  • identifier (str) –

    Unique identifier (IP address, API key, etc.)

  • limit (int) –

    Maximum number of requests allowed

  • window_seconds (int, default: 60 ) –

    Time window in seconds (default: 60)

Returns:

  • tuple[bool, int, int]

    Tuple of (is_allowed, remaining, reset_time)

Source code in core/rate_limiter.py
def check_rate_limit(
    self, identifier: str, limit: int, window_seconds: int = 60
) -> tuple[bool, int, int]:
    """Check if a request is within the rate limit and record it if so.

    Timestamps for ``identifier`` older than ``window_seconds`` are dropped
    first. If the number of remaining timestamps has already reached
    ``limit`` the request is rejected and not recorded; otherwise the
    current time is recorded for the identifier.

    Args:
        identifier: Unique identifier (IP address, API key, etc.)
        limit: Maximum number of requests allowed in the window. A value
            of zero or less rejects every request.
        window_seconds: Time window in seconds (default: 60)

    Returns:
        Tuple of (is_allowed, remaining, reset_time). When rejected,
        ``remaining`` is 0 and ``reset_time`` is the oldest in-window
        timestamp plus the window (or now plus the window when nothing is
        recorded). When allowed, ``reset_time`` is now plus the window.
        ``reset_time`` is truncated to whole seconds.
    """
    self._cleanup_old_entries()

    current_time = time.time()
    cutoff_time = current_time - window_seconds

    # Keep only the timestamps that still fall inside the window.
    requests = [ts for ts in self._requests[identifier] if ts > cutoff_time]
    self._requests[identifier] = requests

    if len(requests) >= limit:
        # With a non-positive limit there may be no recorded request to base
        # the reset time on; fall back to a full window from now instead of
        # indexing into an empty list.
        oldest = requests[0] if requests else current_time
        return False, 0, int(oldest + window_seconds)

    # ``requests`` is the list stored above, so appending records the request.
    requests.append(current_time)

    remaining = max(0, limit - len(requests))
    reset_time = int(current_time + window_seconds)

    return True, remaining, reset_time

RateLimitMiddleware

Bases: BaseHTTPMiddleware

Middleware for rate limiting.

dispatch async

dispatch(request: Request, call_next) -> Response

Apply rate limiting to requests.

Source code in core/rate_limiter.py
async def dispatch(self, request: Request, call_next) -> Response:
    """Apply rate limiting to requests.

    When rate limiting is enabled in the settings, the request is counted
    against a per-identifier limit. Requests over the limit are answered
    with a 429 JSON error *without* being passed to the downstream
    handler; requests within the limit are forwarded and the response
    gets ``X-RateLimit-*`` headers.

    Args:
        request: Incoming request
        call_next: Callable that forwards the request down the stack

    Returns:
        The downstream response with rate limit headers, or a 429
        JSONResponse when the limit is exceeded
    """
    settings = get_settings()

    # Skip rate limiting if disabled
    if not settings.rate_limit_enabled:
        return await call_next(request)

    # Requests carrying an API key are tracked per key and get the
    # authenticated limit; everything else is tracked per client address.
    api_key = request.headers.get("X-API-Key")
    if api_key:
        # NOTE(review): the key is not validated at this point, so any
        # non-empty X-API-Key value gets its own bucket and the
        # authenticated limit — confirm this is intended.
        identifier = f"api_key:{api_key}"
        limit = settings.rate_limit_per_minute_authenticated
    else:
        identifier = request.client.host if request.client else "unknown"
        limit = settings.rate_limit_per_minute

    allowed, remaining, reset_time = _rate_limiter.check_rate_limit(
        identifier, limit
    )

    if not allowed:
        # Reject before invoking the downstream handler: running it and then
        # discarding its response would still execute the request's work and
        # side effects, which is what the limit is meant to prevent.
        return JSONResponse(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content={
                "error": {
                    "message": "Rate limit exceeded",
                    "status_code": 429,
                    "details": {
                        "limit": limit,
                        "reset_time": reset_time,
                    },
                }
            },
            headers={
                "X-RateLimit-Limit": str(limit),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_time),
            },
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(reset_time)

    return response

Request Validation

core.request_validation

Request validation and sanitization utilities.

RequestValidationMiddleware

Bases: BaseHTTPMiddleware

Middleware for request validation and sanitization.

dispatch async

dispatch(request: Request, call_next: Callable) -> Response

Validate and sanitize requests.

Source code in core/request_validation.py
async def dispatch(self, request: Request, call_next: Callable) -> Response:
    """Validate and sanitize requests.

    Rejects requests whose declared ``Content-Length`` exceeds
    ``MAX_REQUEST_SIZE`` with a 413 JSON error. For POST/PUT/PATCH
    requests that declare a non-empty body, an unexpected ``Content-Type``
    is logged as a warning; the request is still forwarded.

    Args:
        request: Incoming request
        call_next: Callable that forwards the request down the stack

    Returns:
        A 413 JSONResponse for oversized requests, otherwise the
        downstream response
    """
    content_length = request.headers.get("content-length")

    # Parse the header exactly once. A malformed value is treated as
    # "size unknown" everywhere below, so it can never raise later on.
    size: int | None = None
    if content_length:
        try:
            size = int(content_length)
        except ValueError:
            size = None

    if size is not None and size > MAX_REQUEST_SIZE:
        return JSONResponse(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            content={
                "error": {
                    "message": "Request entity too large",
                    "status_code": 413,
                    "error_code": "ERR_413",
                    "details": {
                        "max_size": MAX_REQUEST_SIZE,
                        "requested_size": size,
                    },
                }
            },
        )

    # Validate content type for POST/PUT/PATCH requests
    if request.method in ("POST", "PUT", "PATCH"):
        content_type = request.headers.get("content-type", "")
        allowed = (
            "application/json",
            "application/x-www-form-urlencoded",
            "multipart/form-data",
        )
        has_body = size is not None and size > 0
        # Media type names are case-insensitive, so compare in lower case.
        if has_body and not content_type.lower().startswith(allowed):
            # Only warn: a missing or unusual content type does not block
            # the request.
            logger.warning(
                f"Unexpected content-type: {content_type} for "
                f"{request.method} {request.url.path}"
            )

    return await call_next(request)

Middleware

core.middleware

Middleware for request/response processing.

RequestIDMiddleware

Bases: BaseHTTPMiddleware

Middleware to add request ID to requests and responses.

dispatch async

dispatch(request: Request, call_next: Callable) -> Response

Process request and add request ID.

Source code in core/middleware.py
async def dispatch(self, request: Request, call_next: Callable) -> Response:
    """Attach a request ID to the request state and the response headers."""
    # Use the caller-supplied X-Request-ID when present, else a fresh UUID4.
    fallback_id = str(uuid.uuid4())
    request_id = request.headers.get("X-Request-ID", fallback_id)

    # Expose the ID to downstream code through the request state.
    request.state.request_id = request_id

    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

LoggingMiddleware

Bases: BaseHTTPMiddleware

Middleware for request/response logging.

dispatch async

dispatch(request: Request, call_next: Callable) -> Response

Log request and response.

Source code in core/middleware.py
async def dispatch(self, request: Request, call_next: Callable) -> Response:
    """Log each request, its response (or failure) and its duration.

    Requests taking longer than one second are logged at WARNING level,
    others at INFO. Exceptions are logged with a traceback and re-raised.
    The elapsed time is also returned in the ``X-Process-Time`` header.
    """
    started = time.time()

    # Falls back to "unknown" when no request ID was stored on the state.
    request_id = getattr(request.state, "request_id", "unknown")

    # Only record whether a key was sent, never the key itself.
    has_auth = bool(request.headers.get("X-API-Key"))

    logger.info(
        f"Request: {request.method} {request.url.path}",
        extra={
            "request_id": request_id,
            "method": request.method,
            "path": request.url.path,
            "client": request.client.host if request.client else None,
            "authenticated": has_auth,
        },
    )

    try:
        response = await call_next(request)
        elapsed = time.time() - started

        method, path = request.method, request.url.path
        details = {
            "request_id": request_id,
            "method": method,
            "path": path,
            "status_code": response.status_code,
            "process_time": elapsed,
        }

        if elapsed > 1.0:
            # Slow requests are surfaced as warnings.
            logger.warning(
                f"Slow request: {method} {path} - {elapsed:.3f}s",
                extra=details,
            )
        else:
            logger.info(
                f"Response: {method} {path} - {response.status_code}",
                extra=details,
            )

        response.headers["X-Process-Time"] = str(round(elapsed, 4))
        return response
    except Exception as exc:
        elapsed = time.time() - started
        logger.error(
            f"Error processing request: {request.method} {request.url.path}",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "error": str(exc),
                "process_time": elapsed,
            },
            exc_info=True,
        )
        raise

SecurityHeadersMiddleware

Bases: BaseHTTPMiddleware

Middleware to add security headers to responses.

dispatch async

dispatch(request: Request, call_next: Callable) -> Response

Add security headers to response.

Source code in core/middleware.py
async def dispatch(self, request: Request, call_next: Callable) -> Response:
    """Forward the request and stamp fixed security headers on the response."""
    response = await call_next(request)

    # Applied in this order to every response, overwriting existing values.
    security_headers = {
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "SAMEORIGIN",
        "X-XSS-Protection": "1; mode=block",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "Referrer-Policy": "strict-origin-when-cross-origin",
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value

    return response

configure_cors

configure_cors(
    app: Starlette, allowed_origins: list[str] | None = None
) -> None

Configure CORS middleware for the application.

Parameters:

  • app (Starlette) –

    FastAPI application instance

  • allowed_origins (list[str] | None, default: None ) –

    List of allowed origins. If None, allows all origins.

Source code in core/middleware.py
def configure_cors(app: Starlette, allowed_origins: list[str] | None = None) -> None:
    """Configure CORS middleware for the application.

    All methods and headers are allowed, and ``X-Request-ID`` and
    ``X-Process-Time`` are exposed to browsers. Credentialed requests
    (cookies, HTTP authentication) are only enabled when an explicit list
    of origins is given.

    Args:
        app: FastAPI application instance
        allowed_origins: List of allowed origins. If None, allows all origins.
    """
    if allowed_origins is None:
        allowed_origins = ["*"]

    # A wildcard origin must not be combined with credentials: doing so would
    # let any website issue credentialed cross-origin requests and read the
    # responses. Credentials are therefore limited to explicit origin lists.
    allow_credentials = "*" not in allowed_origins

    app.add_middleware(
        CORSMiddleware,  # type: ignore[invalid-argument-type]
        allow_origins=allowed_origins,
        allow_credentials=allow_credentials,
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["X-Request-ID", "X-Process-Time"],
    )

Router Factory

core.router_factory

Dynamic router factory for configurable endpoints.

DynamicRouter

DynamicRouter(app: FastAPI)

Dynamic router for configurable endpoints.

Initialize dynamic router.

Parameters:

  • app (FastAPI) –

    FastAPI application instance

Source code in core/router_factory.py
def __init__(self, app: fastapi.FastAPI):
    """Bind the router to an application.

    Args:
        app: FastAPI application instance that endpoints are registered on
    """
    # Identifiers of the endpoints registered so far; starts empty.
    self.registered_endpoints: list[str] = []
    self.app = app

register_endpoint

register_endpoint(config: EndpointConfig)

Register a single endpoint from configuration.

Parameters:

  • config (EndpointConfig) –

    Endpoint configuration

Source code in core/router_factory.py
def register_endpoint(self, config: EndpointConfig):
    """Register one configured endpoint on the application.

    Disabled endpoints and endpoints whose service cannot be resolved
    are skipped (with a debug / warning log entry respectively).

    Args:
        config: Endpoint configuration
    """
    if not config.enabled:
        logger.debug(f"Skipping disabled endpoint: {config.path}")
        return

    service_class = get_service(config.service)
    if not service_class:
        logger.warning(
            f"Service '{config.service}' not found for endpoint {config.path}"
        )
        return

    handler = self._create_handler(config, service_class)

    # The app exposes one decorator factory per HTTP method (app.get, ...).
    http_method = config.method.value
    decorator_factory = getattr(self.app, http_method.lower())

    options = dict(
        path=config.path,
        summary=config.summary or config.description,
        description=config.description,
        tags=config.tags or ["configurable"],
    )
    if config.response_model:
        # Any configured response model is currently mapped to a plain dict;
        # this can be extended to use actual Pydantic models.
        options["response_model"] = dict

    decorator_factory(**options)(handler)

    label = f"{http_method} {config.path}"
    self.registered_endpoints.append(label)
    logger.info(f"Registered configurable endpoint: {label}")

load_and_register

load_and_register(config_path: str = 'config/endpoints.json')

Load configuration and register all endpoints.

Parameters:

  • config_path (str, default: 'config/endpoints.json' ) –

    Path to endpoints configuration file

Source code in core/router_factory.py
def load_and_register(self, config_path: str = "config/endpoints.json"):
    """Load the endpoints file and register every endpoint it lists.

    Failures are logged with a traceback rather than raised: a failing
    endpoint does not stop the remaining ones, and a failure to load the
    configuration results in no endpoints being registered.

    Args:
        config_path: Path to endpoints configuration file
    """
    try:
        loaded = load_endpoints_config(config_path)
        for entry in loaded.endpoints:
            try:
                self.register_endpoint(entry)
            except Exception as exc:
                # Keep going so one bad endpoint cannot block the others.
                failure = f"Failed to register endpoint {entry.path}: {exc}"
                logger.error(failure, exc_info=True)
    except Exception as exc:
        failure = f"Failed to load endpoint configuration: {exc}"
        logger.error(failure, exc_info=True)

Metrics

core.metrics

Metrics collection for monitoring.

EndpointMetrics dataclass

EndpointMetrics(
    count: int = 0,
    total_time: float = 0.0,
    error_count: int = 0,
    last_request_time: float = 0.0,
    status_codes: dict[int, int] = (lambda: defaultdict(int))(),
)

Metrics for a single endpoint.

average_time property

average_time: float

Calculate average request time.

error_rate property

error_rate: float

Calculate error rate.

MetricsCollector

MetricsCollector()

Collects metrics for the API.

Note: State is per-process. When running multiple Gunicorn workers, each worker maintains its own metrics. Use a shared store (e.g. Redis or Prometheus) if aggregated cross-worker metrics are needed.

Initialize metrics collector.

Source code in core/metrics.py
def __init__(self):
    """Start with zeroed totals and no per-endpoint metrics."""
    self._total_errors = 0
    self._total_requests = 0
    # Reference point used to compute uptime.
    self._start_time = time.time()
    # Per-endpoint metrics; unseen keys get a fresh EndpointMetrics.
    self._endpoint_metrics: dict[str, EndpointMetrics] = defaultdict(EndpointMetrics)

record_request

record_request(path: str, method: str, status_code: int, duration: float)

Record a request metric.

Parameters:

  • path (str) –

    Request path

  • method (str) –

    HTTP method

  • status_code (int) –

    Response status code

  • duration (float) –

    Request duration in seconds

Source code in core/metrics.py
def record_request(
    self,
    path: str,
    method: str,
    status_code: int,
    duration: float,
):
    """Update the per-endpoint and global counters for one request.

    Args:
        path: Request path
        method: HTTP method
        status_code: Response status code; 400 and above count as errors
        duration: Request duration in seconds
    """
    # Endpoints are keyed as "<METHOD> <path>".
    entry = self._endpoint_metrics[f"{method} {path}"]

    entry.count += 1
    entry.total_time += duration
    entry.last_request_time = time.time()
    entry.status_codes[status_code] += 1

    is_error = status_code >= 400
    if is_error:
        entry.error_count += 1
        self._total_errors += 1

    self._total_requests += 1

get_metrics

get_metrics() -> dict

Get all collected metrics.

Returns:

  • dict

    Dictionary with metrics data

Source code in core/metrics.py
def get_metrics(self) -> dict:
    """Build a snapshot of everything collected so far.

    Returns:
        Dictionary with uptime, global request/error totals, the global
        error rate (0.0 when no requests were recorded) and a per-endpoint
        breakdown; times and rates are rounded to 4 decimals, uptime to 2
    """
    uptime = time.time() - self._start_time

    per_endpoint = {
        name: {
            "count": stats.count,
            "average_time": round(stats.average_time, 4),
            "error_count": stats.error_count,
            "error_rate": round(stats.error_rate, 4),
            # Copy into a plain dict so the snapshot is detached.
            "status_codes": dict(stats.status_codes),
        }
        for name, stats in self._endpoint_metrics.items()
    }

    if self._total_requests > 0:
        overall_error_rate = round(self._total_errors / self._total_requests, 4)
    else:
        overall_error_rate = 0.0

    return {
        "uptime_seconds": round(uptime, 2),
        "total_requests": self._total_requests,
        "total_errors": self._total_errors,
        "error_rate": overall_error_rate,
        "endpoints": per_endpoint,
    }

reset

reset()

Reset all metrics.

Source code in core/metrics.py
def reset(self):
    """Discard all collected metrics and restart the uptime clock."""
    self._total_requests = self._total_errors = 0
    self._start_time = time.time()
    # Emptied in place so the same mapping object stays in use.
    self._endpoint_metrics.clear()

MetricsMiddleware

Bases: BaseHTTPMiddleware

Middleware for collecting metrics.

dispatch async

dispatch(request: Request, call_next)

Collect metrics for requests.

Source code in core/metrics.py
async def dispatch(self, request: Request, call_next):
    """Collect metrics for requests.

    Measures how long the downstream handling takes and records path,
    method, status code and duration with the metrics collector. If the
    downstream handling raises, the request is recorded with status 500
    before the exception is re-raised, so failures are not missing from
    the error totals.

    Args:
        request: Incoming request
        call_next: Callable that forwards the request down the stack

    Returns:
        The downstream response, unchanged
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start_time = time.perf_counter()

    try:
        response = await call_next(request)
    except Exception:
        # No response object exists for an unhandled error; count it as a
        # 500 and let the exception continue to propagate.
        metrics_collector.record_request(
            path=request.url.path,
            method=request.method,
            status_code=500,
            duration=time.perf_counter() - start_time,
        )
        raise

    duration = time.perf_counter() - start_time

    # Record metrics
    metrics_collector.record_request(
        path=request.url.path,
        method=request.method,
        status_code=response.status_code,
        duration=duration,
    )

    return response

Logging

core.logging_config

Logging configuration for the application.

RequestIDFilter

Bases: Filter

Logging filter to add request ID to log records.

filter

filter(record: LogRecord) -> bool

Add request_id to log record if not present.

Source code in core/logging_config.py
def filter(self, record: logging.LogRecord) -> bool:
    """Ensure the record has a ``request_id`` and let it through.

    Records without the attribute get the placeholder ``"N/A"``; an
    existing value is left untouched. Always returns True.
    """
    try:
        record.request_id
    except AttributeError:
        record.request_id = "N/A"
    return True

setup_logging

setup_logging(
    level: str = "INFO",
    log_file: Path | None = None,
    log_format: str | None = None,
) -> None

Configure application logging.

Parameters:

  • level (str, default: 'INFO' ) –

    Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

  • log_file (Path | None, default: None ) –

    Optional path to log file

  • log_format (str | None, default: None ) –

    Optional custom log format

Source code in core/logging_config.py
def setup_logging(
    level: str = "INFO",
    log_file: Path | None = None,
    log_format: str | None = None,
) -> None:
    """Configure application logging.

    Replaces the root logger's handlers with a stdout handler and, when
    ``log_file`` is given, a UTF-8 file handler. Every handler gets the
    same formatter and a ``RequestIDFilter``. Handlers that were
    previously attached to the root logger are removed and closed. The
    ``uvicorn.access`` and ``httpx`` loggers are raised to WARNING.

    Args:
        level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL); a name
            that ``logging`` does not define falls back to INFO
        log_file: Optional path to log file; missing parent directories
            are created
        log_format: Optional custom log format
    """
    if log_format is None:
        log_format = (
            "%(asctime)s - %(name)s - %(levelname)s - [%(request_id)s] - %(message)s"
        )

    # One filter and one formatter are shared by every handler.
    request_id_filter = RequestIDFilter()
    formatter = logging.Formatter(log_format)

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, level.upper(), logging.INFO))

    # Detach AND close the existing handlers. Merely clearing the list would
    # leave file handlers from an earlier call open for the rest of the
    # process. Iterate over a copy because removeHandler mutates the list.
    for old_handler in list(root_logger.handlers):
        root_logger.removeHandler(old_handler)
        old_handler.close()

    handlers: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]

    # File handler (if specified)
    if log_file:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        # UTF-8 so non-ASCII log messages can be written regardless of the
        # platform's default encoding.
        handlers.append(logging.FileHandler(log_file, encoding="utf-8"))

    for handler in handlers:
        handler.setFormatter(formatter)
        handler.addFilter(request_id_filter)
        root_logger.addHandler(handler)

    # Set levels for third-party libraries
    logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
    logging.getLogger("httpx").setLevel(logging.WARNING)

Next Steps