Skip to content

Crypto API

spindlex.crypto

SSH Cryptography Module

Provides pluggable cryptographic backend abstraction with support for modern ciphers, key exchange algorithms, and cryptographic operations.

Classes

CipherSuite

SSH cipher suite implementation.

Manages cipher algorithms, key exchange methods, and MAC algorithms with preference for modern, secure cryptographic primitives.

Source code in spindlex/crypto/ciphers.py
class CipherSuite:
    """
    SSH cipher suite implementation.

    Manages cipher algorithms, key exchange methods, and MAC algorithms
    with preference for modern, secure cryptographic primitives.

    The class-level tables list the algorithms this implementation supports
    (in preference order) together with the key, IV and digest sizes of each
    cipher and MAC. ``negotiate_algorithms`` picks one algorithm per category
    from a client proposal and a server proposal.
    """

    # Negotiable key exchange algorithms (in preference order)
    # Do NOT include signaling tokens here - they are not real KEX algorithms
    # and must not appear in server-bound KEXINIT lists.
    KEX_ALGORITHMS = [
        "curve25519-sha256",
        "ecdh-sha2-nistp256",
        "ecdh-sha2-nistp384",
        "ecdh-sha2-nistp521",
        "diffie-hellman-group14-sha256",
    ]

    # Client-only signaling tokens appended to the client's KEXINIT kex list.
    # Servers MUST NOT include these; clients include them to advertise capabilities.
    KEX_SIGNAL_TOKENS = [
        "kex-strict-c-v00@openssh.com",
        "ext-info-c",
    ]

    # Supported host key algorithms (in preference order)
    HOST_KEY_ALGORITHMS = [
        "ssh-ed25519",
        "ecdsa-sha2-nistp256",
        "ecdsa-sha2-nistp384",
        "ecdsa-sha2-nistp521",
        "rsa-sha2-512",
        "rsa-sha2-256",
    ]

    # Supported encryption algorithms (in preference order)
    ENCRYPTION_ALGORITHMS = [
        "chacha20-poly1305@openssh.com",
        "aes256-ctr",
        "aes192-ctr",
        "aes128-ctr",
    ]

    # Supported MAC algorithms (in preference order)
    MAC_ALGORITHMS = [
        "hmac-sha2-256",
        "hmac-sha2-512",
    ]

    # Ciphers that bundle authentication (no separate MAC)
    AEAD_CIPHERS: frozenset[str] = frozenset(["chacha20-poly1305@openssh.com"])

    # Cipher key and IV lengths
    CIPHER_INFO = {
        "chacha20-poly1305@openssh.com": {"key_len": 64, "iv_len": 0},
        "aes256-ctr": {"key_len": 32, "iv_len": 16},
        "aes192-ctr": {"key_len": 24, "iv_len": 16},
        "aes128-ctr": {"key_len": 16, "iv_len": 16},
    }

    # MAC key lengths
    MAC_INFO = {
        "hmac-sha2-256": {"key_len": 32, "digest_len": 32},
        "hmac-sha2-512": {"key_len": 64, "digest_len": 64},
    }

    def __init__(self, crypto_backend: Optional[CryptoBackend] = None) -> None:
        """
        Initialize cipher suite with secure defaults.

        Args:
            crypto_backend: Cryptographic backend to use. When omitted (or
                falsy) the module-level ``default_crypto_backend`` is used.
        """
        self.crypto_backend = crypto_backend or default_crypto_backend
        # Filled in by negotiate_algorithms(); empty until a negotiation succeeds.
        self.negotiated_algorithms: dict[str, str] = {}

    def negotiate_algorithms(
        self,
        client_algorithms: dict[str, list[str]],
        server_algorithms: dict[str, list[str]],
    ) -> dict[str, str]:
        """
        Negotiate algorithms between client and server.

        For every category the first entry of the client's list that is also
        offered by the server and supported by this implementation is chosen
        (client preference order wins, RFC 4253 §7.1). Strict-KEX and
        extension signaling tokens are never selected as the key exchange
        algorithm.

        When the cipher negotiated for a direction is listed in
        ``AEAD_CIPHERS`` the MAC for that direction is not used, so a missing
        common MAC does not fail the negotiation; the MAC entry is then
        reported as ``"<implicit>"``. If a common MAC does exist it is still
        reported as before.

        Args:
            client_algorithms: Client's supported algorithms, keyed by
                KEXINIT field name (e.g. ``"kex_algorithms"``)
            server_algorithms: Server's supported algorithms, same keys

        Returns:
            Dictionary of negotiated algorithms keyed by category
            (``kex``, ``server_host_key``, ``encryption_*``, ``mac_*``)

        Raises:
            CryptoException: If no compatible algorithms found
        """
        # (result category, KEXINIT field name, locally supported algorithms).
        # Encryption entries come before MAC entries on purpose: the MAC step
        # needs to know which cipher was picked for the same direction.
        categories = (
            ("kex", "kex_algorithms", self.KEX_ALGORITHMS),
            (
                "server_host_key",
                "server_host_key_algorithms",
                self.HOST_KEY_ALGORITHMS,
            ),
            (
                "encryption_client_to_server",
                "encryption_algorithms_client_to_server",
                self.ENCRYPTION_ALGORITHMS,
            ),
            (
                "encryption_server_to_client",
                "encryption_algorithms_server_to_client",
                self.ENCRYPTION_ALGORITHMS,
            ),
            (
                "mac_client_to_server",
                "mac_algorithms_client_to_server",
                self.MAC_ALGORITHMS,
            ),
            (
                "mac_server_to_client",
                "mac_algorithms_server_to_client",
                self.MAC_ALGORITHMS,
            ),
        )

        # MAC category -> encryption category of the same direction.
        cipher_for_mac = {
            "mac_client_to_server": "encryption_client_to_server",
            "mac_server_to_client": "encryption_server_to_client",
        }

        # Strict-KEX / extension markers must never be selected as the actual
        # KEX algorithm even if both peers advertise them. They are signaling
        # tokens, not key-exchange algorithms.
        kex_markers = frozenset(
            {
                "ext-info-c",
                "ext-info-s",
                "kex-strict-c-v00@openssh.com",
                "kex-strict-s-v00@openssh.com",
            }
        )

        # Placeholder recorded when an AEAD cipher makes the MAC irrelevant.
        implicit_mac = "<implicit>"

        negotiated: dict[str, str] = {}

        for category, key, preferred_list in categories:
            client_list = client_algorithms.get(key, [])
            server_list = server_algorithms.get(key, [])

            # Only algorithms that the server offers AND that we implement
            # are acceptable, so we never agree to something we cannot do.
            acceptable = set(preferred_list) & set(server_list)
            if category == "kex":
                acceptable -= kex_markers

            # Client preference order decides among the acceptable ones.
            selected = next(
                (algorithm for algorithm in client_list if algorithm in acceptable),
                None,
            )

            if selected is None:
                cipher_category = cipher_for_mac.get(category)
                if (
                    cipher_category is not None
                    and negotiated[cipher_category] in self.AEAD_CIPHERS
                ):
                    # The AEAD cipher authenticates packets itself; the MAC
                    # lists are ignored for this direction.
                    selected = implicit_mac
                else:
                    raise CryptoException(f"No compatible {category} algorithm found")

            negotiated[category] = selected

        # Publish the result only after every category succeeded.
        self.negotiated_algorithms = negotiated
        return negotiated

    def get_cipher_info(self, algorithm: str) -> dict[str, Any]:
        """
        Get cipher information for specified algorithm.

        Args:
            algorithm: Cipher algorithm name

        Returns:
            Dictionary with key_len and iv_len properties. This is the entry
            stored in ``CIPHER_INFO`` itself, not a copy.

        Raises:
            CryptoException: If algorithm is unsupported
        """
        if algorithm not in self.CIPHER_INFO:
            raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
        return self.CIPHER_INFO[algorithm]

    def get_mac_info(self, algorithm: str) -> dict[str, int]:
        """
        Get MAC information for specified algorithm.

        Args:
            algorithm: MAC algorithm name

        Returns:
            Dictionary with key_len and digest_len properties. This is the
            entry stored in ``MAC_INFO`` itself, not a copy.

        Raises:
            CryptoException: If algorithm is unsupported
        """
        if algorithm not in self.MAC_INFO:
            raise CryptoException(f"Unsupported MAC algorithm: {algorithm}")
        return self.MAC_INFO[algorithm]
Methods:
__init__(crypto_backend=None)

Initialize cipher suite with secure defaults.

Parameters:

Name Type Description Default
crypto_backend Optional[CryptoBackend]

Cryptographic backend to use (defaults to CryptographyBackend)

None
Source code in spindlex/crypto/ciphers.py
def __init__(self, crypto_backend: Optional[CryptoBackend] = None) -> None:
    """
    Set up a cipher suite with no algorithms negotiated yet.

    Args:
        crypto_backend: Backend that performs the cryptographic operations.
            A missing (or falsy) value selects ``default_crypto_backend``.
    """
    backend = crypto_backend if crypto_backend else default_crypto_backend
    self.crypto_backend = backend
    # Starts empty; holds the category -> algorithm mapping once negotiated.
    self.negotiated_algorithms: dict[str, str] = dict()
get_cipher_info(algorithm)

Get cipher information for specified algorithm.

Parameters:

Name Type Description Default
algorithm str

Cipher algorithm name

required

Returns:

Type Description
dict[str, Any]

Dictionary with key_len and iv_len properties

Raises:

Type Description
CryptoException

If algorithm is unsupported

Source code in spindlex/crypto/ciphers.py
def get_cipher_info(self, algorithm: str) -> dict[str, Any]:
    """
    Look up the key and IV sizes registered for a cipher.

    Args:
        algorithm: Cipher algorithm name

    Returns:
        The ``CIPHER_INFO`` entry for the cipher (``key_len`` and ``iv_len``)

    Raises:
        CryptoException: If algorithm is unsupported
    """
    info = self.CIPHER_INFO.get(algorithm)
    if info is None:
        raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
    return info
get_mac_info(algorithm)

Get MAC information for specified algorithm.

Parameters:

Name Type Description Default
algorithm str

MAC algorithm name

required

Returns:

Type Description
dict[str, int]

Dictionary with key_len and digest_len properties

Raises:

Type Description
CryptoException

If algorithm is unsupported

Source code in spindlex/crypto/ciphers.py
def get_mac_info(self, algorithm: str) -> dict[str, int]:
    """
    Look up the key and digest sizes registered for a MAC.

    Args:
        algorithm: MAC algorithm name

    Returns:
        The ``MAC_INFO`` entry for the MAC (``key_len`` and ``digest_len``)

    Raises:
        CryptoException: If algorithm is unsupported
    """
    info = self.MAC_INFO.get(algorithm)
    if info is None:
        raise CryptoException(f"Unsupported MAC algorithm: {algorithm}")
    return info
negotiate_algorithms(client_algorithms, server_algorithms)

Negotiate algorithms between client and server.

Parameters:

Name Type Description Default
client_algorithms dict[str, list[str]]

Client's supported algorithms

required
server_algorithms dict[str, list[str]]

Server's supported algorithms

required

Returns:

Type Description
dict[str, str]

Dictionary of negotiated algorithms

Raises:

Type Description
CryptoException

If no compatible algorithms found

Source code in spindlex/crypto/ciphers.py
def negotiate_algorithms(
    self,
    client_algorithms: dict[str, list[str]],
    server_algorithms: dict[str, list[str]],
) -> dict[str, str]:
    """
    Negotiate algorithms between client and server.

    For every category the first entry of the client's list that is also
    offered by the server and supported by this implementation is chosen
    (client preference order wins, RFC 4253 §7.1). Strict-KEX and extension
    signaling tokens are never selected as the key exchange algorithm.

    When the cipher negotiated for a direction is listed in
    ``self.AEAD_CIPHERS`` the MAC for that direction is not used, so a
    missing common MAC does not fail the negotiation; the MAC entry is then
    reported as ``"<implicit>"``. If a common MAC does exist it is still
    reported as before.

    Args:
        client_algorithms: Client's supported algorithms, keyed by KEXINIT
            field name (e.g. ``"kex_algorithms"``)
        server_algorithms: Server's supported algorithms, same keys

    Returns:
        Dictionary of negotiated algorithms keyed by category
        (``kex``, ``server_host_key``, ``encryption_*``, ``mac_*``)

    Raises:
        CryptoException: If no compatible algorithms found
    """
    # (result category, KEXINIT field name, locally supported algorithms).
    # Encryption entries come before MAC entries on purpose: the MAC step
    # needs to know which cipher was picked for the same direction.
    categories = (
        ("kex", "kex_algorithms", self.KEX_ALGORITHMS),
        ("server_host_key", "server_host_key_algorithms", self.HOST_KEY_ALGORITHMS),
        (
            "encryption_client_to_server",
            "encryption_algorithms_client_to_server",
            self.ENCRYPTION_ALGORITHMS,
        ),
        (
            "encryption_server_to_client",
            "encryption_algorithms_server_to_client",
            self.ENCRYPTION_ALGORITHMS,
        ),
        (
            "mac_client_to_server",
            "mac_algorithms_client_to_server",
            self.MAC_ALGORITHMS,
        ),
        (
            "mac_server_to_client",
            "mac_algorithms_server_to_client",
            self.MAC_ALGORITHMS,
        ),
    )

    # MAC category -> encryption category of the same direction.
    cipher_for_mac = {
        "mac_client_to_server": "encryption_client_to_server",
        "mac_server_to_client": "encryption_server_to_client",
    }

    # Strict-KEX / extension markers must never be selected as the actual
    # KEX algorithm even if both peers advertise them. They are signaling
    # tokens, not key-exchange algorithms.
    kex_markers = frozenset(
        {
            "ext-info-c",
            "ext-info-s",
            "kex-strict-c-v00@openssh.com",
            "kex-strict-s-v00@openssh.com",
        }
    )

    # Placeholder recorded when an AEAD cipher makes the MAC irrelevant.
    implicit_mac = "<implicit>"

    negotiated: dict[str, str] = {}

    for category, key, preferred_list in categories:
        client_list = client_algorithms.get(key, [])
        server_list = server_algorithms.get(key, [])

        # Only algorithms that the server offers AND that we implement are
        # acceptable, so we never agree to something we cannot do.
        acceptable = set(preferred_list) & set(server_list)
        if category == "kex":
            acceptable -= kex_markers

        # Client preference order decides among the acceptable ones.
        selected = next(
            (algorithm for algorithm in client_list if algorithm in acceptable),
            None,
        )

        if selected is None:
            cipher_category = cipher_for_mac.get(category)
            if (
                cipher_category is not None
                and negotiated[cipher_category] in self.AEAD_CIPHERS
            ):
                # The AEAD cipher authenticates packets itself; the MAC
                # lists are ignored for this direction.
                selected = implicit_mac
            else:
                raise CryptoException(f"No compatible {category} algorithm found")

        negotiated[category] = selected

    # Publish the result only after every category succeeded.
    self.negotiated_algorithms = negotiated
    return negotiated

CryptoBackend

Bases: Protocol

Cryptographic backend interface.

Defines the interface for pluggable cryptographic backends to support different crypto libraries and implementations.

Source code in spindlex/crypto/backend.py
class CryptoBackend(Protocol):
    """
    Structural interface every pluggable cryptographic backend provides.

    Any object offering these methods can be used wherever a
    ``CryptoBackend`` is expected, which allows different crypto libraries
    and implementations to be swapped in.
    """

    def generate_random(self, length: int) -> bytes:
        """Return ``length`` cryptographically secure random bytes."""
        ...

    def hash_data(self, algorithm: str, data: bytes) -> bytes:
        """Return the digest of ``data`` under the named hash algorithm."""
        ...

    def encrypt(
        self, algorithm: str, key: bytes, iv: bytes, data: bytes
    ) -> bytes:
        """Encrypt ``data`` with the named cipher, key and IV/nonce."""
        ...

    def decrypt(
        self, algorithm: str, key: bytes, iv: bytes, data: bytes
    ) -> bytes:
        """Decrypt ``data`` with the named cipher, key and IV/nonce."""
        ...

    def decrypt_length(
        self, algorithm: str, key: bytes, iv: bytes, data: bytes
    ) -> bytes:
        """Decrypt only the packet length field, for ciphers that encrypt it."""
        ...

    def create_cipher(self, algorithm: str, key: bytes, iv: bytes) -> Any:
        """Build a cipher object usable for streaming operations."""
        ...

    def compute_mac(self, algorithm: str, key: bytes, data: bytes) -> bytes:
        """Return the MAC of ``data`` under the named MAC algorithm and key."""
        ...

    def derive_key(
        self,
        algorithm: str,
        shared_secret: bytes,
        exchange_hash: bytes,
        session_id: bytes,
        key_type: bytes,
        key_length: int,
    ) -> bytes:
        """Derive ``key_length`` bytes of encryption/MAC key from the shared secret."""
        ...

    def chacha20_poly1305_encrypt(
        self,
        key_64: bytes,
        seq_num: int,
        length_bytes: bytes,
        body_bytes: bytes,
    ) -> bytes:
        """Encrypt one SSH packet with chacha20-poly1305@openssh.com."""
        ...

    def chacha20_poly1305_decrypt_length(
        self,
        key_64: bytes,
        seq_num: int,
        enc_length: bytes,
    ) -> bytes:
        """Decrypt the 4-byte length field of a chacha20-poly1305@openssh.com packet."""
        ...

    def chacha20_poly1305_decrypt_body(
        self,
        key_64: bytes,
        seq_num: int,
        enc_length: bytes,
        enc_body: bytes,
        tag: bytes,
    ) -> bytes:
        """Check the Poly1305 tag, then decrypt the packet body."""
        ...
Methods:
chacha20_poly1305_decrypt_body(key_64, seq_num, enc_length, enc_body, tag)

Verify Poly1305 tag and decrypt the packet body.

Source code in spindlex/crypto/backend.py
def chacha20_poly1305_decrypt_body(
    self, key_64: bytes, seq_num: int,
    enc_length: bytes, enc_body: bytes, tag: bytes,
) -> bytes:
    """
    Check the Poly1305 tag, then decrypt the packet body.

    Interface stub: implementations supply the behaviour.

    Args:
        key_64: Key material for chacha20-poly1305@openssh.com
        seq_num: Packet sequence number
        enc_length: Encrypted length field of the packet
        enc_body: Encrypted packet body
        tag: Poly1305 tag received with the packet

    Returns:
        The decrypted packet body
    """
    ...
chacha20_poly1305_decrypt_length(key_64, seq_num, enc_length)

Decrypt the 4-byte length field using chacha20-poly1305@openssh.com.

Source code in spindlex/crypto/backend.py
def chacha20_poly1305_decrypt_length(
    self,
    key_64: bytes,
    seq_num: int,
    enc_length: bytes,
) -> bytes:
    """
    Decrypt the 4-byte length field of a chacha20-poly1305@openssh.com packet.

    Interface stub: implementations supply the behaviour.

    Args:
        key_64: Key material for chacha20-poly1305@openssh.com
        seq_num: Packet sequence number
        enc_length: Encrypted length field

    Returns:
        The decrypted length field
    """
    ...
chacha20_poly1305_encrypt(key_64, seq_num, length_bytes, body_bytes)

Encrypt an SSH packet using chacha20-poly1305@openssh.com.

Source code in spindlex/crypto/backend.py
def chacha20_poly1305_encrypt(
    self,
    key_64: bytes,
    seq_num: int,
    length_bytes: bytes,
    body_bytes: bytes,
) -> bytes:
    """
    Encrypt one SSH packet with chacha20-poly1305@openssh.com.

    Interface stub: implementations supply the behaviour.

    Args:
        key_64: Key material for chacha20-poly1305@openssh.com
        seq_num: Packet sequence number
        length_bytes: Plaintext length field of the packet
        body_bytes: Plaintext packet body

    Returns:
        The encrypted packet
    """
    ...
compute_mac(algorithm, key, data)

Compute MAC using specified algorithm.

Source code in spindlex/crypto/backend.py
def compute_mac(
    self, algorithm: str, key: bytes, data: bytes
) -> bytes:
    """
    Return the MAC of ``data`` under the named MAC algorithm and key.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: MAC algorithm name
        key: MAC key
        data: Data to authenticate
    """
    ...
create_cipher(algorithm, key, iv)

Create cipher instance for streaming operations.

Source code in spindlex/crypto/backend.py
def create_cipher(
    self, algorithm: str, key: bytes, iv: bytes
) -> Any:
    """
    Build a cipher object usable for streaming operations.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Cipher algorithm name
        key: Encryption/decryption key
        iv: Initialization vector
    """
    ...
decrypt(algorithm, key, iv, data)

Decrypt data using specified cipher.

Source code in spindlex/crypto/backend.py
def decrypt(
    self, algorithm: str, key: bytes, iv: bytes, data: bytes
) -> bytes:
    """
    Decrypt ``data`` with the named cipher.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Cipher algorithm name
        key: Decryption key
        iv: Initialization vector or nonce
        data: Data to decrypt
    """
    ...
decrypt_length(algorithm, key, iv, data)

Decrypt only the length field for ciphers that encrypt it.

Source code in spindlex/crypto/backend.py
def decrypt_length(
    self,
    algorithm: str,
    key: bytes,
    iv: bytes,
    data: bytes,
) -> bytes:
    """
    Decrypt only the packet length field, for ciphers that encrypt it.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Cipher algorithm name
        key: Decryption key
        iv: Initialization vector or nonce
        data: Encrypted bytes holding the length field
    """
    ...
derive_key(algorithm, shared_secret, exchange_hash, session_id, key_type, key_length)

Derive encryption/MAC keys from shared secret.

Source code in spindlex/crypto/backend.py
def derive_key(
    self, algorithm: str,
    shared_secret: bytes, exchange_hash: bytes, session_id: bytes,
    key_type: bytes, key_length: int,
) -> bytes:
    """
    Derive encryption/MAC key bytes from the shared secret.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Hash algorithm used for the derivation
        shared_secret: Shared secret from the key exchange
        exchange_hash: Hash of the key exchange
        session_id: Session identifier
        key_type: Key type identifier
        key_length: Number of key bytes required
    """
    ...
encrypt(algorithm, key, iv, data)

Encrypt data using specified cipher.

Source code in spindlex/crypto/backend.py
def encrypt(
    self, algorithm: str, key: bytes, iv: bytes, data: bytes
) -> bytes:
    """
    Encrypt ``data`` with the named cipher.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Cipher algorithm name
        key: Encryption key
        iv: Initialization vector or nonce
        data: Data to encrypt
    """
    ...
generate_random(length)

Generate cryptographically secure random bytes.

Source code in spindlex/crypto/backend.py
def generate_random(
    self, length: int
) -> bytes:
    """
    Return cryptographically secure random bytes.

    Interface stub: implementations supply the behaviour.

    Args:
        length: Number of random bytes to generate
    """
    ...
hash_data(algorithm, data)

Hash data using specified algorithm.

Source code in spindlex/crypto/backend.py
def hash_data(
    self, algorithm: str, data: bytes
) -> bytes:
    """
    Return the digest of ``data`` under the named hash algorithm.

    Interface stub: implementations supply the behaviour.

    Args:
        algorithm: Hash algorithm name
        data: Data to hash
    """
    ...

CryptographyBackend

Cryptography library backend implementation.

Implements CryptoBackend interface using the Python cryptography library for modern, secure cryptographic operations.

Source code in spindlex/crypto/backend.py
class CryptographyBackend:
    """
    Cryptography library backend implementation.

    Implements CryptoBackend interface using the Python cryptography library
    for modern, secure cryptographic operations.
    """

    # Hash algorithm mapping — SHA-1 intentionally excluded (weak, not negotiated)
    HASH_ALGORITHMS = {
        "sha256": hashes.SHA256,
        "sha384": hashes.SHA384,
        "sha512": hashes.SHA512,
    }

    # MAC algorithm mapping
    MAC_ALGORITHMS = {
        "hmac-sha2-256": hashes.SHA256,
        "hmac-sha2-512": hashes.SHA512,
        "hmac-sha256": hashes.SHA256,  # Alias
        "hmac-sha512": hashes.SHA512,  # Alias
    }

    # Cipher names handled by encrypt(), decrypt() and create_cipher()
    _AES_CTR_ALGORITHMS = ("aes128-ctr", "aes192-ctr", "aes256-ctr")

    # chacha20-poly1305@openssh.com key material: two 32-byte ChaCha20 keys
    _CHACHA20_POLY1305_KEY_LEN = 64

    def __init__(self) -> None:
        """Initialize cryptography backend."""
        pass

    def generate_random(self, length: int) -> bytes:
        """
        Generate cryptographically secure random bytes.

        Args:
            length: Number of random bytes to generate

        Returns:
            Cryptographically secure random bytes

        Raises:
            CryptoException: If random generation fails
        """
        try:
            return os.urandom(length)
        except Exception as e:
            raise CryptoException(f"Failed to generate random bytes: {e}") from e

    def hash_data(self, algorithm: str, data: bytes) -> bytes:
        """
        Hash data using specified algorithm.

        Args:
            algorithm: Hash algorithm name (sha256, sha384, sha512)
            data: Data to hash

        Returns:
            Hash digest

        Raises:
            CryptoException: If hashing fails or algorithm unsupported
        """
        try:
            if algorithm not in self.HASH_ALGORITHMS:
                raise CryptoException(f"Unsupported hash algorithm: {algorithm}")

            # Ensure data is bytes (not bytearray)
            data_bytes = bytes(data)

            hash_class = self.HASH_ALGORITHMS[algorithm]
            digest = hashes.Hash(hash_class())  # type: ignore[abstract]
            digest.update(data_bytes)
            return digest.finalize()
        except CryptoException:
            raise
        except Exception as e:
            raise CryptoException(f"Hash operation failed: {e}") from e

    def encrypt(self, algorithm: str, key: bytes, iv: bytes, data: bytes) -> bytes:
        """
        Encrypt data using specified cipher.

        Only the AES-CTR variants are handled here; chacha20-poly1305 has its
        own dedicated methods.

        Args:
            algorithm: Cipher algorithm name
            key: Encryption key
            iv: Initialization vector or nonce
            data: Data to encrypt

        Returns:
            Encrypted data

        Raises:
            CryptoException: If encryption fails or algorithm unsupported
        """
        try:
            # Ensure all inputs are bytes
            key_bytes = bytes(key)
            iv_bytes = bytes(iv)
            data_bytes = bytes(data)

            if algorithm in self._AES_CTR_ALGORITHMS:
                cipher_algo = algorithms.AES(key_bytes)
                mode = modes.CTR(iv_bytes)
                cipher = Cipher(cipher_algo, mode)
                encryptor = cipher.encryptor()
                return bytes(encryptor.update(data_bytes) + encryptor.finalize())
            else:
                raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
        except Exception as e:
            # Note: this also re-wraps the "unsupported" CryptoException above.
            raise CryptoException(f"Encryption failed: {e}") from e

    def decrypt(self, algorithm: str, key: bytes, iv: bytes, data: bytes) -> bytes:
        """
        Decrypt data using specified cipher.

        Only the AES-CTR variants are handled here; chacha20-poly1305 has its
        own dedicated methods.

        Args:
            algorithm: Cipher algorithm name
            key: Decryption key
            iv: Initialization vector or nonce
            data: Data to decrypt

        Returns:
            Decrypted data

        Raises:
            CryptoException: If decryption fails or algorithm unsupported
        """
        try:
            # Ensure all inputs are bytes
            key_bytes = bytes(key)
            iv_bytes = bytes(iv)
            data_bytes = bytes(data)

            if algorithm in self._AES_CTR_ALGORITHMS:
                cipher_algo = algorithms.AES(key_bytes)
                mode = modes.CTR(iv_bytes)
                cipher = Cipher(cipher_algo, mode)
                decryptor = cipher.decryptor()
                return bytes(decryptor.update(data_bytes) + decryptor.finalize())
            else:
                raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
        except Exception as e:
            # Note: this also re-wraps the "unsupported" CryptoException above.
            raise CryptoException(f"Decryption failed: {e}") from e

    def create_cipher(self, algorithm: str, key: bytes, iv: bytes) -> Any:
        """
        Create cipher instance for streaming operations.

        Args:
            algorithm: Cipher algorithm name
            key: Encryption/decryption key
            iv: Initialization vector

        Returns:
            Cipher instance for streaming operations

        Raises:
            CryptoException: If cipher creation fails
        """
        try:
            # Ensure all inputs are bytes
            key_bytes = bytes(key)
            iv_bytes = bytes(iv)

            if algorithm in self._AES_CTR_ALGORITHMS:
                cipher_algo = algorithms.AES(key_bytes)
                mode = modes.CTR(iv_bytes)
                return Cipher(cipher_algo, mode)
            else:
                raise CryptoException(
                    f"Streaming cipher not supported for: {algorithm}"
                )
        except Exception as e:
            raise CryptoException(f"Cipher creation failed: {e}") from e

    def compute_mac(self, algorithm: str, key: bytes, data: bytes) -> bytes:
        """
        Compute MAC using specified algorithm.

        Args:
            algorithm: MAC algorithm name
            key: MAC key
            data: Data to authenticate

        Returns:
            MAC digest

        Raises:
            CryptoException: If MAC computation fails
        """
        try:
            if algorithm not in self.MAC_ALGORITHMS:
                raise CryptoException(f"Unsupported MAC algorithm: {algorithm}")

            # Ensure all inputs are bytes
            key_bytes = bytes(key)
            data_bytes = bytes(data)

            hash_class = self.MAC_ALGORITHMS[algorithm]
            h = hmac.HMAC(key_bytes, hash_class())  # type: ignore
            h.update(data_bytes)
            return bytes(h.finalize())
        except Exception as e:
            raise CryptoException(f"MAC computation failed: {e}") from e

    def derive_key(
        self,
        algorithm: str,
        shared_secret: bytes,
        exchange_hash: bytes,
        session_id: bytes,
        key_type: bytes,
        key_length: int,
    ) -> bytes:
        """
        Derive encryption/MAC keys from shared secret using SSH key derivation.

        Args:
            algorithm: Hash algorithm for key derivation
            shared_secret: Shared secret K - MUST be mpint-encoded per RFC 4253 §7.2,
                i.e. a 4-byte big-endian length prefix followed by the minimal
                two's-complement big-endian representation of K (with a leading
                0x00 byte if the MSB is set). Pass write_mpint(K) from protocol.utils.
            exchange_hash: Hash of key exchange
            session_id: Session identifier
            key_type: Key type identifier (A, B, C, D, E, F)
            key_length: Required key length

        Returns:
            Derived key

        Raises:
            CryptoException: If key derivation fails or shared_secret is not mpint-encoded
        """
        try:
            if algorithm not in self.HASH_ALGORITHMS:
                raise CryptoException(f"Unsupported hash algorithm: {algorithm}")

            hash_class = self.HASH_ALGORITHMS[algorithm]

            # Ensure all inputs are bytes
            shared_secret_bytes = bytes(shared_secret)

            # Validate mpint envelope: 4-byte length prefix + payload (RFC 4253 §7.2)
            if len(shared_secret_bytes) < 4:
                raise CryptoException(
                    "shared_secret must be mpint-encoded (RFC 4253 §7.2): too short"
                )
            declared = struct.unpack(">I", shared_secret_bytes[:4])[0]
            if len(shared_secret_bytes) != 4 + declared:
                raise CryptoException(
                    "shared_secret must be mpint-encoded (RFC 4253 §7.2): "
                    f"declared length {declared} does not match actual payload length "
                    f"{len(shared_secret_bytes) - 4}"
                )
            exchange_hash_bytes = bytes(exchange_hash)
            key_type_bytes = bytes(key_type)
            session_id_bytes = bytes(session_id)

            # SSH key derivation: K || H || key_type || session_id
            initial_data = (
                shared_secret_bytes
                + exchange_hash_bytes
                + key_type_bytes
                + session_id_bytes
            )

            # Hash the initial data
            digest = hashes.Hash(hash_class())  # type: ignore[abstract]
            digest.update(initial_data)
            key_material = digest.finalize()

            # Extend key material if needed: each round hashes K || H || all
            # key material produced so far and appends the result.
            while len(key_material) < key_length:
                digest = hashes.Hash(hash_class())  # type: ignore[abstract]
                digest.update(shared_secret_bytes + exchange_hash_bytes + key_material)
                key_material += digest.finalize()

            return key_material[:key_length]
        except Exception as e:
            raise CryptoException(f"Key derivation failed: {e}") from e

    def decrypt_length(
        self, algorithm: str, key: bytes, iv: bytes, data: bytes
    ) -> bytes:
        """
        Decrypt only the length field for ciphers that encrypt it.

        This implementation is a passthrough: ``algorithm``, ``key`` and
        ``iv`` are ignored and ``data`` is returned as ``bytes``. With
        AES-CTR the full packet including the length field is decrypted
        together, so no separate length step is needed. The encrypted length
        of chacha20-poly1305@openssh.com packets is handled by
        ``chacha20_poly1305_decrypt_length`` instead. The method is retained
        as a hook for future CBC-mode support where the 4-byte length must be
        decrypted before knowing how much payload to read from the socket.

        Returns:
            ``data`` unchanged (passthrough for CTR-mode callers).
        """
        return bytes(data)

    def _chacha20_split_key(self, key_64: bytes) -> tuple[bytes, bytes]:
        """Validate the 64-byte key and return ``(key_body, key_header)``."""
        key = bytes(key_64)
        if len(key) != self._CHACHA20_POLY1305_KEY_LEN:
            raise CryptoException(
                "chacha20-poly1305@openssh.com requires a 64-byte key, "
                f"got {len(key)} bytes"
            )
        # First half keys the body and the Poly1305 key stream, second half
        # keys the length field.
        return key[:32], key[32:]

    @staticmethod
    def _chacha20_nonce(block_counter: int, seq_num: int) -> bytes:
        """Build the 16-byte ChaCha20 nonce for a block counter and sequence number."""
        try:
            seq_be = struct.pack(">Q", seq_num)
        except struct.error as e:
            raise CryptoException(f"Invalid packet sequence number: {seq_num}") from e
        # Layout: little-endian block counter (4 bytes), 4 zero bytes, then
        # the big-endian packet sequence number (8 bytes).
        return struct.pack("<I", block_counter) + b"\x00" * 4 + seq_be

    @staticmethod
    def _chacha20_xor(key: bytes, nonce: bytes, data: bytes) -> bytes:
        """Apply the ChaCha20 key stream for ``key``/``nonce`` to ``data``."""
        # ChaCha20 is a stream cipher, so the same operation both encrypts
        # and decrypts.
        cipher = Cipher(algorithms.ChaCha20(key, nonce), mode=None)
        return cipher.encryptor().update(bytes(data))

    def chacha20_poly1305_encrypt(
        self, key_64: bytes, seq_num: int, length_bytes: bytes, body_bytes: bytes
    ) -> bytes:
        """
        Encrypt an SSH packet using chacha20-poly1305@openssh.com.

        Args:
            key_64: 64 bytes of key material; bytes 0-31 key the body and the
                Poly1305 key, bytes 32-63 key the length field
            seq_num: Packet sequence number, used as the nonce
            length_bytes: Plaintext packet length field
            body_bytes: Plaintext packet body

        Returns:
            Encrypted length, encrypted body and Poly1305 tag, concatenated

        Raises:
            CryptoException: If the key is not 64 bytes long or the sequence
                number cannot be encoded
        """
        key_body, key_header = self._chacha20_split_key(key_64)
        nonce_c0 = self._chacha20_nonce(0, seq_num)
        nonce_c1 = self._chacha20_nonce(1, seq_num)

        enc_length = self._chacha20_xor(key_header, nonce_c0, length_bytes)
        # The one-time Poly1305 key is the first 32 key-stream bytes of block 0.
        poly_key = self._chacha20_xor(key_body, nonce_c0, b"\x00" * 32)
        # The body is encrypted starting at block 1.
        enc_body = self._chacha20_xor(key_body, nonce_c1, body_bytes)

        # The tag covers the encrypted length and the encrypted body.
        ciphertext = enc_length + enc_body
        tag = Poly1305.generate_tag(poly_key, ciphertext)
        return ciphertext + tag

    def chacha20_poly1305_decrypt_length(
        self, key_64: bytes, seq_num: int, enc_length: bytes
    ) -> bytes:
        """
        Decrypt the 4-byte length field using chacha20-poly1305@openssh.com.

        The result is NOT authenticated at this point; the tag is only
        checked by ``chacha20_poly1305_decrypt_body``.

        Args:
            key_64: 64 bytes of key material (bytes 32-63 key the length field)
            seq_num: Packet sequence number, used as the nonce
            enc_length: Encrypted length field

        Returns:
            Decrypted length field

        Raises:
            CryptoException: If the key is not 64 bytes long or the sequence
                number cannot be encoded
        """
        _key_body, key_header = self._chacha20_split_key(key_64)
        nonce_c0 = self._chacha20_nonce(0, seq_num)
        return self._chacha20_xor(key_header, nonce_c0, enc_length)

    def chacha20_poly1305_decrypt_body(
        self,
        key_64: bytes,
        seq_num: int,
        enc_length: bytes,
        enc_body: bytes,
        tag: bytes,
    ) -> bytes:
        """
        Verify Poly1305 tag and decrypt the packet body.

        The tag is verified over the encrypted length plus encrypted body
        before any body byte is decrypted.

        Args:
            key_64: 64 bytes of key material (bytes 0-31 key the body and the
                Poly1305 key)
            seq_num: Packet sequence number, used as the nonce
            enc_length: Encrypted length field as received
            enc_body: Encrypted packet body as received
            tag: Poly1305 tag as received

        Returns:
            Decrypted packet body

        Raises:
            CryptoException: If the tag does not verify, the key is not
                64 bytes long or the sequence number cannot be encoded
        """
        key_body, _key_header = self._chacha20_split_key(key_64)
        nonce_c0 = self._chacha20_nonce(0, seq_num)
        nonce_c1 = self._chacha20_nonce(1, seq_num)

        poly_key = self._chacha20_xor(key_body, nonce_c0, b"\x00" * 32)
        try:
            Poly1305.verify_tag(
                poly_key, bytes(enc_length) + bytes(enc_body), bytes(tag)
            )
        except InvalidSignature as e:
            raise CryptoException("Poly1305 authentication failed") from e
        return self._chacha20_xor(key_body, nonce_c1, enc_body)
Methods:
__init__()

Initialize cryptography backend.

Source code in spindlex/crypto/backend.py
def __init__(self) -> None:
    """Create the backend; there is nothing to set up."""
    return None
compute_mac(algorithm, key, data)

Compute MAC using specified algorithm.

Parameters:

Name Type Description Default
algorithm str

MAC algorithm name

required
key bytes

MAC key

required
data bytes

Data to authenticate

required

Returns:

Type Description
bytes

MAC digest

Raises:

Type Description
CryptoException

If MAC computation fails

Source code in spindlex/crypto/backend.py
def compute_mac(self, algorithm: str, key: bytes, data: bytes) -> bytes:
    """
    Compute MAC using specified algorithm.

    Args:
        algorithm: MAC algorithm name (must be a key of ``MAC_ALGORITHMS``)
        key: MAC key
        data: Data to authenticate

    Returns:
        MAC digest

    Raises:
        CryptoException: If the algorithm is unsupported or the MAC
            computation fails
    """
    try:
        if algorithm not in self.MAC_ALGORITHMS:
            raise CryptoException(f"Unsupported MAC algorithm: {algorithm}")

        # Normalise bytes-like inputs (e.g. bytearray) to bytes
        key_bytes = bytes(key)
        data_bytes = bytes(data)

        hash_class = self.MAC_ALGORITHMS[algorithm]
        h = hmac.HMAC(key_bytes, hash_class())  # type: ignore
        h.update(data_bytes)
        return bytes(h.finalize())
    except CryptoException:
        # Already a domain error (e.g. unsupported algorithm): do not
        # wrap it a second time, matching hash_data().
        raise
    except Exception as e:
        raise CryptoException(f"MAC computation failed: {e}") from e
create_cipher(algorithm, key, iv)

Create cipher instance for streaming operations.

Parameters:

Name Type Description Default
algorithm str

Cipher algorithm name

required
key bytes

Encryption/decryption key

required
iv bytes

Initialization vector

required

Returns:

Type Description
Any

Cipher instance for streaming operations

Raises:

Type Description
CryptoException

If cipher creation fails

Source code in spindlex/crypto/backend.py
def create_cipher(self, algorithm: str, key: bytes, iv: bytes) -> Any:
    """
    Create cipher instance for streaming operations.

    Only the AES-CTR family (aes128-ctr, aes192-ctr, aes256-ctr) is
    handled here.

    Args:
        algorithm: Cipher algorithm name
        key: Encryption/decryption key
        iv: Initialization vector

    Returns:
        Cipher instance for streaming operations

    Raises:
        CryptoException: If the algorithm is unsupported or cipher
            creation fails
    """
    try:
        # Normalise bytes-like inputs (e.g. bytearray) to bytes
        key_bytes = bytes(key)
        iv_bytes = bytes(iv)

        if algorithm in ["aes128-ctr", "aes192-ctr", "aes256-ctr"]:
            cipher_algo = algorithms.AES(key_bytes)
            mode = modes.CTR(iv_bytes)
            return Cipher(cipher_algo, mode)
        else:
            raise CryptoException(
                f"Streaming cipher not supported for: {algorithm}"
            )
    except CryptoException:
        # Already a domain error: do not wrap it a second time,
        # matching hash_data().
        raise
    except Exception as e:
        raise CryptoException(f"Cipher creation failed: {e}") from e
decrypt(algorithm, key, iv, data)

Decrypt data using specified cipher.

Parameters:

Name Type Description Default
algorithm str

Cipher algorithm name

required
key bytes

Decryption key

required
iv bytes

Initialization vector or nonce

required
data bytes

Data to decrypt

required

Returns:

Type Description
bytes

Decrypted data

Raises:

Type Description
CryptoException

If decryption fails or algorithm unsupported

Source code in spindlex/crypto/backend.py
def decrypt(self, algorithm: str, key: bytes, iv: bytes, data: bytes) -> bytes:
    """
    Decrypt data using specified cipher.

    Only the AES-CTR family (aes128-ctr, aes192-ctr, aes256-ctr) is
    handled here.

    Args:
        algorithm: Cipher algorithm name
        key: Decryption key
        iv: Initialization vector or nonce
        data: Data to decrypt

    Returns:
        Decrypted data

    Raises:
        CryptoException: If decryption fails or algorithm unsupported
    """
    try:
        # Normalise bytes-like inputs (e.g. bytearray) to bytes
        key_bytes = bytes(key)
        iv_bytes = bytes(iv)
        data_bytes = bytes(data)

        if algorithm in ["aes128-ctr", "aes192-ctr", "aes256-ctr"]:
            cipher_algo = algorithms.AES(key_bytes)
            mode = modes.CTR(iv_bytes)
            cipher = Cipher(cipher_algo, mode)
            decryptor = cipher.decryptor()
            return bytes(decryptor.update(data_bytes) + decryptor.finalize())
        else:
            raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
    except CryptoException:
        # Already a domain error: do not wrap it a second time,
        # matching hash_data().
        raise
    except Exception as e:
        raise CryptoException(f"Decryption failed: {e}") from e
decrypt_length(algorithm, key, iv, data)

Decrypt only the length field for ciphers that encrypt it.

For AES-CTR (the only cipher currently supported), the full packet including the length field is decrypted together, so this method is not used. It is retained as a hook for future CBC-mode support where the 4-byte length must be decrypted before knowing how much payload to read from the socket.

Returns:

Type Description
bytes

data unchanged (passthrough for CTR-mode callers).

Source code in spindlex/crypto/backend.py
def decrypt_length(
    self, algorithm: str, key: bytes, iv: bytes, data: bytes
) -> bytes:
    """
    Hook for decrypting just the packet-length field.

    The current implementation performs no decryption: ``algorithm``,
    ``key`` and ``iv`` are ignored and ``data`` is handed back. The
    method exists so that a cipher which must decrypt the length before
    the rest of the packet can be read (e.g. a future CBC mode) has a
    place to plug in.

    Returns:
        A ``bytes`` copy of ``data``, otherwise unchanged.
    """
    passthrough = bytes(data)
    return passthrough
derive_key(algorithm, shared_secret, exchange_hash, session_id, key_type, key_length)

Derive encryption/MAC keys from shared secret using SSH key derivation.

Parameters:

Name Type Description Default
algorithm str

Hash algorithm for key derivation

required
shared_secret bytes

Shared secret K - MUST be mpint-encoded per RFC 4253 §7.2, i.e. a 4-byte big-endian length prefix followed by the minimal two's-complement big-endian representation of K (with a leading 0x00 byte if the MSB is set). Pass write_mpint(K) from protocol.utils.

required
exchange_hash bytes

Hash of key exchange

required
session_id bytes

Session identifier

required
key_type bytes

Key type identifier (A, B, C, D, E, F)

required
key_length int

Required key length

required

Returns:

Type Description
bytes

Derived key

Raises:

Type Description
CryptoException

If key derivation fails or shared_secret is not mpint-encoded

Source code in spindlex/crypto/backend.py
def derive_key(
    self,
    algorithm: str,
    shared_secret: bytes,
    exchange_hash: bytes,
    session_id: bytes,
    key_type: bytes,
    key_length: int,
) -> bytes:
    """
    Derive encryption/MAC keys from shared secret using SSH key derivation.

    Args:
        algorithm: Hash algorithm for key derivation
        shared_secret: Shared secret K - MUST be mpint-encoded per RFC 4253 §7.2,
            i.e. a 4-byte big-endian length prefix followed by the minimal
            two's-complement big-endian representation of K (with a leading
            0x00 byte if the MSB is set). Pass write_mpint(K) from protocol.utils.
        exchange_hash: Hash of key exchange
        session_id: Session identifier
        key_type: Key type identifier (A, B, C, D, E, F)
        key_length: Required key length in bytes (must not be negative)

    Returns:
        Derived key, exactly ``key_length`` bytes long

    Raises:
        CryptoException: If key derivation fails, ``key_length`` is negative,
            or shared_secret is not mpint-encoded
    """
    try:
        if algorithm not in self.HASH_ALGORITHMS:
            raise CryptoException(f"Unsupported hash algorithm: {algorithm}")

        # A negative length would otherwise slice the digest from the end
        # and silently hand back a truncated key.
        if key_length < 0:
            raise CryptoException(
                f"key_length must be non-negative, got {key_length}"
            )

        hash_class = self.HASH_ALGORITHMS[algorithm]

        # Normalise bytes-like inputs (e.g. bytearray) to bytes
        shared_secret_bytes = bytes(shared_secret)

        # Validate mpint envelope: 4-byte length prefix + payload (RFC 4253 §7.2)
        if len(shared_secret_bytes) < 4:
            raise CryptoException(
                "shared_secret must be mpint-encoded (RFC 4253 §7.2): too short"
            )
        declared = struct.unpack(">I", shared_secret_bytes[:4])[0]
        if len(shared_secret_bytes) != 4 + declared:
            raise CryptoException(
                "shared_secret must be mpint-encoded (RFC 4253 §7.2): "
                f"declared length {declared} does not match actual payload length "
                f"{len(shared_secret_bytes) - 4}"
            )
        exchange_hash_bytes = bytes(exchange_hash)
        key_type_bytes = bytes(key_type)
        session_id_bytes = bytes(session_id)

        # First block: HASH(K || H || key_type || session_id)
        digest = hashes.Hash(hash_class())  # type: ignore[abstract]
        digest.update(
            shared_secret_bytes
            + exchange_hash_bytes
            + key_type_bytes
            + session_id_bytes
        )
        key_material = digest.finalize()

        # Extension blocks: HASH(K || H || everything derived so far)
        while len(key_material) < key_length:
            digest = hashes.Hash(hash_class())  # type: ignore[abstract]
            digest.update(shared_secret_bytes + exchange_hash_bytes + key_material)
            key_material += digest.finalize()

        return key_material[:key_length]
    except CryptoException:
        # Validation errors above are already domain errors: do not wrap
        # them a second time, matching hash_data().
        raise
    except Exception as e:
        raise CryptoException(f"Key derivation failed: {e}") from e
encrypt(algorithm, key, iv, data)

Encrypt data using specified cipher.

Parameters:

Name Type Description Default
algorithm str

Cipher algorithm name

required
key bytes

Encryption key

required
iv bytes

Initialization vector or nonce

required
data bytes

Data to encrypt

required

Returns:

Type Description
bytes

Encrypted data

Raises:

Type Description
CryptoException

If encryption fails or algorithm unsupported

Source code in spindlex/crypto/backend.py
def encrypt(self, algorithm: str, key: bytes, iv: bytes, data: bytes) -> bytes:
    """
    Encrypt data using specified cipher.

    Only the AES-CTR family (aes128-ctr, aes192-ctr, aes256-ctr) is
    handled here.

    Args:
        algorithm: Cipher algorithm name
        key: Encryption key
        iv: Initialization vector or nonce
        data: Data to encrypt

    Returns:
        Encrypted data

    Raises:
        CryptoException: If encryption fails or algorithm unsupported
    """
    try:
        # Normalise bytes-like inputs (e.g. bytearray) to bytes
        key_bytes = bytes(key)
        iv_bytes = bytes(iv)
        data_bytes = bytes(data)

        if algorithm in ["aes128-ctr", "aes192-ctr", "aes256-ctr"]:
            cipher_algo = algorithms.AES(key_bytes)
            mode = modes.CTR(iv_bytes)
            cipher = Cipher(cipher_algo, mode)
            encryptor = cipher.encryptor()
            return bytes(encryptor.update(data_bytes) + encryptor.finalize())
        else:
            raise CryptoException(f"Unsupported cipher algorithm: {algorithm}")
    except CryptoException:
        # Already a domain error: do not wrap it a second time,
        # matching hash_data().
        raise
    except Exception as e:
        raise CryptoException(f"Encryption failed: {e}") from e
generate_random(length)

Generate cryptographically secure random bytes.

Parameters:

Name Type Description Default
length int

Number of random bytes to generate

required

Returns:

Type Description
bytes

Cryptographically secure random bytes

Raises:

Type Description
CryptoException

If random generation fails

Source code in spindlex/crypto/backend.py
def generate_random(self, length: int) -> bytes:
    """
    Return ``length`` bytes read from the operating system's random source.

    Args:
        length: How many random bytes to produce

    Returns:
        The bytes returned by ``os.urandom(length)``

    Raises:
        CryptoException: If ``os.urandom`` raises for any reason
    """
    try:
        random_bytes = os.urandom(length)
    except Exception as exc:
        raise CryptoException(f"Failed to generate random bytes: {exc}") from exc
    return random_bytes
hash_data(algorithm, data)

Hash data using specified algorithm.

Parameters:

Name Type Description Default
algorithm str

Hash algorithm name (sha256, sha384, sha512)

required
data bytes

Data to hash

required

Returns:

Type Description
bytes

Hash digest

Raises:

Type Description
CryptoException

If hashing fails or algorithm unsupported

Source code in spindlex/crypto/backend.py
def hash_data(self, algorithm: str, data: bytes) -> bytes:
    """
    Compute the digest of ``data`` with the named hash algorithm.

    Args:
        algorithm: Hash algorithm name (sha256, sha384, sha512)
        data: Data to hash

    Returns:
        Hash digest

    Raises:
        CryptoException: If the algorithm is unsupported or hashing fails
    """
    try:
        supported = self.HASH_ALGORITHMS
        if algorithm not in supported:
            raise CryptoException(f"Unsupported hash algorithm: {algorithm}")

        # Copy bytes-like input (e.g. bytearray) into an immutable bytes object
        payload = bytes(data)

        hasher = hashes.Hash(supported[algorithm]())  # type: ignore[abstract]
        hasher.update(payload)
        return hasher.finalize()
    except CryptoException:
        # The "unsupported" error is passed through without re-wrapping
        raise
    except Exception as exc:
        raise CryptoException(f"Hash operation failed: {exc}") from exc

ECDSAKey

Bases: PKey

ECDSA SSH key implementation.

Implements the ecdsa-sha2-nistp256, ecdsa-sha2-nistp384 and ecdsa-sha2-nistp521 key types using the NIST P-256, P-384 and P-521 curves.

Source code in spindlex/crypto/pkey.py
class ECDSAKey(PKey):
    """
    ECDSA SSH key implementation.

    Implements the ecdsa-sha2-nistp256, ecdsa-sha2-nistp384 and
    ecdsa-sha2-nistp521 key types using the NIST P-256, P-384 and P-521
    curves.
    """

    def __init__(
        self,
        crypto_backend: Optional[CryptoBackend] = None,
        curve_name: str = "nistp256",
    ) -> None:
        """
        Initialize ECDSA key with specified curve.

        Args:
            crypto_backend: Optional backend forwarded to the base class
            curve_name: One of "nistp256", "nistp384" or "nistp521"

        Raises:
            CryptoException: If ``curve_name`` is not a supported curve
        """
        super().__init__(crypto_backend)
        self.curve_name = curve_name
        self.curve: ec.EllipticCurve
        if curve_name == "nistp256":
            self.curve = ec.SECP256R1()
        elif curve_name == "nistp384":
            self.curve = ec.SECP384R1()
        elif curve_name == "nistp521":
            self.curve = ec.SECP521R1()
        else:
            raise CryptoException(f"Unsupported ECDSA curve: {curve_name}")

    @property
    def algorithm_name(self) -> str:
        """Get SSH algorithm name (``ecdsa-sha2-<curve_name>``)."""
        return f"ecdsa-sha2-{self.curve_name}"

    def _get_hash_algo(self) -> Any:
        """Get the hash paired with the curve (SHA-256/384/512; SHA-256 otherwise)."""
        if self.curve_name == "nistp256":
            return hashes.SHA256()
        elif self.curve_name == "nistp384":
            return hashes.SHA384()
        elif self.curve_name == "nistp521":
            return hashes.SHA512()
        return hashes.SHA256()

    def load_private_key(
        self, key_data: bytes, password: Optional[bytes] = None
    ) -> None:
        """
        Load ECDSA private key.

        The key, curve name and curve are replaced only after the loaded
        key has passed every check; a failed load leaves the object as it
        was.

        Args:
            key_data: Private key data (PEM or OpenSSH format)
            password: Optional password for encrypted keys

        Raises:
            CryptoException: If key loading fails
        """
        try:
            if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
                loaded = serialization.load_ssh_private_key(
                    key_data, password=password
                )
            else:
                loaded = serialization.load_pem_private_key(
                    key_data, password=password
                )
            if not isinstance(loaded, ec.EllipticCurvePrivateKey):
                raise CryptoException("Key is not ECDSA private key")

            # Map the loaded key's curve onto its SSH curve name
            if isinstance(loaded.curve, ec.SECP256R1):
                curve_name = "nistp256"
            elif isinstance(loaded.curve, ec.SECP384R1):
                curve_name = "nistp384"
            elif isinstance(loaded.curve, ec.SECP521R1):
                curve_name = "nistp521"
            else:
                raise CryptoException(
                    f"Unsupported ECDSA curve: {type(loaded.curve)}"
                )
        except Exception as e:
            raise CryptoException(f"Failed to load ECDSA private key: {e}") from e

        # Commit only now, so a rejected key (wrong type or curve) never
        # ends up stored in self._key.
        self._key = loaded
        self.curve_name = curve_name
        self.curve = loaded.curve

    def load_public_key(self, key_data: bytes) -> None:
        """
        Load ECDSA public key from SSH wire format.

        The blob is three length-prefixed strings: algorithm name, curve
        name and the encoded EC point. The key and curve are replaced only
        after the whole blob has parsed; a failed load leaves the object
        as it was.

        Args:
            key_data: Public key data in SSH wire format

        Raises:
            CryptoException: If key loading fails
        """
        try:
            offset = 0

            # Read algorithm name
            algo_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
            offset += 4
            algorithm = key_data[offset : offset + algo_len].decode()
            offset += algo_len

            if not algorithm.startswith("ecdsa-sha2-") and algorithm != "ecdsa":
                raise CryptoException(f"Expected ECDSA algorithm, got {algorithm}")

            # Read curve name
            curve_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
            offset += 4
            curve_name = key_data[offset : offset + curve_len].decode()
            offset += curve_len

            curve: ec.EllipticCurve
            if curve_name == "nistp256":
                curve = ec.SECP256R1()
            elif curve_name == "nistp384":
                curve = ec.SECP384R1()
            elif curve_name == "nistp521":
                curve = ec.SECP521R1()
            else:
                raise CryptoException(f"Unsupported curve, got {curve_name}")

            # Read public key point
            point_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
            offset += 4
            point_bytes = key_data[offset : offset + point_len]

            public_key = ec.EllipticCurvePublicKey.from_encoded_point(
                curve, point_bytes
            )
        except Exception as e:
            raise CryptoException(f"Failed to load ECDSA public key: {e}") from e

        # Commit only now, so a bad point cannot leave the new curve paired
        # with the previously loaded key.
        self.curve_name = curve_name
        self.curve = curve
        self._key = public_key

    def get_public_key_bytes(self) -> bytes:
        """
        Get ECDSA public key in SSH wire format.

        Returns:
            Public key bytes in SSH wire format

        Raises:
            CryptoException: If no key loaded
        """
        try:
            key = self._key
            if key is None:
                raise CryptoException("No key loaded")

            # A private key carries its public half; anything else is used as-is
            if isinstance(key, ec.EllipticCurvePrivateKey):
                public_key = key.public_key()
            else:
                public_key = key

            point_bytes = public_key.public_bytes(
                encoding=serialization.Encoding.X962,
                format=serialization.PublicFormat.UncompressedPoint,
            )

            # Three length-prefixed strings: algorithm, curve name, point
            fields = (
                self.algorithm_name.encode(),
                self.curve_name.encode(),
                point_bytes,
            )
            return b"".join(
                struct.pack(">I", len(field)) + field for field in fields
            )
        except Exception as e:
            raise CryptoException(f"Failed to get ECDSA public key bytes: {e}") from e

    def sign(self, data: bytes) -> bytes:
        """
        Sign data with ECDSA private key.

        Args:
            data: Data to sign

        Returns:
            Signature bytes in SSH format

        Raises:
            CryptoException: If signing fails or no private key
        """
        try:
            private_key = self._key
            if not isinstance(private_key, ec.EllipticCurvePrivateKey):
                raise CryptoException("No ECDSA private key loaded")

            der_signature = private_key.sign(data, ec.ECDSA(self._get_hash_algo()))

            # The library returns DER; SSH wants r and s as two mpints
            r, s = decode_dss_signature(der_signature)
            sig_blob = write_mpint(r) + write_mpint(s)

            name = self.algorithm_name.encode()
            return b"".join(
                (
                    struct.pack(">I", len(name)),
                    name,
                    struct.pack(">I", len(sig_blob)),
                    sig_blob,
                )
            )
        except Exception as e:
            raise CryptoException(f"ECDSA signing failed: {e}") from e

    @classmethod
    def generate(
        cls, key_type: str = "ecdsa", bits: int = 256, *args: Any, **kwargs: Any
    ) -> "ECDSAKey":
        """
        Generate a new ECDSA key pair (default P-256).

        ``bits`` of 384 or 521 select P-384 or P-521; any other value
        selects P-256. ``key_type`` and extra arguments are not used.
        """
        if bits == 384:
            curve_name = "nistp384"
        elif bits == 521:
            curve_name = "nistp521"
        else:
            curve_name = "nistp256"

        key = cls(curve_name=curve_name)
        key._key = ec.generate_private_key(key.curve)
        return key

    def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
        """
        Save ECDSA private key to file as PKCS#8 PEM.

        A newly created file gets mode 0600 so the private key is not
        readable by other users. The mode of a file that already exists
        is not changed.

        Args:
            filename: Destination path
            password: If non-empty, encrypt the key with this password

        Raises:
            CryptoException: If no private key is loaded or writing fails
        """
        import os  # local import: this module's import block is not visible here

        try:
            if not isinstance(self._key, ec.EllipticCurvePrivateKey):
                raise CryptoException("No ECDSA private key loaded")

            encryption_algorithm = (
                serialization.BestAvailableEncryption(password.encode())
                if password
                else serialization.NoEncryption()
            )

            pem = self._key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=encryption_algorithm,
            )

            # Plain open() would create the file with umask-default
            # permissions (commonly world-readable). O_BINARY exists only
            # on Windows, where it prevents newline translation.
            flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
            fd = os.open(filename, flags, 0o600)
            with os.fdopen(fd, "wb") as f:
                f.write(pem)
        except Exception as e:
            raise CryptoException(f"Failed to save ECDSA key: {e}") from e

    def verify(self, signature: bytes, data: bytes) -> bool:
        """
        Verify ECDSA signature.

        Args:
            signature: Signature bytes in SSH format
            data: Original data that was signed

        Returns:
            True if signature is valid, False otherwise (including when no
            key is loaded, the algorithm name differs, or parsing fails)
        """

        def read_field(buf: bytes, pos: int) -> Any:
            """Return (payload, next_pos) for the length-prefixed field at pos."""
            (size,) = struct.unpack(">I", buf[pos : pos + 4])
            start = pos + 4
            return buf[start : start + size], start + size

        try:
            if self._key is None:
                return False

            if isinstance(self._key, ec.EllipticCurvePrivateKey):
                public_key = self._key.public_key()
            else:
                public_key = self._key

            # Outer envelope: algorithm name, then the signature blob
            algo_bytes, pos = read_field(signature, 0)
            if algo_bytes.decode() != self.algorithm_name:
                return False
            sig_blob, _ = read_field(signature, pos)

            # Inside the blob: r, then s
            r_bytes, blob_pos = read_field(sig_blob, 0)
            s_bytes, _ = read_field(sig_blob, blob_pos)

            # The library expects a DER-encoded (r, s) pair
            der_signature = encode_dss_signature(
                int.from_bytes(r_bytes, "big"), int.from_bytes(s_bytes, "big")
            )

            public_key.verify(der_signature, data, ec.ECDSA(self._get_hash_algo()))
            return True
        except Exception:
            # Any parse or verification failure counts as "not valid"
            return False
Attributes
algorithm_name property

Get SSH algorithm name.

Methods:
__init__(crypto_backend=None, curve_name='nistp256')

Initialize ECDSA key with specified curve.

Source code in spindlex/crypto/pkey.py
def __init__(
    self,
    crypto_backend: Optional[CryptoBackend] = None,
    curve_name: str = "nistp256",
) -> None:
    """
    Set up an ECDSA key object bound to one of the supported curves.

    Args:
        crypto_backend: Optional backend forwarded to the base class
        curve_name: One of "nistp256", "nistp384" or "nistp521"

    Raises:
        CryptoException: If ``curve_name`` is not a supported curve
    """
    super().__init__(crypto_backend)
    self.curve_name = curve_name

    # Curve classes keyed by SSH curve name; instantiated only on a match
    curve_classes = {
        "nistp256": ec.SECP256R1,
        "nistp384": ec.SECP384R1,
        "nistp521": ec.SECP521R1,
    }
    if curve_name not in curve_classes:
        raise CryptoException(f"Unsupported ECDSA curve: {curve_name}")
    self.curve: ec.EllipticCurve = curve_classes[curve_name]()
generate(key_type='ecdsa', bits=256, *args, **kwargs) classmethod

Generate a new ECDSA key pair (default P-256).

Source code in spindlex/crypto/pkey.py
@classmethod
def generate(
    cls, key_type: str = "ecdsa", bits: int = 256, *args: Any, **kwargs: Any
) -> "ECDSAKey":
    """
    Create a fresh ECDSA key pair.

    ``bits`` of 384 or 521 select P-384 or P-521; any other value selects
    P-256. ``key_type`` and extra arguments are not used.
    """
    if bits == 384:
        chosen_curve = "nistp384"
    elif bits == 521:
        chosen_curve = "nistp521"
    else:
        chosen_curve = "nistp256"

    new_key = cls(curve_name=chosen_curve)
    new_key._key = ec.generate_private_key(new_key.curve)
    return new_key
get_public_key_bytes()

Get ECDSA public key in SSH wire format.

Returns:

Type Description
bytes

Public key bytes in SSH wire format

Raises:

Type Description
CryptoException

If no key loaded

Source code in spindlex/crypto/pkey.py
def get_public_key_bytes(self) -> bytes:
    """
    Serialise the public key as an SSH wire-format blob.

    Returns:
        Three length-prefixed strings: algorithm name, curve name and
        the uncompressed EC point

    Raises:
        CryptoException: If no key loaded or serialisation fails
    """
    try:
        key = self._key
        if key is None:
            raise CryptoException("No key loaded")

        # A private key carries its public half; anything else is used as-is
        if isinstance(key, ec.EllipticCurvePrivateKey):
            public_key = key.public_key()
        else:
            public_key = key

        point_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.X962,
            format=serialization.PublicFormat.UncompressedPoint,
        )

        fields = (
            self.algorithm_name.encode(),
            self.curve_name.encode(),
            point_bytes,
        )
        # Each field is preceded by its 4-byte big-endian length
        return b"".join(struct.pack(">I", len(field)) + field for field in fields)
    except Exception as e:
        raise CryptoException(f"Failed to get ECDSA public key bytes: {e}") from e
load_private_key(key_data, password=None)

Load ECDSA private key.

Parameters:

Name Type Description Default
key_data bytes

Private key data (PEM or OpenSSH format)

required
password Optional[bytes]

Optional password for encrypted keys

None

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_private_key(
    self, key_data: bytes, password: Optional[bytes] = None
) -> None:
    """
    Load ECDSA private key.

    The key, curve name and curve are replaced only after the loaded key
    has passed every check; a failed load leaves the object as it was.

    Args:
        key_data: Private key data (PEM or OpenSSH format)
        password: Optional password for encrypted keys

    Raises:
        CryptoException: If key loading fails
    """
    try:
        if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
            loaded = serialization.load_ssh_private_key(
                key_data, password=password
            )
        else:
            loaded = serialization.load_pem_private_key(
                key_data, password=password
            )
        if not isinstance(loaded, ec.EllipticCurvePrivateKey):
            raise CryptoException("Key is not ECDSA private key")

        # Map the loaded key's curve onto its SSH curve name
        if isinstance(loaded.curve, ec.SECP256R1):
            curve_name = "nistp256"
        elif isinstance(loaded.curve, ec.SECP384R1):
            curve_name = "nistp384"
        elif isinstance(loaded.curve, ec.SECP521R1):
            curve_name = "nistp521"
        else:
            raise CryptoException(
                f"Unsupported ECDSA curve: {type(loaded.curve)}"
            )
    except Exception as e:
        raise CryptoException(f"Failed to load ECDSA private key: {e}") from e

    # Commit only now, so a rejected key (wrong type or curve) never ends
    # up stored in self._key.
    self._key = loaded
    self.curve_name = curve_name
    self.curve = loaded.curve
load_public_key(key_data)

Load ECDSA public key from SSH wire format.

Parameters:

Name Type Description Default
key_data bytes

Public key data in SSH wire format

required

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_public_key(self, key_data: bytes) -> None:
    """
    Load ECDSA public key from SSH wire format.

    The blob is three length-prefixed strings: algorithm name, curve name
    and the encoded EC point. The key and curve are replaced only after
    the whole blob has parsed; a failed load leaves the object as it was.

    Args:
        key_data: Public key data in SSH wire format

    Raises:
        CryptoException: If key loading fails
    """
    try:
        offset = 0

        # Read algorithm name
        algo_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
        offset += 4
        algorithm = key_data[offset : offset + algo_len].decode()
        offset += algo_len

        if not algorithm.startswith("ecdsa-sha2-") and algorithm != "ecdsa":
            raise CryptoException(f"Expected ECDSA algorithm, got {algorithm}")

        # Read curve name
        curve_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
        offset += 4
        curve_name = key_data[offset : offset + curve_len].decode()
        offset += curve_len

        curve: ec.EllipticCurve
        if curve_name == "nistp256":
            curve = ec.SECP256R1()
        elif curve_name == "nistp384":
            curve = ec.SECP384R1()
        elif curve_name == "nistp521":
            curve = ec.SECP521R1()
        else:
            raise CryptoException(f"Unsupported curve, got {curve_name}")

        # Read public key point
        point_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
        offset += 4
        point_bytes = key_data[offset : offset + point_len]

        public_key = ec.EllipticCurvePublicKey.from_encoded_point(
            curve, point_bytes
        )
    except Exception as e:
        raise CryptoException(f"Failed to load ECDSA public key: {e}") from e

    # Commit only now, so a bad point cannot leave the new curve paired
    # with the previously loaded key.
    self.curve_name = curve_name
    self.curve = curve
    self._key = public_key
save_to_file(filename, password=None)

Save ECDSA private key to file.

Source code in spindlex/crypto/pkey.py
def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
    """
    Save ECDSA private key to file as PKCS#8 PEM.

    A newly created file gets mode 0600 so the private key is not
    readable by other users. The mode of a file that already exists is
    not changed.

    Args:
        filename: Destination path
        password: If non-empty, encrypt the key with this password

    Raises:
        CryptoException: If no private key is loaded or writing fails
    """
    import os  # local import: this module's import block is not visible here

    try:
        if not isinstance(self._key, ec.EllipticCurvePrivateKey):
            raise CryptoException("No ECDSA private key loaded")

        encryption_algorithm = (
            serialization.BestAvailableEncryption(password.encode())
            if password
            else serialization.NoEncryption()
        )

        pem = self._key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=encryption_algorithm,
        )

        # Plain open() would create the file with umask-default permissions
        # (commonly world-readable). O_BINARY exists only on Windows, where
        # it prevents newline translation.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(filename, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(pem)
    except Exception as e:
        raise CryptoException(f"Failed to save ECDSA key: {e}") from e
sign(data)

Sign data with ECDSA private key.

Parameters:

Name Type Description Default
data bytes

Data to sign

required

Returns:

Type Description
bytes

Signature bytes in SSH format

Raises:

Type Description
CryptoException

If signing fails or no private key

Source code in spindlex/crypto/pkey.py
def sign(self, data: bytes) -> bytes:
    """
    Produce an SSH-format ECDSA signature over ``data``.

    Args:
        data: Data to sign

    Returns:
        Length-prefixed algorithm name followed by a length-prefixed blob
        holding r and s as mpints

    Raises:
        CryptoException: If signing fails or no private key
    """
    try:
        private_key = self._key
        if not isinstance(private_key, ec.EllipticCurvePrivateKey):
            raise CryptoException("No ECDSA private key loaded")

        der_signature = private_key.sign(data, ec.ECDSA(self._get_hash_algo()))

        # The library returns DER; SSH wants r and s as two mpints
        r, s = decode_dss_signature(der_signature)
        sig_blob = write_mpint(r) + write_mpint(s)

        name = self.algorithm_name.encode()
        return b"".join(
            (
                struct.pack(">I", len(name)),
                name,
                struct.pack(">I", len(sig_blob)),
                sig_blob,
            )
        )
    except Exception as e:
        raise CryptoException(f"ECDSA signing failed: {e}") from e
verify(signature, data)

Verify ECDSA signature.

Parameters:

Name Type Description Default
signature bytes

Signature bytes in SSH format

required
data bytes

Original data that was signed

required

Returns:

Type Description
bool

True if signature is valid, False otherwise

Source code in spindlex/crypto/pkey.py
def verify(self, signature: bytes, data: bytes) -> bool:
    """
    Check an SSH-formatted ECDSA signature against ``data``.

    Any failure while parsing or verifying (including a malformed
    signature) is reported as ``False`` rather than raised.

    Args:
        signature: Signature bytes in SSH format
        data: Original data that was signed

    Returns:
        True if signature is valid, False otherwise
    """

    def take_string(buffer: bytes, position: int):
        # Read one uint32-length-prefixed field; return (field, next position).
        (length,) = struct.unpack(">I", buffer[position : position + 4])
        begin = position + 4
        end = begin + length
        return buffer[begin:end], end

    try:
        key = self._key
        if key is None:
            return False

        # A private key can verify through its public half.
        verifier = (
            key.public_key()
            if isinstance(key, ec.EllipticCurvePrivateKey)
            else key
        )

        # Outer framing: algorithm name, then the signature blob.
        name, cursor = take_string(signature, 0)
        if name.decode() != self.algorithm_name:
            return False
        blob, _ = take_string(signature, cursor)

        # Inner framing: r, then s, each length-prefixed big-endian.
        r_raw, cursor = take_string(blob, 0)
        s_raw, _ = take_string(blob, cursor)

        der_signature = encode_dss_signature(
            int.from_bytes(r_raw, "big"), int.from_bytes(s_raw, "big")
        )

        verifier.verify(der_signature, data, ec.ECDSA(self._get_hash_algo()))
        return True
    except Exception:
        return False

Ed25519Key

Bases: PKey

Ed25519 SSH key implementation.

Implements ssh-ed25519 key type using Ed25519 signature algorithm.

Source code in spindlex/crypto/pkey.py
class Ed25519Key(PKey):
    """
    Ed25519 SSH key implementation.

    Implements ssh-ed25519 key type using Ed25519 signature algorithm.
    """

    @property
    def algorithm_name(self) -> str:
        """Get SSH algorithm name."""
        return "ssh-ed25519"

    def load_private_key(
        self, key_data: bytes, password: Optional[bytes] = None
    ) -> None:
        """
        Load Ed25519 private key.

        Args:
            key_data: Private key data (PEM or OpenSSH format)
            password: Optional password for encrypted keys

        Raises:
            CryptoException: If key loading fails
        """
        try:
            # The OpenSSH container is recognised by its armor header;
            # everything else is handed to the generic PEM loader.
            if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
                self._key = serialization.load_ssh_private_key(
                    key_data, password=password
                )
            else:
                self._key = serialization.load_pem_private_key(
                    key_data, password=password
                )

            if not isinstance(self._key, ed25519.Ed25519PrivateKey):
                raise CryptoException("Key is not Ed25519 private key")
        except Exception as e:
            raise CryptoException(f"Failed to load Ed25519 private key: {e}") from e

    def load_public_key(self, key_data: bytes) -> None:
        """
        Load Ed25519 public key from SSH wire format.

        Args:
            key_data: Public key data in SSH wire format

        Raises:
            CryptoException: If key loading fails
        """
        try:
            # Parse SSH wire format: string algorithm_name, string public_key
            offset = 0

            # Read algorithm name
            algo_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
            offset += 4
            algorithm = key_data[offset : offset + algo_len].decode()
            offset += algo_len

            if algorithm not in ["ssh-ed25519", "ed25519"]:
                raise CryptoException(f"Expected ssh-ed25519, got {algorithm}")

            # Read public key bytes
            key_len = struct.unpack(">I", key_data[offset : offset + 4])[0]
            offset += 4
            public_key_bytes = key_data[offset : offset + key_len]

            # Slicing never raises on short input, so the length check is
            # what rejects a truncated blob.
            if len(public_key_bytes) != 32:
                raise CryptoException("Invalid Ed25519 public key length")

            self._key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
        except Exception as e:
            raise CryptoException(f"Failed to load Ed25519 public key: {e}") from e

    def get_public_key_bytes(self) -> bytes:
        """
        Get Ed25519 public key in SSH wire format.

        Returns:
            Public key bytes in SSH wire format

        Raises:
            CryptoException: If no key loaded
        """
        try:
            if self._key is None:
                raise CryptoException("No key loaded")

            # Get public key
            if isinstance(self._key, ed25519.Ed25519PrivateKey):
                public_key = self._key.public_key()
            else:
                public_key = self._key

            # Get raw public key bytes
            public_key_bytes = public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw,
            )

            # Format as SSH wire format
            algorithm = b"ssh-ed25519"
            result = struct.pack(">I", len(algorithm)) + algorithm
            result += struct.pack(">I", len(public_key_bytes)) + public_key_bytes
            return result
        except Exception as e:
            raise CryptoException(f"Failed to get Ed25519 public key bytes: {e}") from e

    def sign(self, data: bytes) -> bytes:
        """
        Sign data with Ed25519 private key.

        Args:
            data: Data to sign

        Returns:
            Signature bytes in SSH format

        Raises:
            CryptoException: If signing fails or no private key
        """
        try:
            if not isinstance(self._key, ed25519.Ed25519PrivateKey):
                raise CryptoException("No Ed25519 private key loaded")

            # Sign data
            signature = self._key.sign(data)

            # Format as SSH signature: string algorithm_name, string signature_blob
            # Total size should be: 4(len) + 11("ssh-ed25519") + 4(len) + 64(sig)
            # Framed with struct.pack, like get_public_key_bytes(), so no
            # per-call import of a protocol helper is needed.
            algorithm = b"ssh-ed25519"
            result = struct.pack(">I", len(algorithm)) + algorithm
            result += struct.pack(">I", len(signature)) + signature
            return result

        except Exception as e:
            raise CryptoException(f"Ed25519 signing failed: {e}") from e

    @classmethod
    def generate(
        cls, key_type: str = "ed25519", bits: int = 256, *args: Any, **kwargs: Any
    ) -> "Ed25519Key":
        """
        Generate a new Ed25519 key pair.

        ``key_type``, ``bits`` and any extra arguments are accepted but not
        used by this implementation.
        """
        key = cls()
        key._key = ed25519.Ed25519PrivateKey.generate()
        return key

    def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
        """
        Save Ed25519 private key to file in OpenSSH format.

        A newly created file gets owner-only permissions (0600) so the
        private key is not readable by other users. A file that already
        exists is truncated and keeps its current mode.

        Args:
            filename: Path to save key file
            password: Optional password; when falsy the key is written
                unencrypted

        Raises:
            CryptoException: If no private key is loaded or saving fails
        """
        import os

        try:
            if not isinstance(self._key, ed25519.Ed25519PrivateKey):
                raise CryptoException("No Ed25519 private key loaded")

            encryption_algorithm = (
                serialization.BestAvailableEncryption(password.encode())
                if password
                else serialization.NoEncryption()
            )

            pem = self._key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.OpenSSH,
                encryption_algorithm=encryption_algorithm,
            )

            # Create with 0600 instead of the umask default; O_BINARY (only
            # defined on Windows) avoids newline translation on the raw fd.
            flags = (
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
            )
            fd = os.open(filename, flags, 0o600)
            with os.fdopen(fd, "wb") as f:
                f.write(pem)
        except Exception as e:
            raise CryptoException(f"Failed to save Ed25519 key: {e}") from e

    def verify(self, signature: bytes, data: bytes) -> bool:
        """
        Verify Ed25519 signature.

        Any parsing or verification failure is reported as ``False``.

        Args:
            signature: Signature bytes in SSH format
            data: Original data that was signed

        Returns:
            True if signature is valid, False otherwise
        """
        try:
            if self._key is None:
                return False

            # Get public key
            if isinstance(self._key, ed25519.Ed25519PrivateKey):
                public_key = self._key.public_key()
            else:
                public_key = self._key

            # Parse SSH signature format
            offset = 0
            algo_len = struct.unpack(">I", signature[offset : offset + 4])[0]
            offset += 4
            algorithm = signature[offset : offset + algo_len].decode()
            offset += algo_len

            if algorithm != "ssh-ed25519":
                return False

            sig_len = struct.unpack(">I", signature[offset : offset + 4])[0]
            offset += 4
            sig_bytes = signature[offset : offset + sig_len]

            # Verify signature
            public_key.verify(sig_bytes, data)
            return True
        except Exception:
            return False
Attributes
algorithm_name property

Get SSH algorithm name.

Methods:
generate(key_type='ed25519', bits=256, *args, **kwargs) classmethod

Generate a new Ed25519 key pair.

Source code in spindlex/crypto/pkey.py
@classmethod
def generate(
    cls, key_type: str = "ed25519", bits: int = 256, *args: Any, **kwargs: Any
) -> "Ed25519Key":
    """
    Create an instance holding a freshly generated Ed25519 private key.

    ``key_type``, ``bits`` and any extra arguments are accepted but not
    used by this implementation.
    """
    fresh = cls()
    fresh._key = ed25519.Ed25519PrivateKey.generate()
    return fresh
get_public_key_bytes()

Get Ed25519 public key in SSH wire format.

Returns:

Type Description
bytes

Public key bytes in SSH wire format

Raises:

Type Description
CryptoException

If no key loaded

Source code in spindlex/crypto/pkey.py
def get_public_key_bytes(self) -> bytes:
    """
    Serialize the Ed25519 public key as an SSH wire-format blob.

    The blob is two length-prefixed fields: the literal ``ssh-ed25519``
    followed by the raw public key bytes.

    Returns:
        Public key bytes in SSH wire format

    Raises:
        CryptoException: If no key loaded
    """
    try:
        loaded = self._key
        if loaded is None:
            raise CryptoException("No key loaded")

        # Derive the public half when a private key is held.
        verifying_key = (
            loaded.public_key()
            if isinstance(loaded, ed25519.Ed25519PrivateKey)
            else loaded
        )

        raw = verifying_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )

        name = b"ssh-ed25519"
        return (
            struct.pack(">I", len(name)) + name + struct.pack(">I", len(raw)) + raw
        )
    except Exception as err:
        raise CryptoException(f"Failed to get Ed25519 public key bytes: {err}") from err
load_private_key(key_data, password=None)

Load Ed25519 private key.

Parameters:

Name Type Description Default
key_data bytes

Private key data (PEM or OpenSSH format)

required
password Optional[bytes]

Optional password for encrypted keys

None

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_private_key(
    self, key_data: bytes, password: Optional[bytes] = None
) -> None:
    """
    Parse ``key_data`` and store the resulting Ed25519 private key.

    Data carrying the OpenSSH armor header goes to the OpenSSH loader;
    anything else goes to the PEM loader. The parsed key is stored on the
    instance before its type is checked.

    Args:
        key_data: Private key data (PEM or OpenSSH format)
        password: Optional password for encrypted keys

    Raises:
        CryptoException: If key loading fails
    """
    try:
        loader = (
            serialization.load_ssh_private_key
            if b"BEGIN OPENSSH PRIVATE KEY" in key_data
            else serialization.load_pem_private_key
        )
        self._key = loader(key_data, password=password)

        if not isinstance(self._key, ed25519.Ed25519PrivateKey):
            raise CryptoException("Key is not Ed25519 private key")
    except Exception as err:
        raise CryptoException(f"Failed to load Ed25519 private key: {err}") from err
load_public_key(key_data)

Load Ed25519 public key from SSH wire format.

Parameters:

Name Type Description Default
key_data bytes

Public key data in SSH wire format

required

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_public_key(self, key_data: bytes) -> None:
    """
    Parse an SSH wire-format blob and store the Ed25519 public key.

    The blob is read as two length-prefixed fields: an algorithm name
    (``ssh-ed25519`` or ``ed25519``) and a 32-byte public key.

    Args:
        key_data: Public key data in SSH wire format

    Raises:
        CryptoException: If key loading fails
    """

    def take_string(buffer: bytes, position: int):
        # Read one uint32-length-prefixed field; return (field, next position).
        (length,) = struct.unpack(">I", buffer[position : position + 4])
        begin = position + 4
        end = begin + length
        return buffer[begin:end], end

    try:
        name_raw, cursor = take_string(key_data, 0)
        algorithm = name_raw.decode()
        if algorithm not in ["ssh-ed25519", "ed25519"]:
            raise CryptoException(f"Expected ssh-ed25519, got {algorithm}")

        raw_key, _ = take_string(key_data, cursor)
        # Slicing does not fail on short input; the length check catches it.
        if len(raw_key) != 32:
            raise CryptoException("Invalid Ed25519 public key length")

        self._key = ed25519.Ed25519PublicKey.from_public_bytes(raw_key)
    except Exception as err:
        raise CryptoException(f"Failed to load Ed25519 public key: {err}") from err
save_to_file(filename, password=None)

Save Ed25519 private key to file.

Source code in spindlex/crypto/pkey.py
def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
    """
    Save Ed25519 private key to file in OpenSSH format.

    A newly created file gets owner-only permissions (0600) so the private
    key is not readable by other users. A file that already exists is
    truncated and keeps its current mode.

    Args:
        filename: Path to save key file
        password: Optional password; when falsy the key is written
            unencrypted

    Raises:
        CryptoException: If no private key is loaded or saving fails
    """
    import os

    try:
        if not isinstance(self._key, ed25519.Ed25519PrivateKey):
            raise CryptoException("No Ed25519 private key loaded")

        encryption_algorithm = (
            serialization.BestAvailableEncryption(password.encode())
            if password
            else serialization.NoEncryption()
        )

        pem = self._key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.OpenSSH,
            encryption_algorithm=encryption_algorithm,
        )

        # Create with 0600 instead of the umask default; O_BINARY (only
        # defined on Windows) avoids newline translation on the raw fd.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(filename, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(pem)
    except Exception as e:
        raise CryptoException(f"Failed to save Ed25519 key: {e}") from e
sign(data)

Sign data with Ed25519 private key.

Parameters:

Name Type Description Default
data bytes

Data to sign

required

Returns:

Type Description
bytes

Signature bytes in SSH format

Raises:

Type Description
CryptoException

If signing fails or no private key

Source code in spindlex/crypto/pkey.py
def sign(self, data: bytes) -> bytes:
    """
    Sign data with Ed25519 private key.

    The result is two length-prefixed fields: the literal ``ssh-ed25519``
    followed by the signature returned by the key.

    Args:
        data: Data to sign

    Returns:
        Signature bytes in SSH format

    Raises:
        CryptoException: If signing fails or no private key
    """
    try:
        if not isinstance(self._key, ed25519.Ed25519PrivateKey):
            raise CryptoException("No Ed25519 private key loaded")

        # Sign data
        signature = self._key.sign(data)

        # Format as SSH signature: string algorithm_name, string signature_blob
        # Total size should be: 4(len) + 11("ssh-ed25519") + 4(len) + 64(sig)
        # Framed with struct.pack, like get_public_key_bytes(), so no
        # per-call import of a protocol helper is needed.
        algorithm = b"ssh-ed25519"
        result = struct.pack(">I", len(algorithm)) + algorithm
        result += struct.pack(">I", len(signature)) + signature
        return result

    except Exception as e:
        raise CryptoException(f"Ed25519 signing failed: {e}") from e
verify(signature, data)

Verify Ed25519 signature.

Parameters:

Name Type Description Default
signature bytes

Signature bytes in SSH format

required
data bytes

Original data that was signed

required

Returns:

Type Description
bool

True if signature is valid, False otherwise

Source code in spindlex/crypto/pkey.py
def verify(self, signature: bytes, data: bytes) -> bool:
    """
    Check an SSH-formatted Ed25519 signature against ``data``.

    Any failure while parsing or verifying (including a malformed
    signature) is reported as ``False`` rather than raised.

    Args:
        signature: Signature bytes in SSH format
        data: Original data that was signed

    Returns:
        True if signature is valid, False otherwise
    """

    def take_string(buffer: bytes, position: int):
        # Read one uint32-length-prefixed field; return (field, next position).
        (length,) = struct.unpack(">I", buffer[position : position + 4])
        begin = position + 4
        end = begin + length
        return buffer[begin:end], end

    try:
        key = self._key
        if key is None:
            return False

        # A private key can verify through its public half.
        verifier = (
            key.public_key()
            if isinstance(key, ed25519.Ed25519PrivateKey)
            else key
        )

        name, cursor = take_string(signature, 0)
        if name.decode() != "ssh-ed25519":
            return False

        raw_signature, _ = take_string(signature, cursor)

        # verify() raises on mismatch; the handler below maps that to False.
        verifier.verify(raw_signature, data)
        return True
    except Exception:
        return False

PKey

Base class for SSH public keys.

Provides common interface for different public key types with support for loading, saving, signing, verification, and fingerprinting.

Source code in spindlex/crypto/pkey.py
class PKey:
    """
    Base class for SSH public keys.

    Provides common interface for different public key types with support
    for loading, saving, signing, verification, and fingerprinting.
    """

    def __init__(self, crypto_backend: Optional[CryptoBackend] = None) -> None:
        """
        Initialize public key.

        Args:
            crypto_backend: Cryptographic backend to use; the module default
                is used when this is omitted or falsy
        """
        self.crypto_backend = crypto_backend or default_crypto_backend
        self._key: Any = None
        self.allow_sha1 = False

    def get_ssh_type(self) -> str:
        """Get the SSH key type name (e.g., 'ssh-rsa', 'ssh-ed25519')."""
        return self.algorithm_name

    @property
    def algorithm_name(self) -> str:
        """Get SSH algorithm name for this key type."""
        raise NotImplementedError("Subclasses must implement algorithm_name")

    def get_name(self) -> str:
        """
        Get the SSH algorithm name for this key.

        Returns:
            SSH algorithm name (e.g., 'ssh-rsa')
        """
        return self.algorithm_name

    def load_private_key(
        self, key_data: bytes, password: Optional[bytes] = None
    ) -> None:
        """
        Load private key from bytes.

        Args:
            key_data: Private key data (PEM or OpenSSH format)
            password: Optional password for encrypted keys

        Raises:
            CryptoException: If key loading fails
        """
        raise NotImplementedError("Subclasses must implement load_private_key")

    def load_public_key(self, key_data: bytes) -> None:
        """
        Load public key from bytes.

        Args:
            key_data: Public key data (SSH wire format)

        Raises:
            CryptoException: If key loading fails
        """
        raise NotImplementedError("Subclasses must implement load_public_key")

    def get_public_key_bytes(self) -> bytes:
        """
        Get public key in SSH wire format.

        Returns:
            Public key bytes in SSH wire format

        Raises:
            CryptoException: If no key loaded
        """
        raise NotImplementedError("Subclasses must implement get_public_key_bytes")

    def sign(self, data: bytes) -> bytes:
        """
        Sign data with private key.

        Args:
            data: Data to sign

        Returns:
            Signature bytes in SSH format

        Raises:
            CryptoException: If signing fails or no private key
        """
        raise NotImplementedError("Subclasses must implement sign")

    def verify(self, signature: bytes, data: bytes) -> bool:
        """
        Verify signature with public key.

        Args:
            signature: Signature bytes in SSH format
            data: Original data that was signed

        Returns:
            True if signature is valid, False otherwise
        """
        raise NotImplementedError("Subclasses must implement verify")

    def get_fingerprint(self, hash_algorithm: str = "sha256") -> str:
        """
        Get key fingerprint.

        Args:
            hash_algorithm: Hash algorithm to use (md5, sha256)

        Returns:
            Formatted fingerprint string: ``MD5:`` plus colon-separated hex
            octets, or ``SHA256:`` plus unpadded base64

        Raises:
            CryptoException: If fingerprint generation fails
        """
        try:
            key_bytes = self.get_public_key_bytes()

            if hash_algorithm == "md5":
                digest = hashlib.md5(key_bytes, usedforsecurity=False).digest()
                return "MD5:" + ":".join(f"{b:02x}" for b in digest)
            elif hash_algorithm == "sha256":
                digest = hashlib.sha256(key_bytes).digest()
                return "SHA256:" + base64.b64encode(digest).decode().rstrip("=")
            else:
                raise CryptoException(f"Unsupported hash algorithm: {hash_algorithm}")
        except Exception as e:
            raise CryptoException(f"Fingerprint generation failed: {e}") from e

    def __eq__(self, other: object) -> bool:
        """
        Compare keys for equality.

        Two keys are equal when their SSH wire-format public blobs match.
        If either blob cannot be produced, the keys compare unequal.
        """
        if not isinstance(other, PKey):
            return False
        try:
            return self.get_public_key_bytes() == other.get_public_key_bytes()
        except Exception:
            return False

    def __hash__(self) -> int:
        """
        Hash consistently with ``__eq__``.

        Defining ``__eq__`` alone would make every key unhashable. Keys are
        hashed by their public blob, so equal keys hash equally. A key whose
        blob cannot be produced never equals anything, so it falls back to
        the identity hash. The hash changes if a different key is loaded
        later, so do not reload a key that is stored in a set or dict.
        """
        try:
            return hash(self.get_public_key_bytes())
        except Exception:
            return object.__hash__(self)

    @classmethod
    def from_string(cls, data: bytes) -> "PKey":
        """
        Create PKey instance from SSH wire format bytes.

        The leading length-prefixed algorithm name selects the concrete key
        class, which then parses the full blob.

        Args:
            data: Public key data in SSH wire format

        Returns:
            Loaded PKey instance

        Raises:
            CryptoException: If key loading fails
        """
        try:
            # Parse algorithm name from SSH blob
            algo_len = struct.unpack(">I", data[:4])[0]
            algorithm = data[4 : 4 + algo_len].decode()

            # Determine key type and load
            key: PKey
            if algorithm == "ssh-ed25519":
                key = Ed25519Key()
            elif algorithm.startswith("ecdsa-sha2-"):
                key = ECDSAKey()
            elif algorithm in ["rsa-sha2-256", "rsa-sha2-512", "ssh-rsa"]:
                key = RSAKey()
            else:
                raise CryptoException(f"Unsupported key algorithm: {algorithm}")

            key.load_public_key(data)
            return key
        except CryptoException:
            # Already the documented exception type; do not re-wrap.
            raise
        except Exception as e:
            raise CryptoException(f"Failed to load public key from bytes: {e}") from e

    @classmethod
    def generate(
        cls, key_type: str = "ed25519", bits: int = 2048, **kwargs: Any
    ) -> "PKey":
        """
        Generate a new key pair.

        Args:
            key_type: Type of key to generate ('ed25519', 'rsa', 'ecdsa');
                matched case-insensitively
            bits: Number of bits for RSA keys (legacy, use kwargs for others)
            **kwargs: Additional arguments for specific key types

        Returns:
            New PKey instance with generated key pair

        Raises:
            CryptoException: If generation fails or unsupported key type
            NotImplementedError: If called on a subclass that does not
                override this method
        """
        if cls is not PKey:
            raise NotImplementedError("Subclasses must implement generate")

        key_type = key_type.lower()
        if key_type == "ed25519":
            return Ed25519Key.generate(**kwargs)
        elif key_type == "rsa":
            return RSAKey.generate(bits=bits, **kwargs)
        elif key_type == "ecdsa":
            # NOTE(review): the RSA-oriented default bits=2048 is forwarded
            # here too; confirm ECDSAKey.generate handles that value.
            return ECDSAKey.generate(bits=bits, **kwargs)
        else:
            raise CryptoException(f"Unsupported key type for generation: {key_type}")

    def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
        """
        Save private key to file in PEM format.

        Args:
            filename: Path to save key file
            password: Optional password for encryption

        Raises:
            CryptoException: If saving fails
        """
        raise NotImplementedError("Subclasses must implement save_to_file")

    def get_openssh_string(self) -> str:
        """
        Get public key in OpenSSH format.

        Returns:
            Public key string in OpenSSH format (e.g., "ssh-rsa AAAAB3N...")
        """
        key_bytes = self.get_public_key_bytes()
        # Parse out the algorithm name from the bytes (first string)
        algo_len = struct.unpack(">I", key_bytes[:4])[0]
        algo_name = key_bytes[4 : 4 + algo_len].decode()

        key_base64 = base64.b64encode(key_bytes).decode()
        return f"{algo_name} {key_base64}"

    @classmethod
    def from_private_key_file(
        cls, filename: str, password: Optional[str] = None
    ) -> "PKey":
        """
        Load private key from file.

        Delegates to ``load_key_from_file``; ``cls`` is not consulted.

        Args:
            filename: Path to key file
            password: Optional password for encrypted keys

        Returns:
            Loaded PKey instance
        """
        return load_key_from_file(filename, password)

    def get_public_key(self) -> "PKey":
        """
        Get a PKey instance containing only the public key.

        Returns:
            New instance of the same class, sharing this key's backend, with
            only the public key loaded
        """
        new_key = self.__class__(self.crypto_backend)
        new_key.load_public_key(self.get_public_key_bytes())
        return new_key
Attributes
algorithm_name property

Get SSH algorithm name for this key type.

Methods:
__eq__(other)

Compare keys for equality.

Source code in spindlex/crypto/pkey.py
def __eq__(self, other: object) -> bool:
    """
    Report whether ``other`` is a PKey with the same public key blob.

    Non-PKey operands, and any failure while producing either blob,
    compare as unequal.
    """
    if isinstance(other, PKey):
        try:
            same_blob = self.get_public_key_bytes() == other.get_public_key_bytes()
        except Exception:
            same_blob = False
        return same_blob
    return False
__init__(crypto_backend=None)

Initialize public key.

Parameters:

Name Type Description Default
crypto_backend Optional[CryptoBackend]

Cryptographic backend to use

None
Source code in spindlex/crypto/pkey.py
def __init__(self, crypto_backend: Optional[CryptoBackend] = None) -> None:
    """
    Set up an empty key bound to a crypto backend.

    Args:
        crypto_backend: Cryptographic backend to use; the module default
            is used when this is omitted or falsy
    """
    # Truthiness test, not an identity check against None.
    if crypto_backend:
        self.crypto_backend = crypto_backend
    else:
        self.crypto_backend = default_crypto_backend
    # No key material is held until one of the load methods runs.
    self._key: Any = None
    self.allow_sha1 = False
from_private_key_file(filename, password=None) classmethod

Load private key from file.

Parameters:

Name Type Description Default
filename str

Path to key file

required
password Optional[str]

Optional password for encrypted keys

None

Returns:

Type Description
PKey

Loaded PKey instance

Source code in spindlex/crypto/pkey.py
@classmethod
def from_private_key_file(
    cls, filename: str, password: Optional[str] = None
) -> "PKey":
    """
    Read a private key from disk.

    Delegates to ``load_key_from_file``; ``cls`` is not consulted.

    Args:
        filename: Path to key file
        password: Optional password for encrypted keys

    Returns:
        Loaded PKey instance
    """
    loaded_key = load_key_from_file(filename, password)
    return loaded_key
from_string(data) classmethod

Create PKey instance from SSH wire format bytes.

Parameters:

Name Type Description Default
data bytes

Public key data in SSH wire format

required

Returns:

Type Description
PKey

Loaded PKey instance

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
@classmethod
def from_string(cls, data: bytes) -> "PKey":
    """
    Build the matching key object from an SSH wire-format public blob.

    The leading length-prefixed algorithm name selects the concrete key
    class, which then parses the full blob.

    Args:
        data: Public key data in SSH wire format

    Returns:
        Loaded PKey instance

    Raises:
        CryptoException: If key loading fails
    """
    try:
        import struct

        # First field: uint32 length followed by the algorithm name.
        (name_length,) = struct.unpack(">I", data[0:4])
        algorithm = data[4 : 4 + name_length].decode()

        loaded: PKey
        if algorithm == "ssh-ed25519":
            loaded = Ed25519Key()
        elif algorithm.startswith("ecdsa-sha2-"):
            loaded = ECDSAKey()
        elif algorithm in ["rsa-sha2-256", "rsa-sha2-512", "ssh-rsa"]:
            loaded = RSAKey()
        else:
            raise CryptoException(f"Unsupported key algorithm: {algorithm}")

        loaded.load_public_key(data)
        return loaded
    except CryptoException:
        # Already the documented exception type; pass it through unchanged.
        raise
    except Exception as err:
        raise CryptoException(f"Failed to load public key from bytes: {err}") from err
generate(key_type='ed25519', bits=2048, **kwargs) classmethod

Generate a new key pair.

Parameters:

Name Type Description Default
key_type str

Type of key to generate ('ed25519', 'rsa', 'ecdsa')

'ed25519'
bits int

Number of bits for RSA keys (legacy, use kwargs for others)

2048
**kwargs Any

Additional arguments for specific key types

{}

Returns:

Type Description
PKey

New PKey instance with generated key pair

Raises:

Type Description
CryptoException

If generation fails or unsupported key type

Source code in spindlex/crypto/pkey.py
@classmethod
def generate(
    cls, key_type: str = "ed25519", bits: int = 2048, **kwargs: Any
) -> "PKey":
    """
    Create a new key pair of the requested type.

    Only callable on ``PKey`` itself; it dispatches to the concrete key
    class's own ``generate``.

    Args:
        key_type: Type of key to generate ('ed25519', 'rsa', 'ecdsa');
            matched case-insensitively
        bits: Number of bits for RSA keys (legacy, use kwargs for others)
        **kwargs: Additional arguments for specific key types

    Returns:
        New PKey instance with generated key pair

    Raises:
        CryptoException: If generation fails or unsupported key type
    """
    if cls is not PKey:
        raise NotImplementedError("Subclasses must implement generate")

    requested = key_type.lower()

    # Ed25519 receives no ``bits``; RSA and ECDSA both receive it.
    if requested == "ed25519":
        return Ed25519Key.generate(**kwargs)
    if requested == "rsa":
        return RSAKey.generate(bits=bits, **kwargs)
    if requested == "ecdsa":
        return ECDSAKey.generate(bits=bits, **kwargs)

    raise CryptoException(f"Unsupported key type for generation: {requested}")
get_fingerprint(hash_algorithm='sha256')

Get key fingerprint.

Parameters:

Name Type Description Default
hash_algorithm str

Hash algorithm to use (md5, sha256)

'sha256'

Returns:

Type Description
str

Formatted fingerprint string

Raises:

Type Description
CryptoException

If fingerprint generation fails

Source code in spindlex/crypto/pkey.py
def get_fingerprint(self, hash_algorithm: str = "sha256") -> str:
    """
    Compute a printable fingerprint of the public key blob.

    Args:
        hash_algorithm: Hash algorithm to use (md5, sha256)

    Returns:
        ``MD5:`` plus colon-separated hex octets, or ``SHA256:`` plus
        base64 with the padding removed

    Raises:
        CryptoException: If fingerprint generation fails
    """
    try:
        blob = self.get_public_key_bytes()

        if hash_algorithm == "md5":
            # MD5 is used only as a display format here.
            md5_digest = hashlib.md5(blob, usedforsecurity=False).digest()
            return "MD5:" + md5_digest.hex(":")

        if hash_algorithm == "sha256":
            encoded = base64.b64encode(hashlib.sha256(blob).digest()).decode()
            return "SHA256:" + encoded.rstrip("=")

        raise CryptoException(f"Unsupported hash algorithm: {hash_algorithm}")
    except Exception as err:
        raise CryptoException(f"Fingerprint generation failed: {err}") from err
get_name()

Get the SSH algorithm name for this key.

Returns:

Type Description
str

SSH algorithm name (e.g., 'ssh-rsa')

Source code in spindlex/crypto/pkey.py
def get_name(self) -> str:
    """
    Return this key's SSH algorithm name.

    Returns:
        The value of ``algorithm_name`` (e.g., 'ssh-rsa')
    """
    name = self.algorithm_name
    return name
get_openssh_string()

Get public key in OpenSSH format.

Returns:

Type Description
str

Public key string in OpenSSH format (e.g., "ssh-rsa AAAAB3N...")

Source code in spindlex/crypto/pkey.py
def get_openssh_string(self) -> str:
    """
    Render the public key as an OpenSSH-style text line.

    Returns:
        "<algorithm> <base64 blob>", where the algorithm name is read from
        the first length-prefixed string of the public key bytes
        (e.g., "ssh-rsa AAAAB3N...")
    """
    blob = self.get_public_key_bytes()

    # The blob starts with a 4-byte big-endian length, then the name itself.
    (name_length,) = struct.unpack(">I", blob[:4])
    name_end = 4 + name_length
    algorithm = blob[4:name_end].decode()

    encoded_blob = base64.b64encode(blob).decode()
    return f"{algorithm} {encoded_blob}"
get_public_key()

Get a PKey instance containing only the public key.

Returns:

Type Description
PKey

PKey instance with only public key loaded

Source code in spindlex/crypto/pkey.py
def get_public_key(self) -> "PKey":
    """
    Build a new key object of the same class holding only the public half.

    The new instance is constructed with this key's ``crypto_backend`` and
    populated via ``load_public_key()`` from this key's public key bytes.

    Returns:
        PKey instance with only public key loaded
    """
    public_only = self.__class__(self.crypto_backend)
    wire_blob = self.get_public_key_bytes()
    public_only.load_public_key(wire_blob)
    return public_only
get_public_key_bytes()

Get public key in SSH wire format.

Returns:

Type Description
bytes

Public key bytes in SSH wire format

Raises:

Type Description
CryptoException

If no key loaded

Source code in spindlex/crypto/pkey.py
def get_public_key_bytes(self) -> bytes:
    """
    Serialize the public key to SSH wire format (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Returns:
        Public key bytes in SSH wire format

    Raises:
        NotImplementedError: Always, in this base implementation
        CryptoException: In subclass implementations, if no key is loaded
    """
    message = "Subclasses must implement get_public_key_bytes"
    raise NotImplementedError(message)
get_ssh_type()

Get the SSH key type name (e.g., 'ssh-rsa', 'ssh-ed25519').

Source code in spindlex/crypto/pkey.py
def get_ssh_type(self) -> str:
    """Return the SSH key type name (e.g., 'ssh-rsa', 'ssh-ed25519')."""
    key_type: str = self.algorithm_name
    return key_type
load_private_key(key_data, password=None)

Load private key from bytes.

Parameters:

Name Type Description Default
key_data bytes

Private key data (PEM or OpenSSH format)

required
password Optional[bytes]

Optional password for encrypted keys

None

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_private_key(
    self, key_data: bytes, password: Optional[bytes] = None
) -> None:
    """
    Populate this key from serialized private key material (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Args:
        key_data: Private key data (PEM or OpenSSH format)
        password: Optional password for encrypted keys

    Raises:
        NotImplementedError: Always, in this base implementation
        CryptoException: In subclass implementations, if key loading fails
    """
    message = "Subclasses must implement load_private_key"
    raise NotImplementedError(message)
load_public_key(key_data)

Load public key from bytes.

Parameters:

Name Type Description Default
key_data bytes

Public key data (SSH wire format)

required

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_public_key(self, key_data: bytes) -> None:
    """
    Populate this key from a public key blob (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Args:
        key_data: Public key data (SSH wire format)

    Raises:
        NotImplementedError: Always, in this base implementation
        CryptoException: In subclass implementations, if key loading fails
    """
    message = "Subclasses must implement load_public_key"
    raise NotImplementedError(message)
save_to_file(filename, password=None)

Save private key to file in PEM format.

Parameters:

Name Type Description Default
filename str

Path to save key file

required
password Optional[str]

Optional password for encryption

None

Raises:

Type Description
CryptoException

If saving fails

Source code in spindlex/crypto/pkey.py
def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
    """
    Write the private key to a file (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Args:
        filename: Path to save key file
        password: Optional password for encryption

    Raises:
        NotImplementedError: Always, in this base implementation
        CryptoException: In subclass implementations, if saving fails
    """
    message = "Subclasses must implement save_to_file"
    raise NotImplementedError(message)
sign(data)

Sign data with private key.

Parameters:

Name Type Description Default
data bytes

Data to sign

required

Returns:

Type Description
bytes

Signature bytes in SSH format

Raises:

Type Description
CryptoException

If signing fails or no private key

Source code in spindlex/crypto/pkey.py
def sign(self, data: bytes) -> bytes:
    """
    Produce a signature over ``data`` with the private key (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Args:
        data: Data to sign

    Returns:
        Signature bytes in SSH format

    Raises:
        NotImplementedError: Always, in this base implementation
        CryptoException: In subclass implementations, if signing fails or
            no private key is loaded
    """
    message = "Subclasses must implement sign"
    raise NotImplementedError(message)
verify(signature, data)

Verify signature with public key.

Parameters:

Name Type Description Default
signature bytes

Signature bytes in SSH format

required
data bytes

Original data that was signed

required

Returns:

Type Description
bool

True if signature is valid, False otherwise

Source code in spindlex/crypto/pkey.py
def verify(self, signature: bytes, data: bytes) -> bool:
    """
    Check a signature against ``data`` with the public key (abstract hook).

    This base implementation only raises; concrete key classes override it.

    Args:
        signature: Signature bytes in SSH format
        data: Original data that was signed

    Returns:
        True if signature is valid, False otherwise

    Raises:
        NotImplementedError: Always, in this base implementation
    """
    message = "Subclasses must implement verify"
    raise NotImplementedError(message)

RSAKey

Bases: PKey

RSA SSH key implementation.

Implements the rsa-sha2-256 (default) and rsa-sha2-512 key types; legacy ssh-rsa (SHA-1) signatures are handled only when allow_sha1 is enabled on the key.

Source code in spindlex/crypto/pkey.py
class RSAKey(PKey):
    """
    RSA SSH key implementation.

    Signs and verifies with rsa-sha2-256 (the default algorithm name) or
    rsa-sha2-512. Legacy ssh-rsa (SHA-1) is only honoured when the
    instance's ``allow_sha1`` flag is true.
    """

    @property
    def algorithm_name(self) -> str:
        """Get SSH algorithm name (defaults to "rsa-sha2-256")."""
        return getattr(self, "_algorithm_name", "rsa-sha2-256")

    # get_ssh_type() is inherited from PKey, which returns algorithm_name.

    def load_private_key(
        self, key_data: bytes, password: Optional[bytes] = None
    ) -> None:
        """
        Load RSA private key.

        Accepts both traditional/PKCS#8 PEM keys and OpenSSH-format keys
        (the format written by ``save_to_file``).

        Args:
            key_data: Private key data (PEM or OpenSSH format)
            password: Optional password for encrypted keys

        Raises:
            CryptoException: If key loading fails or the key is not RSA
        """
        try:
            # load_pem_private_key() cannot parse OpenSSH-format keys, so
            # dispatch on the armor header to keep save/load round-trippable.
            if b"-----BEGIN OPENSSH PRIVATE KEY-----" in key_data:
                loaded = serialization.load_ssh_private_key(
                    key_data, password=password
                )
            else:
                loaded = serialization.load_pem_private_key(
                    key_data, password=password
                )
            if not isinstance(loaded, rsa.RSAPrivateKey):
                raise CryptoException("Key is not RSA private key")
            # Assign only after validation so a failed load never leaves a
            # non-RSA key on the instance.
            self._key = loaded
        except Exception as e:
            raise CryptoException(f"Failed to load RSA private key: {e}") from e

    def load_public_key(self, key_data: bytes) -> None:
        """
        Load RSA public key from SSH wire format.

        The blob is parsed as three length-prefixed fields: algorithm name,
        public exponent, modulus. Bytes after the modulus are ignored.

        Args:
            key_data: Public key data in SSH wire format

        Raises:
            CryptoException: If the blob is truncated, names a non-RSA
                algorithm, or the key cannot be constructed
        """

        def read_field(start: int) -> tuple:
            """Return (payload, next_offset) for the field at ``start``."""
            body_start = start + 4
            if body_start > len(key_data):
                raise CryptoException("Truncated RSA public key data")
            (length,) = struct.unpack(">I", key_data[start:body_start])
            body_end = body_start + length
            # Slicing past the end would silently shorten the field and
            # yield a wrong key, so reject it explicitly.
            if body_end > len(key_data):
                raise CryptoException("Truncated RSA public key data")
            return key_data[body_start:body_end], body_end

        try:
            name_bytes, offset = read_field(0)
            algorithm = name_bytes.decode()

            if algorithm not in ["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"]:
                raise CryptoException(f"Expected RSA algorithm, got {algorithm}")

            exponent_bytes, offset = read_field(offset)
            modulus_bytes, offset = read_field(offset)
            exponent = int.from_bytes(exponent_bytes, "big")
            modulus = int.from_bytes(modulus_bytes, "big")

            public_key = rsa.RSAPublicNumbers(exponent, modulus).public_key()

            # Commit state only once the whole blob parsed successfully.
            # Only the SHA-2 variants are remembered as the algorithm name.
            if algorithm in ["rsa-sha2-256", "rsa-sha2-512"]:
                self._algorithm_name = algorithm
            self._key = public_key
        except Exception as e:
            raise CryptoException(f"Failed to load RSA public key: {e}") from e

    def get_public_key_bytes(self) -> bytes:
        """
        Get RSA public key in SSH wire format.

        Returns:
            Public key bytes in SSH wire format

        Raises:
            CryptoException: If no key loaded
        """
        try:
            if self._key is None:
                raise CryptoException("No key loaded")

            # A private key object carries its public half.
            if isinstance(self._key, rsa.RSAPrivateKey):
                public_key = self._key.public_key()
            else:
                public_key = self._key

            numbers = public_key.public_numbers()

            # Format as SSH wire format: string <algorithm>, mpint e, mpint n
            algorithm = self.algorithm_name.encode()
            result = struct.pack(">I", len(algorithm)) + algorithm
            result += write_mpint(numbers.e)
            result += write_mpint(numbers.n)
            return result

        except Exception as e:
            raise CryptoException(f"Failed to get RSA public key bytes: {e}") from e

    def sign(self, data: bytes) -> bytes:
        """
        Sign data with the RSA private key.

        The hash is chosen from ``algorithm_name``: SHA-512 for
        rsa-sha2-512, SHA-256 for rsa-sha2-256, otherwise legacy SHA-1
        (only when ``allow_sha1`` is true).

        Args:
            data: Data to sign

        Returns:
            Signature bytes in SSH format (string algorithm, string blob)

        Raises:
            CryptoException: If signing fails or no private key
        """
        try:
            if not isinstance(self._key, rsa.RSAPrivateKey):
                raise CryptoException("No RSA private key loaded")

            # Select hash algorithm based on current algorithm name
            algo_name = self.algorithm_name
            hash_algo: Any
            if algo_name == "rsa-sha2-512":
                hash_algo = hashes.SHA512()
            elif algo_name == "rsa-sha2-256":
                hash_algo = hashes.SHA256()
            else:
                # Default to SHA-1 for legacy ssh-rsa
                if not self.allow_sha1:
                    raise CryptoException(
                        "RSA with SHA-1 (ssh-rsa) is disabled by default for security. "
                        "Enable 'allow_sha1=True' on the key instance if you must use it."
                    )
                warnings.warn(
                    "Signing with ssh-rsa (SHA-1) is deprecated and disabled in "
                    "modern OpenSSH. Prefer rsa-sha2-256 or rsa-sha2-512.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                # Legacy ssh-rsa compatibility is gated by allow_sha1=True.
                hash_algo = _legacy_ssh_rsa_sha1_hash()

            # Sign data with PKCS1v15 padding
            signature = self._key.sign(data, padding.PKCS1v15(), hash_algo)

            # Format as SSH signature
            identifier = algo_name.encode()
            result = struct.pack(">I", len(identifier)) + identifier
            result += struct.pack(">I", len(signature)) + signature
            return result
        except Exception as e:
            raise CryptoException(f"RSA signing failed: {e}") from e

    @classmethod
    def generate(
        cls, key_type: str = "rsa", bits: int = 2048, *args: Any, **kwargs: Any
    ) -> "RSAKey":
        """
        Generate a new RSA key pair.

        Args:
            key_type: Accepted for signature compatibility; not used here
            bits: RSA modulus size in bits
            *args: Ignored
            **kwargs: Ignored

        Returns:
            New RSAKey holding a private key with public exponent 65537
        """
        key = cls()
        key._key = rsa.generate_private_key(public_exponent=65537, key_size=bits)
        return key

    def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
        """
        Save RSA private key to file in PEM-armored OpenSSH format.

        A newly created file is only readable and writable by its owner.
        A file that already exists is overwritten and keeps its current
        permission bits.

        Args:
            filename: Path to save key file
            password: Optional password for encryption; an empty string is
                treated as no password

        Raises:
            CryptoException: If no private key is loaded or saving fails
        """
        import os

        def owner_only_opener(path: str, flags: int) -> int:
            """Create the file with mode 0o600 instead of the umask default."""
            return os.open(path, flags, 0o600)

        try:
            if not isinstance(self._key, rsa.RSAPrivateKey):
                raise CryptoException("No RSA private key loaded")

            encryption_algorithm = (
                serialization.BestAvailableEncryption(password.encode())
                if password
                else serialization.NoEncryption()
            )

            pem = self._key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.OpenSSH,
                encryption_algorithm=encryption_algorithm,
            )

            # Private key material must not be created group/world-readable.
            with open(filename, "wb", opener=owner_only_opener) as f:
                f.write(pem)
        except Exception as e:
            raise CryptoException(f"Failed to save RSA key: {e}") from e

    def verify(self, signature: bytes, data: bytes) -> bool:
        """
        Verify RSA signature.

        The hash is chosen from the algorithm name embedded in the
        signature. ssh-rsa (SHA-1) signatures are rejected unless
        ``allow_sha1`` is true. Any parsing or verification error yields
        False rather than an exception.

        Args:
            signature: Signature bytes in SSH format
            data: Original data that was signed

        Returns:
            True if signature is valid, False otherwise
        """
        try:
            if self._key is None:
                return False

            # A private key object carries its public half.
            if isinstance(self._key, rsa.RSAPrivateKey):
                public_key = self._key.public_key()
            else:
                public_key = self._key

            # Parse SSH signature format
            offset = 0
            algo_len = struct.unpack(">I", signature[offset : offset + 4])[0]
            offset += 4
            algorithm = signature[offset : offset + algo_len].decode()
            offset += algo_len

            sig_len = struct.unpack(">I", signature[offset : offset + 4])[0]
            offset += 4
            sig_bytes = signature[offset : offset + sig_len]

            # Choose hash algorithm based on signature type
            hash_algo: Any
            if algorithm == "rsa-sha2-512":
                hash_algo = hashes.SHA512()
            elif algorithm == "rsa-sha2-256":
                hash_algo = hashes.SHA256()
            elif algorithm == "ssh-rsa":
                if not self.allow_sha1:
                    return False
                warnings.warn(
                    "Verifying an ssh-rsa (SHA-1) signature is deprecated and "
                    "disabled in modern OpenSSH. Prefer rsa-sha2-256 or rsa-sha2-512.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                # Legacy ssh-rsa compatibility is gated by allow_sha1=True.
                hash_algo = _legacy_ssh_rsa_sha1_hash()
            else:
                return False

            # public_key.verify() raises on mismatch; reaching the return
            # means the signature checked out.
            public_key.verify(sig_bytes, data, padding.PKCS1v15(), hash_algo)
            return True
        except Exception:
            return False
Attributes
algorithm_name property

Get SSH algorithm name.

Methods:
generate(key_type='rsa', bits=2048, *args, **kwargs) classmethod

Generate a new RSA key pair.

Source code in spindlex/crypto/pkey.py
@classmethod
def generate(
    cls, key_type: str = "rsa", bits: int = 2048, *args: Any, **kwargs: Any
) -> "RSAKey":
    """
    Create an instance holding a freshly generated RSA private key.

    Only ``bits`` influences the result; the key is generated with public
    exponent 65537. ``key_type`` and the extra arguments are not used.
    """
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=bits)
    instance = cls()
    instance._key = private_key
    return instance
get_public_key_bytes()

Get RSA public key in SSH wire format.

Returns:

Type Description
bytes

Public key bytes in SSH wire format

Raises:

Type Description
CryptoException

If no key loaded

Source code in spindlex/crypto/pkey.py
def get_public_key_bytes(self) -> bytes:
    """
    Serialize the RSA public key to SSH wire format.

    Layout: length-prefixed algorithm name, then the public exponent and
    the modulus, each encoded with ``write_mpint``.

    Returns:
        Public key bytes in SSH wire format

    Raises:
        CryptoException: If no key loaded or serialization fails
    """
    try:
        stored = self._key
        if stored is None:
            raise CryptoException("No key loaded")

        # A private key object carries its public half.
        public_key = (
            stored.public_key() if isinstance(stored, rsa.RSAPrivateKey) else stored
        )
        public_numbers = public_key.public_numbers()

        name = self.algorithm_name.encode()
        blob = struct.pack(">I", len(name)) + name
        blob += write_mpint(public_numbers.e)
        blob += write_mpint(public_numbers.n)
        return blob

    except Exception as e:
        raise CryptoException(f"Failed to get RSA public key bytes: {e}") from e
load_private_key(key_data, password=None)

Load RSA private key.

Parameters:

Name Type Description Default
key_data bytes

Private key data (PEM format)

required
password Optional[bytes]

Optional password for encrypted keys

None

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_private_key(
    self, key_data: bytes, password: Optional[bytes] = None
) -> None:
    """
    Load RSA private key.

    Accepts both traditional/PKCS#8 PEM keys and OpenSSH-format keys
    (the format written by ``save_to_file``).

    Args:
        key_data: Private key data (PEM or OpenSSH format)
        password: Optional password for encrypted keys

    Raises:
        CryptoException: If key loading fails or the key is not RSA
    """
    try:
        # load_pem_private_key() cannot parse OpenSSH-format keys, so
        # dispatch on the armor header to keep save/load round-trippable.
        if b"-----BEGIN OPENSSH PRIVATE KEY-----" in key_data:
            loaded = serialization.load_ssh_private_key(key_data, password=password)
        else:
            loaded = serialization.load_pem_private_key(key_data, password=password)
        if not isinstance(loaded, rsa.RSAPrivateKey):
            raise CryptoException("Key is not RSA private key")
        # Assign only after validation so a failed load never leaves a
        # non-RSA key on the instance.
        self._key = loaded
    except Exception as e:
        raise CryptoException(f"Failed to load RSA private key: {e}") from e
load_public_key(key_data)

Load RSA public key from SSH wire format.

Parameters:

Name Type Description Default
key_data bytes

Public key data in SSH wire format

required

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_public_key(self, key_data: bytes) -> None:
    """
    Load RSA public key from SSH wire format.

    The blob is parsed as three length-prefixed fields: algorithm name,
    public exponent, modulus. Bytes after the modulus are ignored.

    Args:
        key_data: Public key data in SSH wire format

    Raises:
        CryptoException: If the blob is truncated, names a non-RSA
            algorithm, or the key cannot be constructed
    """

    def read_field(start: int) -> tuple:
        """Return (payload, next_offset) for the field at ``start``."""
        body_start = start + 4
        if body_start > len(key_data):
            raise CryptoException("Truncated RSA public key data")
        (length,) = struct.unpack(">I", key_data[start:body_start])
        body_end = body_start + length
        # Slicing past the end would silently shorten the field and yield
        # a wrong key, so reject it explicitly.
        if body_end > len(key_data):
            raise CryptoException("Truncated RSA public key data")
        return key_data[body_start:body_end], body_end

    try:
        name_bytes, offset = read_field(0)
        algorithm = name_bytes.decode()

        if algorithm not in ["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"]:
            raise CryptoException(f"Expected RSA algorithm, got {algorithm}")

        exponent_bytes, offset = read_field(offset)
        modulus_bytes, offset = read_field(offset)
        exponent = int.from_bytes(exponent_bytes, "big")
        modulus = int.from_bytes(modulus_bytes, "big")

        public_key = rsa.RSAPublicNumbers(exponent, modulus).public_key()

        # Commit state only once the whole blob parsed successfully.
        # Only the SHA-2 variants are remembered as the algorithm name.
        if algorithm in ["rsa-sha2-256", "rsa-sha2-512"]:
            self._algorithm_name = algorithm
        self._key = public_key
    except Exception as e:
        raise CryptoException(f"Failed to load RSA public key: {e}") from e
save_to_file(filename, password=None)

Save RSA private key to file.

Source code in spindlex/crypto/pkey.py
def save_to_file(self, filename: str, password: Optional[str] = None) -> None:
    """
    Save RSA private key to file in PEM-armored OpenSSH format.

    A newly created file is only readable and writable by its owner.
    A file that already exists is overwritten and keeps its current
    permission bits.

    Args:
        filename: Path to save key file
        password: Optional password for encryption; an empty string is
            treated as no password

    Raises:
        CryptoException: If no private key is loaded or saving fails
    """
    import os

    def owner_only_opener(path: str, flags: int) -> int:
        """Create the file with mode 0o600 instead of the umask default."""
        return os.open(path, flags, 0o600)

    try:
        if not isinstance(self._key, rsa.RSAPrivateKey):
            raise CryptoException("No RSA private key loaded")

        encryption_algorithm = (
            serialization.BestAvailableEncryption(password.encode())
            if password
            else serialization.NoEncryption()
        )

        pem = self._key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.OpenSSH,
            encryption_algorithm=encryption_algorithm,
        )

        # Private key material must not be created group/world-readable.
        with open(filename, "wb", opener=owner_only_opener) as f:
            f.write(pem)
    except Exception as e:
        raise CryptoException(f"Failed to save RSA key: {e}") from e
sign(data)

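Sign data with the RSA private key, hashing with SHA-512, SHA-256, or (legacy, only when allow_sha1 is enabled) SHA-1 according to the key's algorithm name.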

Parameters:

Name Type Description Default
data bytes

Data to sign

required

Returns:

Type Description
bytes

Signature bytes in SSH format

Raises:

Type Description
CryptoException

If signing fails or no private key

Source code in spindlex/crypto/pkey.py
def sign(self, data: bytes) -> bytes:
    """
    Sign data with the RSA private key.

    The hash follows ``algorithm_name``: SHA-256 for rsa-sha2-256, SHA-512
    for rsa-sha2-512, and legacy SHA-1 for anything else (only when
    ``allow_sha1`` is true).

    Args:
        data: Data to sign

    Returns:
        Signature bytes in SSH format: length-prefixed algorithm name
        followed by the length-prefixed raw signature

    Raises:
        CryptoException: If signing fails or no private key
    """
    try:
        if not isinstance(self._key, rsa.RSAPrivateKey):
            raise CryptoException("No RSA private key loaded")

        algo_name = self.algorithm_name
        digest: Any
        if algo_name == "rsa-sha2-256":
            digest = hashes.SHA256()
        elif algo_name == "rsa-sha2-512":
            digest = hashes.SHA512()
        elif self.allow_sha1:
            # Legacy ssh-rsa path: permitted, but warn the caller.
            warnings.warn(
                "Signing with ssh-rsa (SHA-1) is deprecated and disabled in "
                "modern OpenSSH. Prefer rsa-sha2-256 or rsa-sha2-512.",
                DeprecationWarning,
                stacklevel=2,
            )
            digest = _legacy_ssh_rsa_sha1_hash()
        else:
            raise CryptoException(
                "RSA with SHA-1 (ssh-rsa) is disabled by default for security. "
                "Enable 'allow_sha1=True' on the key instance if you must use it."
            )

        raw_signature = self._key.sign(data, padding.PKCS1v15(), digest)

        # SSH signature: string <algorithm>, string <signature blob>
        label = algo_name.encode()
        name_part = struct.pack(">I", len(label)) + label
        blob_part = struct.pack(">I", len(raw_signature)) + raw_signature
        return name_part + blob_part
    except Exception as e:
        raise CryptoException(f"RSA signing failed: {e}") from e
verify(signature, data)

Verify RSA signature.

Parameters:

Name Type Description Default
signature bytes

Signature bytes in SSH format

required
data bytes

Original data that was signed

required

Returns:

Type Description
bool

True if signature is valid, False otherwise

Source code in spindlex/crypto/pkey.py
def verify(self, signature: bytes, data: bytes) -> bool:
    """
    Check an SSH-format RSA signature against ``data``.

    The hash is chosen from the algorithm name embedded in the signature.
    ssh-rsa (SHA-1) signatures are rejected unless ``allow_sha1`` is true,
    and unknown algorithm names are rejected. Any error during parsing or
    verification results in False rather than an exception.

    Args:
        signature: Signature bytes in SSH format
        data: Original data that was signed

    Returns:
        True if signature is valid, False otherwise
    """
    try:
        stored = self._key
        if stored is None:
            return False

        # A private key object carries its public half.
        verifier = (
            stored.public_key() if isinstance(stored, rsa.RSAPrivateKey) else stored
        )

        # Signature layout: string <algorithm>, string <signature blob>
        (name_len,) = struct.unpack(">I", signature[0:4])
        name_end = 4 + name_len
        algorithm = signature[4:name_end].decode()

        blob_start = name_end + 4
        (blob_len,) = struct.unpack(">I", signature[name_end:blob_start])
        sig_bytes = signature[blob_start : blob_start + blob_len]

        digest: Any
        if algorithm == "rsa-sha2-256":
            digest = hashes.SHA256()
        elif algorithm == "rsa-sha2-512":
            digest = hashes.SHA512()
        elif algorithm == "ssh-rsa":
            if not self.allow_sha1:
                return False
            warnings.warn(
                "Verifying an ssh-rsa (SHA-1) signature is deprecated and "
                "disabled in modern OpenSSH. Prefer rsa-sha2-256 or rsa-sha2-512.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Legacy ssh-rsa compatibility is gated by allow_sha1=True.
            digest = _legacy_ssh_rsa_sha1_hash()
        else:
            return False

        # Raises on mismatch; falling through means the signature is valid.
        verifier.verify(sig_bytes, data, padding.PKCS1v15(), digest)
        return True
    except Exception:
        return False

Functions:

get_crypto_backend()

Get the default cryptographic backend.

Returns:

Type Description
CryptoBackend

Default CryptoBackend instance

Source code in spindlex/crypto/backend.py
def get_crypto_backend() -> CryptoBackend:
    """
    Return the module-level default cryptographic backend.

    Returns:
        The ``default_crypto_backend`` object
    """
    backend = default_crypto_backend
    return backend

load_key_from_file(filename, password=None)

Load SSH key from file.

Parameters:

Name Type Description Default
filename str

Path to key file

required
password Optional[str]

Optional password for encrypted keys

None

Returns:

Type Description
PKey

Loaded PKey instance

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_key_from_file(filename: str, password: Optional[str] = None) -> PKey:
    """
    Read a private key file and load it with the first key class that accepts it.

    The file contents are offered to Ed25519Key, ECDSAKey and RSAKey in
    that order; the first class whose ``load_private_key`` succeeds wins.

    Args:
        filename: Path to key file
        password: Optional password for encrypted keys; an empty string is
            treated as no password

    Returns:
        Loaded PKey instance

    Raises:
        CryptoException: If the file cannot be read or no key class can
            load it (the message lists each class's failure)
    """
    try:
        with open(filename, "rb") as handle:
            raw_key = handle.read()

        secret = password.encode() if password else None
        failures = []

        for candidate in (Ed25519Key, ECDSAKey, RSAKey):
            try:
                key: PKey = candidate()
                key.load_private_key(raw_key, secret)
            except Exception as exc:
                # Remember why this class refused, then try the next one.
                failures.append(f"{candidate.__name__}: {exc}")
            else:
                return key

        error_details = "; ".join(failures)
        raise CryptoException(
            f"Unable to load key - unsupported format or type. Details: {error_details}"
        )
    except CryptoException:
        # Already the right type; pass through without re-wrapping.
        raise
    except Exception as e:
        raise CryptoException(f"Failed to load key from file: {e}") from e

load_public_key_from_string(key_string)

Load SSH public key from string (OpenSSH format).

Parameters:

Name Type Description Default
key_string str

Public key string in OpenSSH format

required

Returns:

Type Description
PKey

Loaded PKey instance

Raises:

Type Description
CryptoException

If key loading fails

Source code in spindlex/crypto/pkey.py
def load_public_key_from_string(key_string: str) -> PKey:
    """
    Load SSH public key from string (OpenSSH format).

    The string is split on whitespace as "algorithm base64_key [comment]".
    The algorithm field selects the key class, and the decoded base64 blob
    is handed to that class's ``load_public_key``.

    Args:
        key_string: Public key string in OpenSSH format

    Returns:
        Loaded PKey instance

    Raises:
        CryptoException: If the string is malformed, the algorithm is
            unsupported, or the key class rejects the blob
    """
    try:
        # Parse OpenSSH public key format: "algorithm base64_key comment"
        parts = key_string.strip().split()
        if len(parts) < 2:
            raise CryptoException("Invalid public key format")

        algorithm = parts[0]
        key_data = base64.b64decode(parts[1])

        # Determine key type and load
        key: PKey
        if algorithm == "ssh-ed25519":
            key = Ed25519Key()
        elif algorithm in (
            "ecdsa-sha2-nistp256",
            "ecdsa-sha2-nistp384",
            "ecdsa-sha2-nistp521",
        ):
            # All three NIST curve names are routed to ECDSAKey, which
            # still validates the blob itself in load_public_key().
            key = ECDSAKey()
        elif algorithm in ["rsa-sha2-256", "rsa-sha2-512", "ssh-rsa"]:
            key = RSAKey()
        else:
            raise CryptoException(f"Unsupported key algorithm: {algorithm}")

        key.load_public_key(key_data)
        return key
    except Exception as e:
        raise CryptoException(f"Failed to load public key from string: {e}") from e