Skip to content

Logging and Monitoring API

SpindleX provides a comprehensive logging and performance monitoring system.

Core Logging

spindlex.logging.logger

Main logging interface for SpindleX.

Classes

SSHLogger

Enhanced logger for SpindleX with security and performance features.

Source code in spindlex/logging/logger.py
class SSHLogger:
    """Wrapper around a ``logging.Logger`` with security/performance helpers.

    The wrapped logger, and the ``<name>.security`` / ``<name>.performance``
    loggers created on first use, each get a ``SanitizingFilter`` attached.
    Filters set on a logger are not applied to records emitted by its
    descendants, so the derived loggers need their own filter instance.

    Every logging call made through this class adjusts ``stacklevel`` so the
    record's ``funcName`` / ``filename`` / ``lineno`` describe the code that
    called the ``SSHLogger`` method rather than the method itself.
    """

    def __init__(self, name: str, logger: Optional[logging.Logger] = None):
        """
        Initialize SSH logger.

        Args:
            name: Logger name
            logger: Existing logger instance (if None, creates new one)
        """
        self.name = name
        self.logger = logger or logging.getLogger(name)
        self._attach_sanitizer(self.logger)

        # Created lazily by security_event() / performance_metric().
        self._security_logger: Optional[logging.Logger] = None
        self._performance_logger: Optional[logging.Logger] = None

    @staticmethod
    def _attach_sanitizer(target: logging.Logger) -> None:
        """Add a SanitizingFilter to ``target`` unless it already has one."""
        from .sanitizer import SanitizingFilter

        if not any(isinstance(f, SanitizingFilter) for f in target.filters):
            target.addFilter(SanitizingFilter())

    def _derived_logger(self, suffix: str) -> logging.Logger:
        """Return the ``<name>.<suffix>`` logger with sanitizing enabled."""
        derived = logging.getLogger(f"{self.name}.{suffix}")
        self._attach_sanitizer(derived)
        return derived

    def _emit(
        self, level: int, msg: str, args: tuple[Any, ...], kwargs: dict[str, Any]
    ) -> None:
        """Forward one record to the wrapped logger with caller attribution."""
        # Two frames (_emit and the public wrapper) sit between the real
        # caller and the logging module; skip both. A caller-supplied
        # stacklevel stays relative to the caller of the public method.
        kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 2
        self.logger.log(level, msg, *args, **kwargs)

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log debug message."""
        self._emit(logging.DEBUG, msg, args, kwargs)

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log info message."""
        self._emit(logging.INFO, msg, args, kwargs)

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log warning message."""
        self._emit(logging.WARNING, msg, args, kwargs)

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log error message."""
        self._emit(logging.ERROR, msg, args, kwargs)

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log critical message."""
        self._emit(logging.CRITICAL, msg, args, kwargs)

    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Log an ERROR record with traceback (``exc_info`` defaults to True)."""
        kwargs.setdefault("exc_info", True)
        self._emit(logging.ERROR, msg, args, kwargs)

    def security_event(
        self,
        event_type: str,
        message: str,
        client_ip: str = "unknown",
        username: str = "unknown",
        **kwargs: Any,
    ) -> None:
        """
        Log security-related event at INFO level on ``<name>.security``.

        Args:
            event_type: Type of security event (auth_success, auth_failure, etc.)
            message: Event description
            client_ip: Client IP address
            username: Username involved in event
            **kwargs: Additional event data, attached to the record as
                ``extra`` fields alongside the three named ones
        """
        if not self._security_logger:
            self._security_logger = self._derived_logger("security")

        extra = {
            "event_type": event_type,
            "client_ip": client_ip,
            "username": username,
            **kwargs,
        }

        # stacklevel=2 attributes the record to the caller of this method.
        self._security_logger.info(message, extra=extra, stacklevel=2)

    def performance_metric(
        self, operation: str, duration: float, **kwargs: Any
    ) -> None:
        """
        Log performance metric at INFO level on ``<name>.performance``.

        Args:
            operation: Operation name
            duration: Operation duration in seconds
            **kwargs: Additional metric data, attached as ``extra`` fields
        """
        if not self._performance_logger:
            self._performance_logger = self._derived_logger("performance")

        extra = {"operation": operation, "duration_seconds": duration, **kwargs}

        message = f"Operation '{operation}' completed in {duration:.4f}s"
        self._performance_logger.info(message, extra=extra, stacklevel=2)

    def protocol_debug(
        self, direction: str, message_type: str, data: dict[str, Any], **kwargs: Any
    ) -> None:
        """
        Log protocol-level debugging information at DEBUG level.

        Args:
            direction: 'sent' or 'received'
            message_type: SSH message type
            data: Message data
            **kwargs: Additional debug data, attached as ``extra`` fields
        """
        extra = {
            "direction": direction,
            "message_type": message_type,
            "data": data,
            **kwargs,
        }

        message = f"SSH {direction} {message_type}"
        self.logger.debug(message, extra=extra, stacklevel=2)
Methods:
__init__(name, logger=None)

Initialize SSH logger.

Parameters:

Name Type Description Default
name str

Logger name

required
logger Optional[Logger]

Existing logger instance (if None, creates new one)

None
Source code in spindlex/logging/logger.py
def __init__(self, name: str, logger: Optional[logging.Logger] = None):
    """
    Set up the wrapper around ``logger`` (or the logger registered as ``name``).

    A ``SanitizingFilter`` is added to the wrapped logger unless one is
    already present in its filter list.

    Args:
        name: Logger name
        logger: Existing logger instance (if None, ``logging.getLogger(name)``
            is used)
    """
    self.name = name
    self.logger = logger or logging.getLogger(name)

    from .sanitizer import SanitizingFilter

    # for/else: the else branch runs only when no existing filter matched.
    for installed in self.logger.filters:
        if isinstance(installed, SanitizingFilter):
            break
    else:
        self.logger.addFilter(SanitizingFilter())

    # Both derived loggers start unset; they are filled in elsewhere.
    self._security_logger: Optional[logging.Logger] = None
    self._performance_logger: Optional[logging.Logger] = None
critical(msg, *args, **kwargs)

Log critical message.

Source code in spindlex/logging/logger.py
def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log critical message, attributed to the caller of this method."""
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.critical(msg, *args, **kwargs)
debug(msg, *args, **kwargs)

Log debug message.

Source code in spindlex/logging/logger.py
def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log debug message, attributed to the caller of this method."""
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.debug(msg, *args, **kwargs)
error(msg, *args, **kwargs)

Log error message.

Source code in spindlex/logging/logger.py
def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log error message, attributed to the caller of this method."""
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.error(msg, *args, **kwargs)
exception(msg, *args, **kwargs)

Log exception with traceback.

Source code in spindlex/logging/logger.py
def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log an ERROR record with traceback, attributed to the caller.

    ``exc_info`` defaults to True, matching ``logging.Logger.exception``.
    """
    kwargs.setdefault("exc_info", True)
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    # Logger.error is called directly so exactly one logging-module frame
    # sits below this wrapper, keeping the stacklevel arithmetic uniform
    # across Python versions.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.error(msg, *args, **kwargs)
info(msg, *args, **kwargs)

Log info message.

Source code in spindlex/logging/logger.py
def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log info message, attributed to the caller of this method."""
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.info(msg, *args, **kwargs)
performance_metric(operation, duration, **kwargs)

Log performance metric.

Parameters:

Name Type Description Default
operation str

Operation name

required
duration float

Operation duration in seconds

required
**kwargs Any

Additional metric data

{}
Source code in spindlex/logging/logger.py
def performance_metric(
    self, operation: str, duration: float, **kwargs: Any
) -> None:
    """
    Log performance metric at INFO level on the ``<name>.performance`` logger.

    The logger is created on first use and given its own SanitizingFilter:
    filters attached to ``self.logger`` are not applied to records emitted
    by a descendant logger.

    Args:
        operation: Operation name
        duration: Operation duration in seconds
        **kwargs: Additional metric data, attached to the record as
            ``extra`` fields next to ``operation`` and ``duration_seconds``
    """
    if not self._performance_logger:
        from .sanitizer import SanitizingFilter

        perf_logger = logging.getLogger(f"{self.name}.performance")
        if not any(isinstance(f, SanitizingFilter) for f in perf_logger.filters):
            perf_logger.addFilter(SanitizingFilter())
        self._performance_logger = perf_logger

    extra = {"operation": operation, "duration_seconds": duration, **kwargs}

    message = f"Operation '{operation}' completed in {duration:.4f}s"
    # stacklevel=2 attributes the record to the caller of this method.
    self._performance_logger.info(message, extra=extra, stacklevel=2)
protocol_debug(direction, message_type, data, **kwargs)

Log protocol-level debugging information.

Parameters:

Name Type Description Default
direction str

'sent' or 'received'

required
message_type str

SSH message type

required
data dict[str, Any]

Message data

required
**kwargs Any

Additional debug data

{}
Source code in spindlex/logging/logger.py
def protocol_debug(
    self, direction: str, message_type: str, data: dict[str, Any], **kwargs: Any
) -> None:
    """
    Log protocol-level debugging information at DEBUG level.

    The record's message is ``"SSH <direction> <message_type>"``; the
    arguments are also attached to the record as ``extra`` fields.

    Args:
        direction: 'sent' or 'received'
        message_type: SSH message type
        data: Message data
        **kwargs: Additional debug data, merged into the ``extra`` fields
    """
    extra = {
        "direction": direction,
        "message_type": message_type,
        "data": data,
        **kwargs,
    }

    message = f"SSH {direction} {message_type}"
    # stacklevel=2 attributes the record to the caller of this method.
    self.logger.debug(message, extra=extra, stacklevel=2)
security_event(event_type, message, client_ip='unknown', username='unknown', **kwargs)

Log security-related event.

Parameters:

Name Type Description Default
event_type str

Type of security event (auth_success, auth_failure, etc.)

required
message str

Event description

required
client_ip str

Client IP address

'unknown'
username str

Username involved in event

'unknown'
**kwargs Any

Additional event data

{}
Source code in spindlex/logging/logger.py
def security_event(
    self,
    event_type: str,
    message: str,
    client_ip: str = "unknown",
    username: str = "unknown",
    **kwargs: Any,
) -> None:
    """
    Log security-related event at INFO level on the ``<name>.security`` logger.

    The logger is created on first use and given its own SanitizingFilter:
    filters attached to ``self.logger`` are not applied to records emitted
    by a descendant logger.

    Args:
        event_type: Type of security event (auth_success, auth_failure, etc.)
        message: Event description
        client_ip: Client IP address
        username: Username involved in event
        **kwargs: Additional event data, attached to the record as
            ``extra`` fields alongside the three named ones
    """
    if not self._security_logger:
        from .sanitizer import SanitizingFilter

        security_logger = logging.getLogger(f"{self.name}.security")
        if not any(
            isinstance(f, SanitizingFilter) for f in security_logger.filters
        ):
            security_logger.addFilter(SanitizingFilter())
        self._security_logger = security_logger

    extra = {
        "event_type": event_type,
        "client_ip": client_ip,
        "username": username,
        **kwargs,
    }

    # stacklevel=2 attributes the record to the caller of this method.
    self._security_logger.info(message, extra=extra, stacklevel=2)
warning(msg, *args, **kwargs)

Log warning message.

Source code in spindlex/logging/logger.py
def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
    """Log warning message, attributed to the caller of this method."""
    # Skip this wrapper's frame so funcName/lineno point at the real caller.
    kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 1
    self.logger.warning(msg, *args, **kwargs)

Functions:

configure_logging(level=logging.INFO, format_type='standard', output_file=None, security_file=None, performance_file=None, sanitize=True, json_format=False)

Configure SpindleX logging.

Parameters:

Name Type Description Default
level Union[str, int]

Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)

INFO
format_type str

Format type ('standard', 'debug', 'json')

'standard'
output_file Optional[str]

Main log file path

None
security_file Optional[str]

Security events log file path

None
performance_file Optional[str]

Performance metrics log file path

None
sanitize bool

Whether to sanitize sensitive information

True
json_format bool

Whether to use JSON formatting for main logs

False
Source code in spindlex/logging/logger.py
def configure_logging(
    level: Union[str, int] = logging.INFO,
    format_type: str = "standard",
    output_file: Optional[str] = None,
    security_file: Optional[str] = None,
    performance_file: Optional[str] = None,
    sanitize: bool = True,
    json_format: bool = False,
) -> None:
    """
    Configure SpindleX logging.

    Replaces the handlers on the ``spindlex`` logger with a single file or
    stream handler. When ``security_file`` / ``performance_file`` is given,
    or when no ``output_file`` is given, the ``spindlex.security`` /
    ``spindlex.performance`` logger is also given a dedicated handler and
    stops propagating. Handlers being replaced are closed, so calling this
    function repeatedly neither leaks file handles nor duplicates output.

    The ``SPINDLEX_LOG_LEVEL`` environment variable, when it names a numeric
    level constant of the ``logging`` module, overrides ``level``; any other
    value is ignored.

    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        format_type: Format type ('standard', 'debug', 'json')
        output_file: Main log file path
        security_file: Security events log file path
        performance_file: Performance metrics log file path
        sanitize: Whether to sanitize sensitive information
        json_format: Whether to use JSON formatting for main logs

    Raises:
        AttributeError: If ``level`` is a string that does not name an
            attribute of the ``logging`` module.
    """
    import os

    def _drop_handlers(target: logging.Logger) -> None:
        """Detach and close every handler currently attached to ``target``."""
        for stale in list(target.handlers):
            target.removeHandler(stale)
            stale.close()

    # Convert string level to int if needed
    if isinstance(level, str):
        level = getattr(logging, level.upper())

    # Environment override: accept only names that resolve to a numeric
    # level, so e.g. "BASIC_FORMAT" cannot reach setLevel() and crash it.
    env_level = os.environ.get("SPINDLEX_LOG_LEVEL")
    if env_level:
        env_value = getattr(logging, env_level.upper(), None)
        if isinstance(env_value, int):
            level = env_value

    # Configure root SSH logger
    root_logger = logging.getLogger("spindlex")
    root_logger.setLevel(level)
    _drop_handlers(root_logger)

    # Choose formatter
    formatter: logging.Formatter
    if json_format or format_type == "json":
        formatter = JSONFormatter(sanitize=sanitize)
    elif format_type == "debug":
        formatter = DebugFormatter(sanitize=sanitize)
    else:
        formatter = SSHFormatter(sanitize=sanitize)

    # Add main handler
    handler: logging.Handler
    if output_file:
        dirname = os.path.dirname(output_file)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        handler = logging.FileHandler(output_file)
    else:
        handler = logging.StreamHandler()

    handler.setFormatter(formatter)
    handler.setLevel(level)
    root_logger.addHandler(handler)

    # Configure security logger
    if security_file or not output_file:
        security_logger = logging.getLogger("spindlex.security")
        security_logger.setLevel(logging.INFO)
        _drop_handlers(security_logger)  # avoid stacking one per call
        security_logger.addHandler(SecurityHandler(security_file))
        security_logger.propagate = False  # Don't propagate to root logger

    # Configure performance logger
    if performance_file or not output_file:
        perf_logger = logging.getLogger("spindlex.performance")
        perf_logger.setLevel(logging.INFO)
        _drop_handlers(perf_logger)  # avoid stacking one per call
        perf_logger.addHandler(PerformanceHandler(performance_file, json_format=True))
        perf_logger.propagate = False  # Don't propagate to root logger

get_logger(name)

Get or create SSH logger instance.

Parameters:

Name Type Description Default
name str

Logger name

required

Returns:

Type Description
SSHLogger

SSHLogger instance

Source code in spindlex/logging/logger.py
def get_logger(name: str) -> SSHLogger:
    """
    Get or create SSH logger instance.

    ``name`` is placed under the ``spindlex`` logger hierarchy unless it is
    already ``"spindlex"`` or starts with ``"spindlex."``. A name that merely
    begins with those letters (e.g. ``"spindlexfoo"``) is not part of the
    hierarchy and is therefore prefixed as well. Instances are cached in the
    module-level ``_loggers`` mapping, keyed by the full name.

    Args:
        name: Logger name

    Returns:
        SSHLogger instance
    """
    in_hierarchy = name == "spindlex" or name.startswith("spindlex.")
    full_name = name if in_hierarchy else f"spindlex.{name}"
    if full_name not in _loggers:
        _loggers[full_name] = SSHLogger(full_name)
    return _loggers[full_name]

Sanitization

spindlex.logging.sanitizer

Log sanitization utilities for security-sensitive information.

Classes

LogSanitizer

Sanitizes log messages to prevent sensitive information leakage.

Source code in spindlex/logging/sanitizer.py
class LogSanitizer:
    """Sanitizes log messages to prevent sensitive information leakage.

    ``sanitize_message`` applies the regular expressions in
    ``SENSITIVE_PATTERNS`` to a string; ``sanitize_dict`` walks a mapping,
    replacing values stored under sensitive key names and sanitizing every
    string it finds in nested dicts, lists and tuples.
    """

    # Patterns for sensitive data that should be redacted
    SENSITIVE_PATTERNS: dict[str, Pattern[str]] = {
        # Passwords and secrets (Captured prefix and value) - handles multiple separators
        "password": re.compile(r'(?i)(\bpassword(?:[\s:=]|is\b)+)([^\s,"}]+)'),
        "secret": re.compile(r'(?i)(\bsecret(?:[\s:=]|is\b)+)([^\s,"}]+)'),
        "token": re.compile(r'(?i)(\btoken(?:[\s:=]|is\b)+)([^\s,"}]+)'),
        "key": re.compile(
            r'(?i)(\b(?:api_key|private_key|host_key|secret_key|auth_key|access_key|encryption_key)(?:[\s:=]|is\b)+)([^\s,"}]+)'
        ),
        "passphrase": re.compile(r'(?i)(\bpassphrase(?:[\s:=]|is\b)+)([^\s,"}]+)'),
        # SSH key material (base64 encoded)
        "ssh_key": re.compile(r"AAAA[A-Za-z0-9+/]{20,}={0,2}"),
        # Private key headers/footers
        "private_key": re.compile(
            r"-----BEGIN [A-Z ]+PRIVATE KEY-----.*?-----END [A-Z ]+PRIVATE KEY-----",
            re.DOTALL,
        ),
        # IP addresses (partial redaction)
        "ip": re.compile(r"(\d{1,3}\.\d{1,3}\.\d{1,3}\.)\d{1,3}"),
        # Hostnames (partial redaction for internal hosts)
        "hostname": re.compile(r"([a-zA-Z0-9-]+\.)(internal|local|corp|lan)"),
    }

    # Replacement patterns
    REPLACEMENTS = {
        "password": "[PASSWORD_REDACTED]",  # nosec
        "secret": "[SECRET_REDACTED]",  # nosec
        "token": "[TOKEN_REDACTED]",  # nosec
        "key": "[KEY_REDACTED]",  # nosec
        "passphrase": "[PASSPHRASE_REDACTED]",  # nosec
        "ssh_key": "[SSH_KEY_REDACTED]",  # nosec
        "private_key": "[PRIVATE_KEY_REDACTED]",  # nosec
        "ip_partial": r"\1***",
        "hostname_partial": r"\1[REDACTED]",
    }

    # Dict keys (compared case-insensitively) whose values are always
    # replaced. Includes every key name the "key" message pattern redacts,
    # so a value is treated the same whether it arrives in a message or
    # in a mapping.
    _SENSITIVE_DICT_KEYS = frozenset(
        {
            "password",
            "secret",
            "token",
            "key",
            "private_key",
            "passphrase",
            "api_key",
            "host_key",
            "secret_key",
            "auth_key",
            "access_key",
            "encryption_key",
        }
    )

    @classmethod
    def sanitize_message(cls, message: str) -> str:
        """
        Sanitize a log message by redacting sensitive information.

        Labelled secrets keep their label and lose their value; SSH key
        material and private key blocks are replaced whole; the last octet
        of dotted-quad addresses and the suffix of internal hostnames are
        masked.

        Args:
            message: The log message to sanitize

        Returns:
            Sanitized message with sensitive data redacted
        """
        sanitized = message

        # Labelled secrets: keep group 1 (label + separators), drop the value.
        for pattern_type in ("password", "secret", "token", "key", "passphrase"):
            sanitized = cls.SENSITIVE_PATTERNS[pattern_type].sub(
                lambda match, pt=pattern_type: match.group(1) + cls.REPLACEMENTS[pt],
                sanitized,
            )

        # Whole-match and partial replacements, applied in this fixed order.
        for pattern_type, replacement_key in (
            ("ssh_key", "ssh_key"),
            ("private_key", "private_key"),
            ("ip", "ip_partial"),
            ("hostname", "hostname_partial"),
        ):
            sanitized = cls.SENSITIVE_PATTERNS[pattern_type].sub(
                cls.REPLACEMENTS[replacement_key], sanitized
            )

        return sanitized

    @classmethod
    def _sanitize_value(cls, value: Any) -> Any:
        """Sanitize one value, recursing into dicts, lists and tuples."""
        if isinstance(value, str):
            return cls.sanitize_message(value)
        if isinstance(value, dict):
            return cls.sanitize_dict(value)
        if isinstance(value, (list, tuple)):
            # Sequences (tuples included) come back as lists.
            return [cls._sanitize_value(item) for item in value]
        return value

    @classmethod
    def sanitize_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
        """
        Sanitize a dictionary by redacting sensitive keys and values.

        Values stored under a sensitive key name become ``"[REDACTED]"``.
        All other values are sanitized recursively. Keys that are not
        strings are never treated as sensitive names, but their values are
        still sanitized.

        Args:
            data: Dictionary to sanitize

        Returns:
            A new sanitized dictionary; ``data`` itself is not modified
        """
        sanitized: dict[str, Any] = {}

        for key, value in data.items():
            if isinstance(key, str) and key.lower() in cls._SENSITIVE_DICT_KEYS:
                sanitized[key] = "[REDACTED]"
            else:
                sanitized[key] = cls._sanitize_value(value)

        return sanitized
Methods:
sanitize_dict(data) classmethod

Sanitize a dictionary by redacting sensitive keys and values.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary to sanitize

required

Returns:

Type Description
dict[str, Any]

Sanitized dictionary

Source code in spindlex/logging/sanitizer.py
@classmethod
def sanitize_dict(cls, data: dict[str, Any]) -> dict[str, Any]:
    """
    Sanitize a dictionary by redacting sensitive keys and values.

    Values stored under a sensitive key name (compared case-insensitively)
    become ``"[REDACTED]"``. All other values are sanitized recursively:
    strings via ``sanitize_message``, dicts via ``sanitize_dict``, and lists
    or tuples element by element at any nesting depth (returned as lists).
    Keys that are not strings are never treated as sensitive names, but
    their values are still sanitized.

    Args:
        data: Dictionary to sanitize

    Returns:
        A new sanitized dictionary; ``data`` itself is not modified
    """
    # Covers every key name the "key" message pattern redacts, so a value is
    # treated the same whether it arrives in a message or in a mapping.
    sensitive_keys = {
        "password",
        "secret",
        "token",
        "key",
        "private_key",
        "passphrase",
        "api_key",
        "host_key",
        "secret_key",
        "auth_key",
        "access_key",
        "encryption_key",
    }

    def scrub(value: Any) -> Any:
        """Sanitize one value, recursing into containers."""
        if isinstance(value, str):
            return cls.sanitize_message(value)
        if isinstance(value, dict):
            return cls.sanitize_dict(value)
        if isinstance(value, (list, tuple)):
            return [scrub(item) for item in value]
        return value

    sanitized: dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(key, str) and key.lower() in sensitive_keys:
            sanitized[key] = "[REDACTED]"
        else:
            sanitized[key] = scrub(value)

    return sanitized
sanitize_message(message) classmethod

Sanitize a log message by redacting sensitive information.

Parameters:

Name Type Description Default
message str

The log message to sanitize

required

Returns:

Type Description
str

Sanitized message with sensitive data redacted

Source code in spindlex/logging/sanitizer.py
@classmethod
def sanitize_message(cls, message: str) -> str:
    """
    Return ``message`` with sensitive fragments redacted.

    The substitutions run in a fixed order: labelled secrets (password,
    secret, token, key, passphrase) keep their label and lose their value;
    then SSH key material, private key blocks, IP addresses and internal
    hostnames are replaced using the templates in ``REPLACEMENTS``.

    Args:
        message: The log message to sanitize

    Returns:
        Sanitized message with sensitive data redacted
    """
    text = message

    # Labelled secrets: group 1 (label + separators) is kept, value dropped.
    for label in ("password", "secret", "token", "key", "passphrase"):
        text = cls.SENSITIVE_PATTERNS[label].sub(
            lambda found, pt=label: found.group(1) + cls.REPLACEMENTS[pt],
            text,
        )

    # (pattern name, replacement name) pairs for the remaining passes.
    remaining = (
        ("ssh_key", "ssh_key"),
        ("private_key", "private_key"),
        ("ip", "ip_partial"),  # keeps the first three octets
        ("hostname", "hostname_partial"),
    )
    for pattern_name, replacement_name in remaining:
        text = cls.SENSITIVE_PATTERNS[pattern_name].sub(
            cls.REPLACEMENTS[replacement_name], text
        )

    return text

SanitizingFilter

Bases: Filter

Logging filter that redacts sensitive information from all log records.

Source code in spindlex/logging/sanitizer.py
class SanitizingFilter(logging.Filter):
    """Logging filter that redacts sensitive information from all log records.

    The record's message is rendered first (format string combined with its
    arguments) and the rendered text is sanitized as a whole. Sanitizing the
    format string on its own can consume ``%s`` placeholders that follow a
    sensitive label (e.g. ``"password=%s"``), which makes the later
    ``msg % args`` step fail, and it also leaves non-string arguments
    unsanitized.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Sanitize ``record`` in place; the record is never dropped.

        After a successful pass ``record.msg`` holds the sanitized, fully
        rendered message and ``record.args`` is empty.

        Returns:
            Always True.
        """
        try:
            rendered = record.getMessage()
        except (TypeError, ValueError, KeyError):
            # The format string and arguments do not match. Sanitize the
            # parts separately and leave them on the record so the logging
            # machinery reports the formatting problem as it normally would.
            record.msg = LogSanitizer.sanitize_message(str(record.msg))
            if record.args:
                if isinstance(record.args, dict):
                    record.args = LogSanitizer.sanitize_dict(record.args)
                else:
                    record.args = tuple(
                        LogSanitizer.sanitize_message(a) if isinstance(a, str) else a
                        for a in record.args
                    )
            return True

        record.msg = LogSanitizer.sanitize_message(rendered)
        # Arguments are already folded into msg; clearing them stops a
        # second %-interpolation of the sanitized text.
        record.args = ()
        return True

Functions:

configure_sanitizing_logging(logger_name='')

Install SanitizingFilter on a logger so that all records it handles (including those propagated from the spindlex logger) are sanitized.

Call once at application startup, typically with no arguments to install the filter on the root logger:

import spindlex
spindlex.configure_sanitizing_logging()

Parameters:

Name Type Description Default
logger_name str

Logger to protect (default "" = root logger).

''
Source code in spindlex/logging/sanitizer.py
def configure_sanitizing_logging(logger_name: str = "") -> None:
    """
    Install SanitizingFilter on a logger and on its current handlers.

    A filter attached to a logger is consulted only for records logged
    directly on that logger; records propagated from descendant loggers
    (such as ``spindlex`` and its children) bypass it. Filters attached to
    a handler are consulted for every record the handler processes, so the
    filter is added to each handler of the target logger as well. Handlers
    attached to the logger after this call are not covered.

    Call once at application startup, after handlers are configured,
    typically with no arguments to protect the root logger:

        import spindlex
        spindlex.configure_sanitizing_logging()

    Args:
        logger_name: Logger to protect (default ``""`` = root logger).
    """
    target = logging.getLogger(logger_name)
    # Loggers and handlers share the same filter interface.
    for filterer in (target, *target.handlers):
        if not any(isinstance(f, SanitizingFilter) for f in filterer.filters):
            filterer.addFilter(SanitizingFilter())

Formatters and Handlers

spindlex.logging.formatters

Custom log formatters for SpindleX.

Classes

DebugFormatter

Bases: SSHFormatter

Detailed formatter for debugging with protocol information.

Source code in spindlex/logging/formatters.py
class DebugFormatter(SSHFormatter):
    """SSHFormatter preset whose lines include milliseconds and call site.

    Each line carries the timestamp with milliseconds, logger name and
    level, source file and line number, function name and the message.
    """

    def __init__(self, sanitize: bool = False):
        """Build the formatter with the detailed debug layout.

        Args:
            sanitize: Forwarded unchanged to ``SSHFormatter.__init__``.
        """
        super().__init__(
            "[%(asctime)s.%(msecs)03d] %(name)s.%(levelname)s "
            "[%(filename)s:%(lineno)d] %(funcName)s(): %(message)s",
            "%Y-%m-%d %H:%M:%S",
            sanitize=sanitize,
        )
Methods:
__init__(sanitize=False)

Initialize debug formatter with detailed format.

Source code in spindlex/logging/formatters.py
def __init__(self, sanitize: bool = False):
    """Build the formatter with the detailed debug layout.

    The layout carries the timestamp with milliseconds, logger name and
    level, source file and line number, function name and the message.

    Args:
        sanitize: Forwarded unchanged to the parent initializer.
    """
    super().__init__(
        "[%(asctime)s.%(msecs)03d] %(name)s.%(levelname)s "
        "[%(filename)s:%(lineno)d] %(funcName)s(): %(message)s",
        "%Y-%m-%d %H:%M:%S",
        sanitize=sanitize,
    )

JSONFormatter

Bases: Formatter

JSON log formatter for structured logging.

Source code in spindlex/logging/formatters.py
class JSONFormatter(logging.Formatter):
    """JSON log formatter for structured logging.

    Each record becomes one compact JSON object with the keys ``timestamp``,
    ``level``, ``logger``, ``message``, ``module``, ``function`` and
    ``line``, plus ``exception`` and ``extra`` when applicable.
    """

    # LogRecord attributes that are not user-supplied "extra" data.
    # "message" and "asctime" are added to a record when another formatter
    # has already formatted it; "taskName" is a standard attribute on newer
    # Python versions.
    _RESERVED_ATTRS = frozenset(
        {
            "name",
            "msg",
            "args",
            "levelname",
            "levelno",
            "pathname",
            "filename",
            "module",
            "exc_info",
            "exc_text",
            "stack_info",
            "lineno",
            "funcName",
            "created",
            "msecs",
            "relativeCreated",
            "thread",
            "threadName",
            "processName",
            "process",
            "getMessage",
            "message",
            "asctime",
            "taskName",
        }
    )

    def __init__(self, sanitize: bool = True, include_extra: bool = True):
        """
        Initialize JSON formatter.

        Args:
            sanitize: Whether to sanitize sensitive information
            include_extra: Whether to include extra fields from LogRecord
        """
        super().__init__()
        self.sanitize = sanitize
        self.include_extra = include_extra

    def format(self, record: logging.LogRecord) -> str:
        """Format log record as a single-line JSON document.

        ``timestamp`` is the record's creation time (``record.created``),
        so it does not depend on when or how often the record is formatted.

        Args:
            record: The record to serialize.

        Returns:
            The JSON text; values that are not JSON-serializable are
            converted with ``str``.
        """
        log_data = {
            "timestamp": record.created,
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Add extra fields: every record attribute that is not a standard one
        if self.include_extra:
            extra_fields = {
                key: value
                for key, value in record.__dict__.items()
                if key not in self._RESERVED_ATTRS
            }
            if extra_fields:
                log_data["extra"] = extra_fields

        # Sanitize if requested
        if self.sanitize:
            log_data = LogSanitizer.sanitize_dict(log_data)

        return json.dumps(log_data, default=str, separators=(",", ":"))
Methods:
__init__(sanitize=True, include_extra=True)

Initialize JSON formatter.

Parameters:

Name Type Description Default
sanitize bool

Whether to sanitize sensitive information

True
include_extra bool

Whether to include extra fields from LogRecord

True
Source code in spindlex/logging/formatters.py
def __init__(self, sanitize: bool = True, include_extra: bool = True):
    """
    Create the formatter and remember both option flags.

    The parent initializer is called with no arguments.

    Args:
        sanitize: Whether to sanitize sensitive information
        include_extra: Whether to include extra fields from LogRecord
    """
    super().__init__()
    self.sanitize, self.include_extra = sanitize, include_extra
format(record)

Format log record as JSON.

Source code in spindlex/logging/formatters.py
def format(self, record: logging.LogRecord) -> str:
    """Format log record as a single-line JSON document.

    ``timestamp`` is the record's creation time (``record.created``), so it
    does not depend on when or how often the record is formatted.

    Args:
        record: The record to serialize.

    Returns:
        The JSON text; values that are not JSON-serializable are converted
        with ``str``.
    """
    # LogRecord attributes that are not user-supplied "extra" data.
    # "message" and "asctime" are added to a record when another formatter
    # has already formatted it; "taskName" is a standard attribute on newer
    # Python versions.
    reserved = frozenset(
        {
            "name",
            "msg",
            "args",
            "levelname",
            "levelno",
            "pathname",
            "filename",
            "module",
            "exc_info",
            "exc_text",
            "stack_info",
            "lineno",
            "funcName",
            "created",
            "msecs",
            "relativeCreated",
            "thread",
            "threadName",
            "processName",
            "process",
            "getMessage",
            "message",
            "asctime",
            "taskName",
        }
    )

    log_data = {
        "timestamp": record.created,
        "level": record.levelname,
        "logger": record.name,
        "message": record.getMessage(),
        "module": record.module,
        "function": record.funcName,
        "line": record.lineno,
    }

    # Add exception info if present
    if record.exc_info:
        log_data["exception"] = self.formatException(record.exc_info)

    # Add extra fields: every record attribute that is not a standard one
    if self.include_extra:
        extra_fields = {
            key: value
            for key, value in record.__dict__.items()
            if key not in reserved
        }
        if extra_fields:
            log_data["extra"] = extra_fields

    # Sanitize if requested
    if self.sanitize:
        log_data = LogSanitizer.sanitize_dict(log_data)

    return json.dumps(log_data, default=str, separators=(",", ":"))

SSHFormatter

Bases: Formatter

Standard SpindleX log formatter with security sanitization.

Source code in spindlex/logging/formatters.py
class SSHFormatter(logging.Formatter):
    """Default SpindleX text formatter.

    The ``sanitize`` flag is stored on the instance; ``format`` itself
    performs no redaction and delegates to ``logging.Formatter.format``.
    """

    def __init__(
        self,
        fmt: Optional[str] = None,
        datefmt: Optional[str] = None,
        sanitize: bool = True,
    ):
        """
        Create the formatter, filling in defaults for omitted formats.

        Args:
            fmt: Log format string; when None,
                ``"[%(asctime)s] %(name)s.%(levelname)s: %(message)s"``
            datefmt: Date format string; when None, ``"%Y-%m-%d %H:%M:%S"``
            sanitize: Whether to sanitize sensitive information (stored as
                ``self.sanitize``)
        """
        super().__init__(
            "[%(asctime)s] %(name)s.%(levelname)s: %(message)s"
            if fmt is None
            else fmt,
            "%Y-%m-%d %H:%M:%S" if datefmt is None else datefmt,
        )
        self.sanitize = sanitize

    def format(self, record: logging.LogRecord) -> str:
        """Format log record. Sanitization is handled by SanitizingFilter."""
        rendered = super().format(record)
        return rendered
Methods:
__init__(fmt=None, datefmt=None, sanitize=True)

Initialize SSH formatter.

Parameters:

Name Type Description Default
fmt Optional[str]

Log format string

None
datefmt Optional[str]

Date format string

None
sanitize bool

Whether to sanitize sensitive information

True
Source code in spindlex/logging/formatters.py
def __init__(
    self,
    fmt: Optional[str] = None,
    datefmt: Optional[str] = None,
    sanitize: bool = True,
):
    """
    Create the formatter, filling in defaults for omitted formats.

    Args:
        fmt: Log format string; when None,
            ``"[%(asctime)s] %(name)s.%(levelname)s: %(message)s"``
        datefmt: Date format string; when None, ``"%Y-%m-%d %H:%M:%S"``
        sanitize: Whether to sanitize sensitive information (stored as
            ``self.sanitize``)
    """
    super().__init__(
        "[%(asctime)s] %(name)s.%(levelname)s: %(message)s" if fmt is None else fmt,
        "%Y-%m-%d %H:%M:%S" if datefmt is None else datefmt,
    )
    self.sanitize = sanitize
format(record)

Format log record. Sanitization is handled by SanitizingFilter.

Source code in spindlex/logging/formatters.py
def format(self, record: logging.LogRecord) -> str:
    """Return the parent formatter's rendering of ``record`` unchanged.

    No redaction happens here; sanitization is handled by SanitizingFilter.
    """
    rendered = super().format(record)
    return rendered

SecurityFormatter

Bases: SSHFormatter

Enhanced formatter for security-related events.

Source code in spindlex/logging/formatters.py
class SecurityFormatter(SSHFormatter):
    """Formatter for security events; its layout includes the client IP."""

    def __init__(self, sanitize: bool = True):
        """Configure the base formatter with the security-specific layout."""
        super().__init__(
            "[%(asctime)s] SECURITY.%(levelname)s [%(client_ip)s]: %(message)s",
            sanitize=sanitize,
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format a security record, defaulting ``client_ip`` to "unknown"."""
        # The layout references %(client_ip)s, so the attribute must exist
        # on every record before the base formatter runs.
        try:
            record.client_ip
        except AttributeError:
            record.client_ip = "unknown"

        return super().format(record)
Methods:
__init__(sanitize=True)

Initialize security formatter with enhanced format.

Source code in spindlex/logging/formatters.py
def __init__(self, sanitize: bool = True):
    """Configure the base formatter with the security-specific layout."""
    super().__init__(
        "[%(asctime)s] SECURITY.%(levelname)s [%(client_ip)s]: %(message)s",
        sanitize=sanitize,
    )
format(record)

Format security log record with client IP if available.

Source code in spindlex/logging/formatters.py
def format(self, record: logging.LogRecord) -> str:
    """Format a security record, defaulting ``client_ip`` to "unknown"."""
    # The attribute must exist on the record before the base formatter runs.
    try:
        record.client_ip
    except AttributeError:
        record.client_ip = "unknown"

    return super().format(record)

spindlex.logging.handlers

Custom log handlers for SpindleX.

Classes

PerformanceHandler

Bases: Handler

Handler for performance metrics and timing information.

Source code in spindlex/logging/handlers.py
class PerformanceHandler(logging.Handler):
    """
    Handler for performance metrics and timing information.

    Wraps either a file handler or a console stream handler and forwards
    every record to it.
    """

    def __init__(self, filename: Optional[str] = None, json_format: bool = True):
        """
        Build the wrapped handler and its formatter.

        Args:
            filename: Log file path (if None, logs to console)
            json_format: Whether to use JSON formatting
        """
        super().__init__()

        self.filename = filename
        self.json_format = json_format

        # Only the formatter that is actually needed gets imported.
        formatter: logging.Formatter
        if not json_format:
            from .formatters import SSHFormatter

            formatter = SSHFormatter(
                fmt="[%(asctime)s] PERF.%(levelname)s: %(message)s", sanitize=False
            )
        else:
            from .formatters import JSONFormatter

            formatter = JSONFormatter(sanitize=False)

        if not filename:
            self.console_handler = logging.StreamHandler()
            self.console_handler.setFormatter(formatter)
            return

        # Create the log file's parent directory when the path has one.
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        self.file_handler = logging.FileHandler(filename)
        self.file_handler.setFormatter(formatter)

    def emit(self, record: logging.LogRecord) -> None:
        """Forward the record to the wrapped file or console handler."""
        try:
            target = self.file_handler if self.filename else self.console_handler
            target.emit(record)
        except Exception:
            self.handleError(record)

    def close(self) -> None:
        """Close the wrapped handler, then this handler."""
        for attr_name in ("file_handler", "console_handler"):
            if hasattr(self, attr_name):
                getattr(self, attr_name).close()
                break
        super().close()
Methods:
__init__(filename=None, json_format=True)

Initialize performance handler.

Parameters:

Name Type Description Default
filename Optional[str]

Log file path (if None, logs to console)

None
json_format bool

Whether to use JSON formatting

True
Source code in spindlex/logging/handlers.py
def __init__(self, filename: Optional[str] = None, json_format: bool = True):
    """
    Build the wrapped handler and its formatter.

    Args:
        filename: Log file path (if None, logs to console)
        json_format: Whether to use JSON formatting
    """
    super().__init__()

    self.filename = filename
    self.json_format = json_format

    # Only the formatter that is actually needed gets imported.
    formatter: logging.Formatter
    if not json_format:
        from .formatters import SSHFormatter

        formatter = SSHFormatter(
            fmt="[%(asctime)s] PERF.%(levelname)s: %(message)s", sanitize=False
        )
    else:
        from .formatters import JSONFormatter

        formatter = JSONFormatter(sanitize=False)

    if not filename:
        self.console_handler = logging.StreamHandler()
        self.console_handler.setFormatter(formatter)
        return

    # Create the log file's parent directory when the path has one.
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    self.file_handler = logging.FileHandler(filename)
    self.file_handler.setFormatter(formatter)
close()

Close the handler.

Source code in spindlex/logging/handlers.py
def close(self) -> None:
    """Close the wrapped handler, then this handler."""
    # Prefer the file handler; fall back to the console handler.
    for attr_name in ("file_handler", "console_handler"):
        if hasattr(self, attr_name):
            getattr(self, attr_name).close()
            break
    super().close()
emit(record)

Emit performance log record.

Source code in spindlex/logging/handlers.py
def emit(self, record: logging.LogRecord) -> None:
    """Forward the performance record to the wrapped file or console handler."""
    try:
        target = self.file_handler if self.filename else self.console_handler
        target.emit(record)
    except Exception:
        self.handleError(record)

SecurityHandler

Bases: Handler

Handler for security-related events with special formatting.

Source code in spindlex/logging/handlers.py
class SecurityHandler(logging.Handler):
    """
    Handler for security-related events with special formatting.

    Wraps either a rotating file handler or a console stream handler, both
    using SecurityFormatter, and forwards every record to it.
    """

    def __init__(
        self,
        filename: Optional[str] = None,
        max_bytes: int = 10 * 1024 * 1024,  # 10MB
        backup_count: int = 5,
    ):
        """
        Build the wrapped handler and attach a sanitizing filter.

        Args:
            filename: Log file path (if None, logs to console)
            max_bytes: Maximum file size before rotation
            backup_count: Number of backup files to keep
        """
        super().__init__()
        self.addFilter(SanitizingFilter())

        self.filename = filename

        if not filename:
            # No path given: write to the console.
            self.console_handler = logging.StreamHandler()
            self.console_handler.setFormatter(SecurityFormatter())
            return

        # Create the log file's parent directory when the path has one.
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Size-based rotation for the security log file.
        self.file_handler = logging.handlers.RotatingFileHandler(
            filename, maxBytes=max_bytes, backupCount=backup_count
        )
        self.file_handler.setFormatter(SecurityFormatter())

    def emit(self, record: logging.LogRecord) -> None:
        """Forward the record to the wrapped file or console handler."""
        try:
            target = self.file_handler if self.filename else self.console_handler
            target.emit(record)
        except Exception:
            self.handleError(record)

    def close(self) -> None:
        """Close the wrapped handler, then this handler."""
        for attr_name in ("file_handler", "console_handler"):
            if hasattr(self, attr_name):
                getattr(self, attr_name).close()
                break
        super().close()
Methods:
__init__(filename=None, max_bytes=10 * 1024 * 1024, backup_count=5)

Initialize security handler.

Parameters:

Name Type Description Default
filename Optional[str]

Log file path (if None, logs to console)

None
max_bytes int

Maximum file size before rotation

10 * 1024 * 1024
backup_count int

Number of backup files to keep

5
Source code in spindlex/logging/handlers.py
def __init__(
    self,
    filename: Optional[str] = None,
    max_bytes: int = 10 * 1024 * 1024,  # 10MB
    backup_count: int = 5,
):
    """
    Build the wrapped handler and attach a sanitizing filter.

    Args:
        filename: Log file path (if None, logs to console)
        max_bytes: Maximum file size before rotation
        backup_count: Number of backup files to keep
    """
    super().__init__()
    self.addFilter(SanitizingFilter())

    self.filename = filename

    if not filename:
        # No path given: write to the console.
        self.console_handler = logging.StreamHandler()
        self.console_handler.setFormatter(SecurityFormatter())
        return

    # Create the log file's parent directory when the path has one.
    parent_dir = os.path.dirname(filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Size-based rotation for the security log file.
    self.file_handler = logging.handlers.RotatingFileHandler(
        filename, maxBytes=max_bytes, backupCount=backup_count
    )
    self.file_handler.setFormatter(SecurityFormatter())
close()

Close the handler.

Source code in spindlex/logging/handlers.py
def close(self) -> None:
    """Close the wrapped handler, then this handler."""
    # Prefer the file handler; fall back to the console handler.
    for attr_name in ("file_handler", "console_handler"):
        if hasattr(self, attr_name):
            getattr(self, attr_name).close()
            break
    super().close()
emit(record)

Emit log record to appropriate handler.

Source code in spindlex/logging/handlers.py
def emit(self, record: logging.LogRecord) -> None:
    """Forward the record to the wrapped file or console handler."""
    try:
        target = self.file_handler if self.filename else self.console_handler
        target.emit(record)
    except Exception:
        self.handleError(record)

Performance Monitoring

spindlex.logging.monitoring

Performance monitoring and metrics collection for SpindleX.

Classes

ConnectionMetrics dataclass

Metrics for SSH connection operations.

Source code in spindlex/logging/monitoring.py
@dataclass
class ConnectionMetrics:
    """
    Metrics for SSH connection operations.

    Every field has a default, so an instance can be created with no
    arguments: timing fields default to None and counter fields to 0.
    """

    # Timing fields. NOTE(review): units are presumably seconds - confirm.
    connection_time: Optional[float] = None
    handshake_time: Optional[float] = None
    auth_time: Optional[float] = None
    kex_time: Optional[float] = None
    # Traffic counters.
    bytes_sent: int = 0
    bytes_received: int = 0
    packets_sent: int = 0
    packets_received: int = 0
    # Channel and error counters.
    channels_opened: int = 0
    channels_closed: int = 0
    errors: int = 0

CryptoTimer

Specialized timer for cryptographic operations.

Source code in spindlex/logging/monitoring.py
class CryptoTimer:
    """
    Specialized timer for cryptographic operations.

    Every timing helper delegates to ``time_crypto_operation``, which in turn
    delegates to the monitor's ``time_operation`` context manager.
    """

    def __init__(self, monitor: Optional[PerformanceMonitor] = None):
        """
        Initialize crypto timer.

        Args:
            monitor: Performance monitor instance (uses global if None)
        """
        if not monitor:
            monitor = get_performance_monitor()
        self.monitor = monitor
        self.logger = get_logger("spindlex.crypto.timing")

    @contextmanager
    def time_crypto_operation(
        self,
        operation: str,
        algorithm: str,
        key_size: Optional[int] = None,
        **metadata: Any,
    ) -> Iterator[None]:
        """
        Time a cryptographic operation.

        The metric is recorded under the name ``crypto_<operation>``.

        Args:
            operation: Type of crypto operation (encrypt, decrypt, sign, verify, etc.)
            algorithm: Cryptographic algorithm name
            key_size: Key size in bits (optional)
            **metadata: Additional metadata
        """
        details: dict[str, Any] = {"algorithm": algorithm, "category": "crypto"}
        # Caller metadata is merged after the defaults, so it can override them.
        details.update(metadata)

        if key_size:
            details["key_size"] = key_size

        timing = self.monitor.time_operation(f"crypto_{operation}", **details)
        with timing:
            yield

    def time_key_generation(
        self, algorithm: str, key_size: int
    ) -> ContextManager[None]:
        """Time key generation operation."""
        return self.time_crypto_operation("keygen", algorithm, key_size=key_size)

    def time_key_exchange(self, algorithm: str) -> ContextManager[None]:
        """Time key exchange operation."""
        return self.time_crypto_operation("kex", algorithm=algorithm)

    def time_encryption(self, cipher: str, data_size: int) -> ContextManager[None]:
        """Time encryption operation; ``data_size`` is recorded as metadata."""
        return self.time_crypto_operation(
            "encrypt", algorithm=cipher, data_size=data_size
        )

    def time_decryption(self, cipher: str, data_size: int) -> ContextManager[None]:
        """Time decryption operation; ``data_size`` is recorded as metadata."""
        return self.time_crypto_operation(
            "decrypt", algorithm=cipher, data_size=data_size
        )

    def time_signature(self, algorithm: str, key_size: int) -> ContextManager[None]:
        """Time signature operation."""
        return self.time_crypto_operation("sign", algorithm, key_size=key_size)

    def time_verification(self, algorithm: str, key_size: int) -> ContextManager[None]:
        """Time signature verification operation."""
        return self.time_crypto_operation("verify", algorithm, key_size=key_size)
Methods:
__init__(monitor=None)

Initialize crypto timer.

Parameters:

Name Type Description Default
monitor Optional[PerformanceMonitor]

Performance monitor instance (uses global if None)

None
Source code in spindlex/logging/monitoring.py
def __init__(self, monitor: Optional[PerformanceMonitor] = None):
    """
    Initialize crypto timer.

    Args:
        monitor: Performance monitor instance (uses global if None)
    """
    if not monitor:
        monitor = get_performance_monitor()
    self.monitor = monitor
    self.logger = get_logger("spindlex.crypto.timing")
time_crypto_operation(operation, algorithm, key_size=None, **metadata)

Time a cryptographic operation.

Parameters:

Name Type Description Default
operation str

Type of crypto operation (encrypt, decrypt, sign, verify, etc.)

required
algorithm str

Cryptographic algorithm name

required
key_size Optional[int]

Key size in bits (optional)

None
**metadata Any

Additional metadata

{}
Source code in spindlex/logging/monitoring.py
@contextmanager
def time_crypto_operation(
    self,
    operation: str,
    algorithm: str,
    key_size: Optional[int] = None,
    **metadata: Any,
) -> Iterator[None]:
    """
    Time a cryptographic operation.

    The metric is recorded under the name ``crypto_<operation>``.

    Args:
        operation: Type of crypto operation (encrypt, decrypt, sign, verify, etc.)
        algorithm: Cryptographic algorithm name
        key_size: Key size in bits (optional)
        **metadata: Additional metadata
    """
    details: dict[str, Any] = {"algorithm": algorithm, "category": "crypto"}
    # Caller metadata is merged after the defaults, so it can override them.
    details.update(metadata)

    if key_size:
        details["key_size"] = key_size

    timing = self.monitor.time_operation(f"crypto_{operation}", **details)
    with timing:
        yield
time_decryption(cipher, data_size)

Time decryption operation.

Source code in spindlex/logging/monitoring.py
def time_decryption(self, cipher: str, data_size: int) -> ContextManager[None]:
    """Time decryption operation; ``data_size`` is recorded as metadata."""
    return self.time_crypto_operation(
        "decrypt", algorithm=cipher, data_size=data_size
    )
time_encryption(cipher, data_size)

Time encryption operation.

Source code in spindlex/logging/monitoring.py
def time_encryption(self, cipher: str, data_size: int) -> ContextManager[None]:
    """Time encryption operation; ``data_size`` is recorded as metadata."""
    return self.time_crypto_operation(
        "encrypt", algorithm=cipher, data_size=data_size
    )
time_key_exchange(algorithm)

Time key exchange operation.

Source code in spindlex/logging/monitoring.py
def time_key_exchange(self, algorithm: str) -> ContextManager[None]:
    """Time key exchange operation (no key size is recorded)."""
    return self.time_crypto_operation("kex", algorithm=algorithm)
time_key_generation(algorithm, key_size)

Time key generation operation.

Source code in spindlex/logging/monitoring.py
def time_key_generation(
    self, algorithm: str, key_size: int
) -> ContextManager[None]:
    """Time key generation operation."""
    return self.time_crypto_operation("keygen", algorithm, key_size=key_size)
time_signature(algorithm, key_size)

Time signature operation.

Source code in spindlex/logging/monitoring.py
def time_signature(self, algorithm: str, key_size: int) -> ContextManager[None]:
    """Time signature operation."""
    return self.time_crypto_operation("sign", algorithm, key_size=key_size)
time_verification(algorithm, key_size)

Time signature verification operation.

Source code in spindlex/logging/monitoring.py
def time_verification(self, algorithm: str, key_size: int) -> ContextManager[None]:
    """Time signature verification operation."""
    return self.time_crypto_operation("verify", algorithm, key_size=key_size)

PerformanceMetric dataclass

Container for performance metric data.

Source code in spindlex/logging/monitoring.py
@dataclass
class PerformanceMetric:
    """
    Container for performance metric data.

    ``operation``, ``duration`` and ``timestamp`` are required; ``metadata``
    defaults to a fresh empty dict for each instance.
    """

    # Name of the operation that was measured.
    operation: str
    # Measured duration (PerformanceMonitor.record_metric documents seconds).
    duration: float
    # When the metric was recorded (record_metric passes time.time()).
    timestamp: float
    # Extra key/value details; default_factory avoids a shared mutable default.
    metadata: dict[str, Any] = field(default_factory=dict)

PerformanceMonitor

Performance monitoring and metrics collection system.

Source code in spindlex/logging/monitoring.py
class PerformanceMonitor:
    """
    Performance monitoring and metrics collection system.

    Holds a bounded history of individual metrics, per-operation duration
    samples, and per-connection counters. Mutations of this state are done
    while holding a reentrant lock.
    """

    def __init__(self, max_metrics: int = 10000):
        """
        Initialize performance monitor.

        Args:
            max_metrics: Maximum number of metrics to keep in memory
        """
        self.max_metrics = max_metrics
        # A bounded deque drops the oldest metric once max_metrics is reached.
        self.metrics: deque[PerformanceMetric] = deque(maxlen=max_metrics)
        self.connection_metrics: dict[str, ConnectionMetrics] = {}
        self.operation_stats: dict[str, list[float]] = defaultdict(list)
        # Reentrant because locked methods call other locked methods.
        self._lock = threading.RLock()
        self.logger = get_logger("spindlex.monitoring")

    def record_metric(self, operation: str, duration: float, **metadata: Any) -> None:
        """
        Record a performance metric.

        Args:
            operation: Operation name
            duration: Operation duration in seconds
            **metadata: Additional metric metadata
        """
        metric = PerformanceMetric(
            operation=operation,
            duration=duration,
            timestamp=time.time(),
            metadata=metadata,
        )

        with self._lock:
            self.metrics.append(metric)
            self.operation_stats[operation].append(duration)

            # Keep only recent stats to prevent memory growth
            if len(self.operation_stats[operation]) > 1000:
                self.operation_stats[operation] = self.operation_stats[operation][-500:]

        # Log performance metric (outside the lock)
        self.logger.performance_metric(operation, duration, **metadata)

    @contextmanager
    def time_operation(self, operation: str, **metadata: Any) -> Iterator[None]:
        """
        Context manager for timing operations.

        The metric is recorded even when the timed body raises.

        Args:
            operation: Operation name
            **metadata: Additional metadata to record
        """
        start_time = time.perf_counter()
        try:
            yield
        finally:
            duration = time.perf_counter() - start_time
            self.record_metric(operation, duration, **metadata)

    def get_connection_metrics(self, connection_id: str) -> ConnectionMetrics:
        """
        Get metrics for a specific connection, creating them on first use.

        Args:
            connection_id: Unique connection identifier

        Returns:
            ConnectionMetrics instance
        """
        with self._lock:
            if connection_id not in self.connection_metrics:
                self.connection_metrics[connection_id] = ConnectionMetrics()
            return self.connection_metrics[connection_id]

    def update_connection_metric(
        self, connection_id: str, metric_name: str, value: Union[float, int, str]
    ) -> None:
        """
        Update a specific connection metric.

        Args:
            connection_id: Connection identifier
            metric_name: Name of metric to update
            value: New metric value
        """
        # Hold the lock across lookup and write, like the other mutators.
        with self._lock:
            metrics = self.get_connection_metrics(connection_id)
            setattr(metrics, metric_name, value)

    def increment_connection_counter(
        self, connection_id: str, counter_name: str, amount: int = 1
    ) -> None:
        """
        Increment a connection counter metric.

        Args:
            connection_id: Connection identifier
            counter_name: Name of counter to increment
            amount: Amount to increment by
        """
        # The read-modify-write must be atomic, otherwise concurrent
        # increments on the same counter can be lost.
        with self._lock:
            metrics = self.get_connection_metrics(connection_id)
            current_value = getattr(metrics, counter_name, 0)
            setattr(metrics, counter_name, current_value + amount)

    def get_operation_stats(self, operation: str) -> dict[str, float]:
        """
        Get statistical summary for an operation.

        Args:
            operation: Operation name

        Returns:
            Dictionary with count, min, max, mean, median, p95 and p99, or an
            empty dictionary when nothing was recorded for the operation
        """
        with self._lock:
            durations = self.operation_stats.get(operation, [])

            if not durations:
                return {}

            sorted_durations = sorted(durations)
            count = len(sorted_durations)

            # For an even number of samples the median is the mean of the
            # two middle values, not the upper one.
            middle = count // 2
            if count % 2:
                median = sorted_durations[middle]
            else:
                median = (sorted_durations[middle - 1] + sorted_durations[middle]) / 2

            return {
                "count": count,
                "min": sorted_durations[0],
                "max": sorted_durations[-1],
                "mean": sum(sorted_durations) / count,
                "median": median,
                # With few samples the percentiles fall back to the maximum.
                "p95": (
                    sorted_durations[int(count * 0.95)]
                    if count > 20
                    else sorted_durations[-1]
                ),
                "p99": (
                    sorted_durations[int(count * 0.99)]
                    if count > 100
                    else sorted_durations[-1]
                ),
            }

    def get_recent_metrics(
        self, operation: Optional[str] = None, limit: int = 100
    ) -> list[PerformanceMetric]:
        """
        Get recent performance metrics.

        Args:
            operation: Filter by operation name (optional)
            limit: Maximum number of metrics to return; zero or a negative
                value yields an empty list

        Returns:
            List of recent metrics, oldest first
        """
        # metrics[-0:] would be the whole list and a negative limit would
        # slice from the front, so handle non-positive limits explicitly.
        if limit <= 0:
            return []

        with self._lock:
            metrics = list(self.metrics)

            if operation:
                metrics = [m for m in metrics if m.operation == operation]

            return metrics[-limit:]

    def clear_metrics(self, connection_id: Optional[str] = None) -> None:
        """
        Clear stored metrics.

        Args:
            connection_id: Clear metrics for specific connection (optional);
                when None, all stored metrics are cleared
        """
        with self._lock:
            # Compare against None so an empty-string id removes only that
            # connection instead of wiping everything.
            if connection_id is not None:
                self.connection_metrics.pop(connection_id, None)
            else:
                self.metrics.clear()
                self.connection_metrics.clear()
                self.operation_stats.clear()

    def print_summary(self) -> None:
        """Print a human-readable summary of all recorded performance metrics."""
        with self._lock:
            print("\n" + "=" * 60)
            print(f"{'SpindleX Performance Summary':^60}")
            print("=" * 60)

            if not self.operation_stats and not self.connection_metrics:
                print(f"{'No metrics recorded yet':^60}")
                print("=" * 60 + "\n")
                return

            if self.operation_stats:
                print(f"\n{'Operation Statistics':^60}")
                print("-" * 60)
                print(
                    f"{'Operation':<20} | {'Count':>6} | {'Mean (ms)':>10} | {'p95 (ms)':>10}"
                )
                print("-" * 60)

                for op in sorted(self.operation_stats.keys()):
                    stats = self.get_operation_stats(op)
                    if stats:
                        # Stored durations are seconds; display milliseconds.
                        mean_ms = stats["mean"] * 1000
                        p95_ms = stats["p95"] * 1000
                        print(
                            f"{op:<20} | {stats['count']:>6} | {mean_ms:>10.2f} | {p95_ms:>10.2f}"
                        )

            if self.connection_metrics:
                print(f"\n{'Connection Statistics':^60}")
                print("-" * 60)
                print(
                    f"{'Conn ID':<10} | {'Sent (pkts)':>12} | {'Recv (pkts)':>12} | {'Errors':>6}"
                )
                print("-" * 60)

                for conn_id, metrics in self.connection_metrics.items():
                    # Connection ids are truncated to fit the 10-char column.
                    print(
                        f"{conn_id[:10]:<10} | {metrics.packets_sent:>12} | "
                        f"{metrics.packets_received:>12} | {metrics.errors:>6}"
                    )

            print("=" * 60 + "\n")
Methods:
__init__(max_metrics=10000)

Initialize performance monitor.

Parameters:

Name Type Description Default
max_metrics int

Maximum number of metrics to keep in memory

10000
Source code in spindlex/logging/monitoring.py
def __init__(self, max_metrics: int = 10000):
    """
    Set up empty metric stores, the lock, and the monitoring logger.

    Args:
        max_metrics: Maximum number of metrics to keep in memory
    """
    self.logger = get_logger("spindlex.monitoring")
    self._lock = threading.RLock()

    self.max_metrics = max_metrics
    # A bounded deque drops the oldest metric once max_metrics is reached.
    self.metrics: deque[PerformanceMetric] = deque(maxlen=max_metrics)
    self.operation_stats: dict[str, list[float]] = defaultdict(list)
    self.connection_metrics: dict[str, ConnectionMetrics] = {}
clear_metrics(connection_id=None)

Clear stored metrics.

Parameters:

Name Type Description Default
connection_id Optional[str]

Clear metrics for specific connection (optional)

None
Source code in spindlex/logging/monitoring.py
def clear_metrics(self, connection_id: Optional[str] = None) -> None:
    """
    Clear stored metrics.

    Args:
        connection_id: Clear metrics for specific connection (optional);
            when None, all stored metrics are cleared
    """
    with self._lock:
        # Compare against None so an empty-string id removes only that
        # connection instead of wiping everything.
        if connection_id is not None:
            self.connection_metrics.pop(connection_id, None)
        else:
            self.metrics.clear()
            self.connection_metrics.clear()
            self.operation_stats.clear()
get_connection_metrics(connection_id)

Get metrics for a specific connection.

Parameters:

Name Type Description Default
connection_id str

Unique connection identifier

required

Returns:

Type Description
ConnectionMetrics

ConnectionMetrics instance

Source code in spindlex/logging/monitoring.py
def get_connection_metrics(self, connection_id: str) -> ConnectionMetrics:
    """
    Return the metrics object for a connection, creating it on first use.

    Args:
        connection_id: Unique connection identifier

    Returns:
        ConnectionMetrics instance
    """
    with self._lock:
        try:
            return self.connection_metrics[connection_id]
        except KeyError:
            # First time this connection is seen: start from default values.
            created = ConnectionMetrics()
            self.connection_metrics[connection_id] = created
            return created
get_operation_stats(operation)

Get statistical summary for an operation.

Parameters:

Name Type Description Default
operation str

Operation name

required

Returns:

Type Description
dict[str, float]

Dictionary with count, min, max, mean, median, p95, and p99 statistics (empty if no durations are recorded for the operation)

Source code in spindlex/logging/monitoring.py
def get_operation_stats(self, operation: str) -> dict[str, float]:
    """
    Get statistical summary for an operation.

    Args:
        operation: Operation name

    Returns:
        Dictionary with count, min, max, mean, median, p95 and p99, or an
        empty dictionary when nothing was recorded for the operation
    """
    with self._lock:
        durations = self.operation_stats.get(operation, [])

        if not durations:
            return {}

        sorted_durations = sorted(durations)
        count = len(sorted_durations)

        # For an even number of samples the median is the mean of the two
        # middle values, not the upper one.
        middle = count // 2
        if count % 2:
            median = sorted_durations[middle]
        else:
            median = (sorted_durations[middle - 1] + sorted_durations[middle]) / 2

        return {
            "count": count,
            # The list is sorted, so the extremes are its first and last items.
            "min": sorted_durations[0],
            "max": sorted_durations[-1],
            "mean": sum(sorted_durations) / count,
            "median": median,
            # With few samples the percentiles fall back to the maximum.
            "p95": (
                sorted_durations[int(count * 0.95)]
                if count > 20
                else sorted_durations[-1]
            ),
            "p99": (
                sorted_durations[int(count * 0.99)]
                if count > 100
                else sorted_durations[-1]
            ),
        }
get_recent_metrics(operation=None, limit=100)

Get recent performance metrics.

Parameters:

Name Type Description Default
operation Optional[str]

Filter by operation name (optional)

None
limit int

Maximum number of metrics to return

100

Returns:

Type Description
list[PerformanceMetric]

List of recent metrics

Source code in spindlex/logging/monitoring.py
def get_recent_metrics(
    self, operation: Optional[str] = None, limit: int = 100
) -> list[PerformanceMetric]:
    """
    Get recent performance metrics.

    Args:
        operation: Filter by operation name (optional)
        limit: Maximum number of metrics to return; zero or a negative
            value yields an empty list

    Returns:
        List of recent metrics, oldest first
    """
    # metrics[-0:] would be the whole list and a negative limit would slice
    # from the front, so handle non-positive limits explicitly.
    if limit <= 0:
        return []

    with self._lock:
        metrics = list(self.metrics)

        if operation:
            metrics = [m for m in metrics if m.operation == operation]

        return metrics[-limit:]
increment_connection_counter(connection_id, counter_name, amount=1)

Increment a connection counter metric.

Parameters:

Name Type Description Default
connection_id str

Connection identifier

required
counter_name str

Name of counter to increment

required
amount int

Amount to increment by

1
Source code in spindlex/logging/monitoring.py
def increment_connection_counter(
    self, connection_id: str, counter_name: str, amount: int = 1
) -> None:
    """
    Increment a connection counter metric.

    Args:
        connection_id: Connection identifier
        counter_name: Name of counter to increment (a missing attribute is
            treated as 0)
        amount: Amount to increment by
    """
    # The read-modify-write must be atomic, otherwise concurrent increments
    # on the same counter can be lost. get_connection_metrics takes the same
    # lock again, which works because self._lock is an RLock.
    with self._lock:
        metrics = self.get_connection_metrics(connection_id)
        current_value = getattr(metrics, counter_name, 0)
        setattr(metrics, counter_name, current_value + amount)
print_summary()

Print a human-readable summary of all recorded performance metrics.

Source code in spindlex/logging/monitoring.py
def print_summary(self) -> None:
    """
    Print a human-readable report of recorded metrics to standard output.

    The report has an operation table (count, mean and p95 in milliseconds)
    and a connection table (packets sent/received and errors). Each table is
    printed only when it has data.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 60

    with self._lock:
        print("\n" + heavy_rule)
        print(f"{'SpindleX Performance Summary':^60}")
        print(heavy_rule)

        # Nothing recorded at all: print a placeholder and stop.
        if not (self.operation_stats or self.connection_metrics):
            print(f"{'No metrics recorded yet':^60}")
            print(heavy_rule + "\n")
            return

        if self.operation_stats:
            print(f"\n{'Operation Statistics':^60}")
            print(light_rule)
            print(
                f"{'Operation':<20} | {'Count':>6} | {'Mean (ms)':>10} | {'p95 (ms)':>10}"
            )
            print(light_rule)

            for name in sorted(self.operation_stats):
                summary = self.get_operation_stats(name)
                if not summary:
                    continue
                # Stored durations are seconds; the table shows milliseconds.
                mean_ms = summary["mean"] * 1000
                p95_ms = summary["p95"] * 1000
                print(
                    f"{name:<20} | {summary['count']:>6} | {mean_ms:>10.2f} | {p95_ms:>10.2f}"
                )

        if self.connection_metrics:
            print(f"\n{'Connection Statistics':^60}")
            print(light_rule)
            print(
                f"{'Conn ID':<10} | {'Sent (pkts)':>12} | {'Recv (pkts)':>12} | {'Errors':>6}"
            )
            print(light_rule)

            for conn_id, conn in self.connection_metrics.items():
                # Connection ids are truncated to fit the 10-character column.
                print(
                    f"{conn_id[:10]:<10} | {conn.packets_sent:>12} | "
                    f"{conn.packets_received:>12} | {conn.errors:>6}"
                )

        print(heavy_rule + "\n")
record_metric(operation, duration, **metadata)

Record a performance metric.

Parameters:

Name Type Description Default
operation str

Operation name

required
duration float

Operation duration in seconds

required
**metadata Any

Additional metric metadata

{}
Source code in spindlex/logging/monitoring.py
def record_metric(self, operation: str, duration: float, **metadata: Any) -> None:
    """
    Store one timing sample for an operation and pass it on to the logger.

    The sample is appended to the metric list and to the per-operation
    duration history under the monitor's lock; the logger is called once
    the lock has been released.

    Args:
        operation: Operation name
        duration: Operation duration in seconds
        **metadata: Additional metric metadata
    """
    sample = PerformanceMetric(
        operation=operation,
        duration=duration,
        timestamp=time.time(),
        metadata=metadata,
    )

    with self._lock:
        self.metrics.append(sample)

        durations = self.operation_stats[operation]
        durations.append(duration)

        # Cap the per-operation history: once it passes 1000 entries,
        # retain only the newest 500.
        if len(durations) > 1000:
            self.operation_stats[operation] = durations[-500:]

    self.logger.performance_metric(operation, duration, **metadata)
time_operation(operation, **metadata)

Context manager for timing operations.

Parameters:

Name Type Description Default
operation str

Operation name

required
**metadata Any

Additional metadata to record

{}
Source code in spindlex/logging/monitoring.py
@contextmanager
def time_operation(self, operation: str, **metadata: Any) -> Iterator[None]:
    """
    Time the body of a ``with`` block and record the result as a metric.

    Elapsed time is measured with ``time.perf_counter`` and handed to
    ``record_metric`` when the block exits.

    Args:
        operation: Operation name
        **metadata: Additional metadata to record
    """
    began = time.perf_counter()
    try:
        yield
    finally:
        # In ``finally`` so the sample is recorded even when the block raises.
        self.record_metric(operation, time.perf_counter() - began, **metadata)
update_connection_metric(connection_id, metric_name, value)

Update a specific connection metric.

Parameters:

Name Type Description Default
connection_id str

Connection identifier

required
metric_name str

Name of metric to update

required
value Union[float, int, str]

New metric value

required
Source code in spindlex/logging/monitoring.py
def update_connection_metric(
    self, connection_id: str, metric_name: str, value: Union[float, int, str]
) -> None:
    """
    Overwrite one attribute on a connection's metrics object.

    The metrics object is obtained from ``get_connection_metrics`` and the
    attribute is assigned with ``setattr``; the attribute name is not
    checked before assignment.

    Args:
        connection_id: Connection identifier
        metric_name: Name of metric to update
        value: New metric value
    """
    setattr(self.get_connection_metrics(connection_id), metric_name, value)

ProtocolAnalyzer

Analyzer for SSH protocol debugging and performance analysis.

Source code in spindlex/logging/monitoring.py
class ProtocolAnalyzer:
    """
    Analyzer for SSH protocol debugging and performance analysis.

    Statistics are grouped by message key, ``"<direction>_<message_type>"``.
    For each key the analyzer keeps a lifetime message count, a lifetime
    byte total, and a bounded history of recent message sizes from which
    average/min/max are derived.
    """

    # The size history is cut back to _SIZE_HISTORY_KEEP entries whenever
    # it grows past _SIZE_HISTORY_LIMIT.
    _SIZE_HISTORY_LIMIT = 1000
    _SIZE_HISTORY_KEEP = 500

    def __init__(self, monitor: Optional["PerformanceMonitor"] = None):
        """
        Initialize protocol analyzer.

        Args:
            monitor: Performance monitor instance (uses global if None)
        """
        self.monitor = monitor or get_performance_monitor()
        self.logger = get_logger("spindlex.protocol.analyzer")
        self.message_counts: dict[str, int] = defaultdict(int)
        self.message_sizes: dict[str, list[int]] = defaultdict(list)
        # Lifetime byte totals per message key. Tracked separately because
        # message_sizes is trimmed and so cannot be summed for a true total.
        self.message_bytes: dict[str, int] = defaultdict(int)
        self._lock = threading.RLock()

    def record_message(
        self, direction: str, message_type: str, size: int, connection_id: str
    ) -> None:
        """
        Record SSH protocol message for analysis.

        Updates the analyzer's own per-key statistics, increments the
        packet and byte counters of the connection on the monitor, and
        emits a protocol debug log entry.

        Args:
            direction: 'sent' or 'received'; any value other than 'sent'
                is counted against the connection's received counters
            message_type: SSH message type
            size: Message size in bytes
            connection_id: Connection identifier
        """
        with self._lock:
            key = f"{direction}_{message_type}"
            self.message_counts[key] += 1
            self.message_bytes[key] += size
            self.message_sizes[key].append(size)

            # Keep size history bounded
            if len(self.message_sizes[key]) > self._SIZE_HISTORY_LIMIT:
                self.message_sizes[key] = self.message_sizes[key][
                    -self._SIZE_HISTORY_KEEP :
                ]

        # Update connection metrics
        if direction == "sent":
            self.monitor.increment_connection_counter(connection_id, "packets_sent")
            self.monitor.increment_connection_counter(connection_id, "bytes_sent", size)
        else:
            self.monitor.increment_connection_counter(connection_id, "packets_received")
            self.monitor.increment_connection_counter(
                connection_id, "bytes_received", size
            )

        # Log protocol event for debugging
        self.logger.protocol_debug(
            direction=direction,
            message_type=message_type,
            data={"size": size, "connection_id": connection_id},
        )

    def get_message_stats(self) -> dict[str, dict[str, Any]]:
        """
        Get statistics for protocol messages.

        Returns:
            Dictionary keyed by message key. Each value holds ``count`` and
            ``total_bytes`` (both lifetime totals since the last
            ``clear_stats``) plus ``avg_size``, ``min_size`` and
            ``max_size`` computed over the retained size history. A key
            with no retained sizes only has ``count``.
        """
        with self._lock:
            stats: dict[str, dict[str, Any]] = {}

            for message_key, count in self.message_counts.items():
                sizes = self.message_sizes.get(message_key)
                if sizes:
                    window_total = sum(sizes)
                    stats[message_key] = {
                        "count": count,
                        # Fall back to the history sum for keys whose sizes
                        # were added without going through record_message.
                        "total_bytes": self.message_bytes.get(
                            message_key, window_total
                        ),
                        "avg_size": window_total / len(sizes),
                        "min_size": min(sizes),
                        "max_size": max(sizes),
                    }
                else:
                    stats[message_key] = {"count": count}

            return stats

    def clear_stats(self) -> None:
        """Clear protocol analysis statistics."""
        with self._lock:
            self.message_counts.clear()
            self.message_sizes.clear()
            self.message_bytes.clear()
Methods:
__init__(monitor=None)

Initialize protocol analyzer.

Parameters:

Name Type Description Default
monitor Optional[PerformanceMonitor]

Performance monitor instance (uses global if None)

None
Source code in spindlex/logging/monitoring.py
def __init__(self, monitor: Optional[PerformanceMonitor] = None):
    """
    Set up an analyzer with empty statistics.

    Args:
        monitor: Performance monitor to report connection counters to;
            when omitted (or falsy) the global monitor is used
    """
    self.monitor = monitor or get_performance_monitor()
    self.logger = get_logger("spindlex.protocol.analyzer")

    # Re-entrant lock guarding the two statistics tables below.
    self._lock = threading.RLock()

    # Both tables are keyed by "<direction>_<message_type>".
    self.message_sizes: dict[str, list[int]] = defaultdict(list)
    self.message_counts: dict[str, int] = defaultdict(int)
clear_stats()

Clear protocol analysis statistics.

Source code in spindlex/logging/monitoring.py
def clear_stats(self) -> None:
    """Empty the message count and message size tables in place."""
    with self._lock:
        for table in (self.message_counts, self.message_sizes):
            table.clear()
get_message_stats()

Get statistics for protocol messages.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary with message statistics

Source code in spindlex/logging/monitoring.py
def get_message_stats(self) -> dict[str, dict[str, Any]]:
    """
    Summarise the recorded protocol messages per message key.

    Returns:
        Dictionary keyed by message key. Each value holds ``count`` and,
        when sizes are stored for that key, ``total_bytes``, ``avg_size``,
        ``min_size`` and ``max_size`` computed from the stored sizes.
    """
    summary = {}

    with self._lock:
        for key, seen in self.message_counts.items():
            history = self.message_sizes[key]

            # No stored sizes for this key: only the count can be reported.
            if not history:
                summary[key] = {"count": seen}
                continue

            byte_total = sum(history)
            summary[key] = {
                "count": seen,
                "total_bytes": byte_total,
                "avg_size": byte_total / len(history),
                "min_size": min(history),
                "max_size": max(history),
            }

    return summary
record_message(direction, message_type, size, connection_id)

Record SSH protocol message for analysis.

Parameters:

Name Type Description Default
direction str

'sent' or 'received'

required
message_type str

SSH message type

required
size int

Message size in bytes

required
connection_id str

Connection identifier

required
Source code in spindlex/logging/monitoring.py
def record_message(
    self, direction: str, message_type: str, size: int, connection_id: str
) -> None:
    """
    Account for one SSH protocol message.

    Updates the analyzer's per-key count and size history, increments the
    connection's packet and byte counters on the monitor, and emits a
    protocol debug log entry.

    Args:
        direction: 'sent' or 'received'; any value other than 'sent' is
            counted against the received counters
        message_type: SSH message type
        size: Message size in bytes
        connection_id: Connection identifier
    """
    bucket = f"{direction}_{message_type}"

    with self._lock:
        self.message_counts[bucket] += 1

        history = self.message_sizes[bucket]
        history.append(size)

        # Bound the history: past 1000 entries, keep only the newest 500.
        if len(history) > 1000:
            self.message_sizes[bucket] = history[-500:]

    # Pick the pair of connection counters matching the direction.
    if direction == "sent":
        packet_counter, byte_counter = "packets_sent", "bytes_sent"
    else:
        packet_counter, byte_counter = "packets_received", "bytes_received"

    bump = self.monitor.increment_connection_counter
    bump(connection_id, packet_counter)
    bump(connection_id, byte_counter, size)

    self.logger.protocol_debug(
        direction=direction,
        message_type=message_type,
        data={"size": size, "connection_id": connection_id},
    )

Functions:

get_performance_monitor()

Get the global performance monitor instance.

Source code in spindlex/logging/monitoring.py
def get_performance_monitor() -> PerformanceMonitor:
    """
    Get the global performance monitor instance.

    Returns:
        The module-level ``_performance_monitor`` object; every call
        returns that same module global.
    """
    return _performance_monitor

get_protocol_analyzer()

Get the global protocol analyzer instance.

Source code in spindlex/logging/monitoring.py
def get_protocol_analyzer() -> ProtocolAnalyzer:
    """
    Get the global protocol analyzer instance.

    Returns:
        The module-level ``_protocol_analyzer`` object; every call
        returns that same module global.
    """
    return _protocol_analyzer

timed_operation(operation_name, **metadata)

Decorator for timing function execution.

Parameters:

Name Type Description Default
operation_name str

Name of the operation being timed

required
**metadata Any

Additional metadata to record

{}
Source code in spindlex/logging/monitoring.py
def timed_operation(operation_name: str, **metadata: Any) -> Callable[..., Any]:
    """
    Decorator for timing function execution.

    Each call of the decorated callable runs inside the global performance
    monitor's ``time_operation`` context. Coroutine functions (``async
    def``) get an ``async`` wrapper that awaits the call inside that
    context, so the recorded duration covers the coroutine's execution
    rather than just the creation of the coroutine object.

    Args:
        operation_name: Name of the operation being timed
        **metadata: Additional metadata to record

    Returns:
        A decorator that wraps a callable and returns the timed wrapper.
    """
    # Imported locally so the decorator does not depend on a module-level import.
    import inspect

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(func):

            @wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                # Await inside the context so the timer spans the whole
                # coroutine, not only its construction.
                with _performance_monitor.time_operation(operation_name, **metadata):
                    return await func(*args, **kwargs)

            return async_wrapper

        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with _performance_monitor.time_operation(operation_name, **metadata):
                return func(*args, **kwargs)

        return wrapper

    return decorator