Skip to content

Host Keys API

spindlex.hostkeys

SSH Host Key Module

Provides host key management including storage, verification policies, and key fingerprinting for secure host authentication.

Classes

AutoAddPolicy

Bases: MissingHostKeyPolicy

Automatically add unknown host keys.

WARNING: This policy is insecure and should only be used in trusted environments or for testing purposes. It trusts every first-seen host key and disables MITM protection.

Source code in spindlex/hostkeys/policy.py
class AutoAddPolicy(MissingHostKeyPolicy):
    """
    Automatically add unknown host keys.

    WARNING: This policy is insecure and should only be used
    in trusted environments or for testing purposes. It trusts
    every first-seen host key and disables MITM protection.
    """

    def __init__(self, accept_risk: bool = False) -> None:
        """
        Initialize auto-add policy.

        Args:
            accept_risk: Set to True to acknowledge the security risks and
                silence the warnings. With the default (False) the policy
                is still created, but a warning is logged and a
                UserWarning is emitted.
        """
        self._logger = logging.getLogger(__name__)
        if not accept_risk:
            import warnings

            self._logger.warning(
                "AutoAddPolicy is insecure and disables MITM protection. "
                "Pass accept_risk=True to silence this warning."
            )
            # stacklevel=2 attributes the warning to the code that
            # constructs the policy instead of to this method.
            warnings.warn(
                "AutoAddPolicy is insecure and disables MITM protection. "
                "In future versions, accept_risk=True will be mandatory.",
                UserWarning,
                stacklevel=2,
            )
        self._accept_risk = accept_risk

    def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
        """
        Automatically accept and store unknown host key.

        The key is added to the client's ``_host_key_storage`` (when the
        client has one) and the storage is saved immediately. Without a
        storage the key is still accepted, but nothing is persisted.

        Args:
            client: SSH client instance
            hostname: Server hostname
            key: Server's host key

        Raises:
            SSHException: If adding, saving or logging the key fails
        """
        try:
            # Get host key storage from client
            storage = getattr(client, "_host_key_storage", None)
            # Test against None explicitly: a storage object that defines
            # __len__/__bool__ can be falsy while empty and must still
            # receive the key.
            if storage is not None:
                storage.add(hostname, key)
                storage.save()
                self._logger.warning(
                    f"Automatically added host key for {hostname}: {key.algorithm_name} "
                    f"{key.get_fingerprint()}"
                )
            else:
                self._logger.debug("No host key storage available on client")
                # Do not claim the key was "added" when nothing was stored.
                self._logger.warning(
                    f"Accepted host key for {hostname} without storing it: "
                    f"{key.algorithm_name} {key.get_fingerprint()}"
                )
        except Exception as e:
            self._logger.error(f"Failed to add/save host key for {hostname}: {e}")
            from ..exceptions import SSHException

            raise SSHException(
                f"Failed to persist new host key for {hostname}: {e}"
            ) from e
Methods:
__init__(accept_risk=False)

Initialize auto-add policy.

Parameters:

Name Type Description Default
accept_risk bool

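Set to True to acknowledge the security risks and silence the warnings. With the default (False) the policy still works, but logs a warning and emits a UserWarning.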

False
Source code in spindlex/hostkeys/policy.py
def __init__(self, accept_risk: bool = False) -> None:
    """
    Set up the policy and warn unless the caller accepted the risk.

    Args:
        accept_risk: True acknowledges the security risks and suppresses
            both warnings. False (the default) logs a warning and emits
            a UserWarning; the policy is created either way.
    """
    logger = logging.getLogger(__name__)
    self._logger = logger

    if not accept_risk:
        import warnings

        log_text = (
            "AutoAddPolicy is insecure and disables MITM protection. "
            "Pass accept_risk=True to silence this warning."
        )
        warn_text = (
            "AutoAddPolicy is insecure and disables MITM protection. "
            "In future versions, accept_risk=True will be mandatory."
        )
        logger.warning(log_text)
        # stacklevel=2 points the warning at whoever constructs the policy.
        warnings.warn(warn_text, UserWarning, stacklevel=2)

    self._accept_risk = accept_risk
missing_host_key(client, hostname, key)

Automatically accept and store unknown host key.

Parameters:

Name Type Description Default
client Any

SSH client instance

required
hostname str

Server hostname

required
key Any

Server's host key

required
Source code in spindlex/hostkeys/policy.py
def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
    """
    Automatically accept and store unknown host key.

    The key is added to the client's ``_host_key_storage`` (when the
    client has one) and the storage is saved immediately. Without a
    storage the key is still accepted, but nothing is persisted.

    Args:
        client: SSH client instance
        hostname: Server hostname
        key: Server's host key

    Raises:
        SSHException: If adding, saving or logging the key fails
    """
    try:
        # Get host key storage from client
        storage = getattr(client, "_host_key_storage", None)
        # Test against None explicitly: a storage object that defines
        # __len__/__bool__ can be falsy while empty and must still
        # receive the key.
        if storage is not None:
            storage.add(hostname, key)
            storage.save()
            self._logger.warning(
                f"Automatically added host key for {hostname}: {key.algorithm_name} "
                f"{key.get_fingerprint()}"
            )
        else:
            self._logger.debug("No host key storage available on client")
            # Do not claim the key was "added" when nothing was stored.
            self._logger.warning(
                f"Accepted host key for {hostname} without storing it: "
                f"{key.algorithm_name} {key.get_fingerprint()}"
            )
    except Exception as e:
        self._logger.error(f"Failed to add/save host key for {hostname}: {e}")
        from ..exceptions import SSHException

        raise SSHException(
            f"Failed to persist new host key for {hostname}: {e}"
        ) from e

HostKeyStorage

Host key storage implementation.

Manages storage and retrieval of known host keys for host verification and security policy enforcement.

Source code in spindlex/hostkeys/storage.py
class HostKeyStorage:
    """
    Host key storage implementation.

    Manages storage and retrieval of known host keys for
    host verification and security policy enforcement.

    Keys are held in memory as a mapping of hostname to a list of keys
    and are read from / written to a known_hosts-style text file.
    """

    def __init__(self, filename: Optional[str] = None) -> None:
        """
        Initialize host key storage.

        Existing keys are loaded immediately; a failure to load is logged
        as a warning and leaves the storage empty instead of raising.

        Args:
            filename: Path to known_hosts file (optional, defaults to
                ``~/.ssh/known_hosts``)
        """
        self._filename = filename or os.path.expanduser("~/.ssh/known_hosts")
        self._keys: dict[str, list[PKey]] = {}
        self._logger = logging.getLogger(__name__)

        # Try to load existing keys
        try:
            self.load()
        except FileNotFoundError:
            pass  # normal - known_hosts file not yet created
        except Exception as e:
            self._logger.warning(f"Could not load host keys from {self._filename}: {e}")

    def load(self, filename: Optional[str] = None) -> None:
        """
        Load host keys from storage file.

        Loaded keys are merged into the keys already held; a key that is
        already stored for a hostname is not added a second time. Blank
        lines and ``#`` comments are skipped, and a line that fails to
        parse is logged and skipped.

        Args:
            filename: Path to file (defaults to storage's filename)

        Raises:
            SSHException: If loading fails
        """
        target_file = filename or self._filename
        if not os.path.exists(target_file):
            self._logger.debug(f"Host key file {target_file} does not exist")
            return

        try:
            with open(target_file, encoding="utf-8") as f:
                for line_num, line in enumerate(f, 1):
                    line = line.strip()

                    # Skip empty lines and comments
                    if not line or line.startswith("#"):
                        continue

                    try:
                        self._parse_host_key_line(line)
                    except Exception as e:
                        self._logger.warning(
                            f"Error parsing line {line_num} in {target_file}: {e}"
                        )

        except Exception as e:
            raise SSHException(
                f"Failed to load host keys from {target_file}: {e}"
            ) from e

    def _parse_host_key_line(self, line: str) -> None:
        """
        Parse a single host key line from known_hosts format.

        The line is expected to be ``hostnames key-type base64-key``; any
        further fields are ignored. Lines with fewer than three fields,
        unsupported key types and undecodable key data are skipped.

        Args:
            line: Line to parse
        """
        parts = line.split()
        if len(parts) < 3:
            return  # Invalid line format

        hostnames_part, key_type, key_data = parts[:3]

        # Parse hostnames (can be comma-separated)
        hostnames = [h.strip() for h in hostnames_part.split(",")]

        try:
            # Decode base64 key data
            key_bytes = base64.b64decode(key_data)

            # Create appropriate key object based on type
            key = self._create_key_from_type_and_data(key_type, key_bytes)

            if key is not None:
                # Add key for each hostname, skipping keys already stored
                # so that loading the same file twice does not duplicate
                # entries.
                for hostname in hostnames:
                    known_keys = self._keys.setdefault(hostname, [])
                    if key not in known_keys:
                        known_keys.append(key)

        except Exception as e:
            self._logger.debug(f"Failed to parse key data: {e}")

    def _create_key_from_type_and_data(
        self, key_type: str, key_data: bytes
    ) -> Optional[PKey]:
        """
        Create PKey instance from key type and data.

        Args:
            key_type: SSH key type string
            key_data: Key data bytes

        Returns:
            PKey instance or None if unsupported or if the data could
            not be loaded
        """
        try:
            # Import key classes
            from ..crypto.pkey import ECDSAKey, Ed25519Key, RSAKey

            pkey: PKey
            if key_type == "ssh-ed25519":
                pkey = Ed25519Key()
                pkey.load_public_key(key_data)
                return pkey
            elif key_type.startswith("ecdsa-sha2-"):
                pkey = ECDSAKey()
                pkey.load_public_key(key_data)
                return pkey
            elif key_type.startswith("ssh-rsa") or key_type.startswith("rsa-sha2-"):
                pkey = RSAKey()
                pkey.load_public_key(key_data)
                return pkey
            else:
                self._logger.debug(f"Unsupported key type: {key_type}")
                return None

        except Exception as e:
            self._logger.debug(f"Failed to create key from type {key_type}: {e}")
            return None

    def save(self) -> None:
        """
        Save host keys to storage file.

        The file is written to ``<filename>.tmp`` first and then moved
        over the target with ``os.replace``. A key that cannot be
        serialized is logged and skipped; a failure while writing aborts
        the save and leaves the existing file untouched.

        NOTE(review): only the keys held in memory are written, so lines
        that load() skipped (comments, unsupported or unparseable
        entries) are not carried over into the rewritten file.

        Raises:
            SSHException: If saving fails
        """
        temp_filename = self._filename + ".tmp"
        try:
            # Create directory if it doesn't exist
            dirname = os.path.dirname(self._filename)
            if dirname:
                os.makedirs(dirname, exist_ok=True)

            with open(temp_filename, "w", encoding="utf-8") as f:
                f.write("# SSH known hosts file\n")
                f.write("# Generated by spindlex\n\n")

                for hostname, keys in self._keys.items():
                    for key in keys:
                        # Only serialization is best-effort. The write
                        # itself stays outside this try so an I/O error
                        # aborts the save instead of being logged and
                        # followed by os.replace() of a partial file.
                        try:
                            key_data = base64.b64encode(
                                key.get_public_key_bytes()
                            ).decode("ascii")
                            entry = f"{hostname} {key.algorithm_name} {key_data}\n"
                        except Exception as e:
                            self._logger.warning(
                                f"Failed to save key for {hostname}: {e}"
                            )
                            continue
                        f.write(entry)

            # Atomic replace
            os.replace(temp_filename, self._filename)

        except Exception as e:
            if os.path.exists(temp_filename):
                try:
                    os.remove(temp_filename)
                except OSError:
                    pass  # Ignore errors if file already gone or inaccessible
            raise SSHException(
                f"Failed to save host keys to {self._filename}: {e}"
            ) from e

    def add(self, hostname: str, key: PKey) -> None:
        """
        Add host key to storage.

        The key is only added in memory; call save() to persist it. A key
        equal to one already stored for the hostname is ignored.

        Args:
            hostname: Server hostname
            key: Host key to store
        """
        if hostname not in self._keys:
            self._keys[hostname] = []

        # Check if key already exists
        for existing_key in self._keys[hostname]:
            if existing_key == key:
                return  # Key already exists

        # Add new key
        self._keys[hostname].append(key)
        self._logger.debug(f"Added host key for {hostname}: {key.algorithm_name}")

    def get(self, hostname: str, key_type: Optional[str] = None) -> Optional[PKey]:
        """
        Get host key for hostname.

        Args:
            hostname: Server hostname
            key_type: Optional algorithm name to filter by

        Returns:
            Host key if found, None otherwise
        """
        if hostname in self._keys and self._keys[hostname]:
            if key_type:
                # Find matching key type
                for key in self._keys[hostname]:
                    if key.algorithm_name == key_type:
                        return key
                return None
            # Return the first stored key (the earliest one added or loaded)
            return self._keys[hostname][0]
        return None

    def get_all(self, hostname: str) -> list[PKey]:
        """
        Get all host keys for hostname.

        Args:
            hostname: Server hostname

        Returns:
            New list of host keys (empty if the hostname is unknown);
            changing it does not affect the storage
        """
        # Copy so callers cannot mutate the internal list by accident.
        return list(self._keys.get(hostname, []))

    def copy_from(self, other: "HostKeyStorage") -> None:
        """
        Merge all keys from another storage instance into this one.

        Args:
            other: Source storage whose keys are merged into self
        """
        for hostname, keys in other._keys.items():
            if hostname not in self._keys:
                self._keys[hostname] = []
            for key in keys:
                if key not in self._keys[hostname]:
                    self._keys[hostname].append(key)

    def remove(self, hostname: str, key: Optional[PKey] = None) -> bool:
        """
        Remove host key(s) for hostname.

        The change is made in memory only; call save() to persist it.

        Args:
            hostname: Server hostname
            key: Specific key to remove (if None, removes all keys for hostname)

        Returns:
            True if any keys were removed
        """
        if hostname not in self._keys:
            return False

        if key is None:
            # Remove all keys for hostname
            del self._keys[hostname]
            return True
        else:
            # Remove specific key
            try:
                self._keys[hostname].remove(key)
                # Drop the hostname entirely once its last key is gone
                if not self._keys[hostname]:
                    del self._keys[hostname]
                return True
            except ValueError:
                return False
Methods:
__init__(filename=None)

Initialize host key storage.

Parameters:

Name Type Description Default
filename Optional[str]

Path to known_hosts file (optional)

None
Source code in spindlex/hostkeys/storage.py
def __init__(self, filename: Optional[str] = None) -> None:
    """
    Create the storage and attempt an initial load of existing keys.

    A load failure is logged as a warning and leaves the storage empty;
    it is never raised to the caller.

    Args:
        filename: Path to known_hosts file; ``~/.ssh/known_hosts``
            (user-expanded) is used when omitted or empty
    """
    if filename:
        self._filename = filename
    else:
        self._filename = os.path.expanduser("~/.ssh/known_hosts")
    self._keys: dict[str, list[PKey]] = {}
    self._logger = logging.getLogger(__name__)

    # Best-effort initial load
    try:
        self.load()
    except FileNotFoundError:
        # normal - known_hosts file not yet created
        return
    except Exception as e:
        self._logger.warning(f"Could not load host keys from {self._filename}: {e}")
add(hostname, key)

Add host key to storage.

Parameters:

Name Type Description Default
hostname str

Server hostname

required
key PKey

Host key to store

required
Source code in spindlex/hostkeys/storage.py
def add(self, hostname: str, key: PKey) -> None:
    """
    Store a host key for a hostname unless an equal key is already held.

    Only the in-memory mapping is changed here.

    Args:
        hostname: Server hostname
        key: Host key to store
    """
    known_keys = self._keys.setdefault(hostname, [])

    # Nothing to do when an equal key is already stored
    if any(stored == key for stored in known_keys):
        return

    known_keys.append(key)
    self._logger.debug(f"Added host key for {hostname}: {key.algorithm_name}")
copy_from(other)

Merge all keys from another storage instance into this one.

Parameters:

Name Type Description Default
other HostKeyStorage

Source storage whose keys are merged into self

required
Source code in spindlex/hostkeys/storage.py
def copy_from(self, other: "HostKeyStorage") -> None:
    """
    Merge every key held by another storage into this one.

    A key already present for a hostname is not added again. Hostnames
    that exist only in the other storage are created here, even when
    they hold no keys.

    Args:
        other: Source storage whose keys are merged into self
    """
    for hostname, incoming in other._keys.items():
        existing = self._keys.setdefault(hostname, [])
        for key in incoming:
            if key in existing:
                continue
            existing.append(key)
get(hostname, key_type=None)

Get host key for hostname.

Parameters:

Name Type Description Default
hostname str

Server hostname

required
key_type Optional[str]

Optional algorithm name to filter by

None

Returns:

Type Description
Optional[PKey]

Host key if found, None otherwise

Source code in spindlex/hostkeys/storage.py
def get(self, hostname: str, key_type: Optional[str] = None) -> Optional[PKey]:
    """
    Look up a single host key for a hostname.

    Args:
        hostname: Server hostname
        key_type: Optional algorithm name; when given, the first stored
            key with that algorithm name is returned

    Returns:
        The matching key, the first stored key when no key_type is
        given, or None when nothing matches
    """
    candidates = self._keys.get(hostname)
    if not candidates:
        return None

    if not key_type:
        # No filter requested: hand back the first stored key
        return candidates[0]

    matches = (key for key in candidates if key.algorithm_name == key_type)
    return next(matches, None)
get_all(hostname)

Get all host keys for hostname.

Parameters:

Name Type Description Default
hostname str

Server hostname

required

Returns:

Type Description
list[PKey]

List of host keys

Source code in spindlex/hostkeys/storage.py
def get_all(self, hostname: str) -> list[PKey]:
    """
    Get all host keys for hostname.

    Args:
        hostname: Server hostname

    Returns:
        New list of host keys (empty if the hostname is unknown);
        changing it does not affect the storage
    """
    # Copy so callers cannot mutate the internal list by accident; the
    # unknown-hostname case already returned a fresh list.
    return list(self._keys.get(hostname, []))
load(filename=None)

Load host keys from storage file.

Parameters:

Name Type Description Default
filename Optional[str]

Path to file (defaults to storage's filename)

None

Raises:

Type Description
SSHException

If loading fails

Source code in spindlex/hostkeys/storage.py
def load(self, filename: Optional[str] = None) -> None:
    """
    Read host keys from a known_hosts-style file.

    A missing file is not an error: it is noted at debug level and
    nothing is loaded. Blank lines and ``#`` comments are skipped, and a
    line whose parsing raises is logged as a warning and skipped.

    Args:
        filename: Path to file (defaults to storage's filename)

    Raises:
        SSHException: If loading fails
    """
    target_file = filename or self._filename

    if not os.path.exists(target_file):
        self._logger.debug(f"Host key file {target_file} does not exist")
        return

    try:
        with open(target_file, encoding="utf-8") as handle:
            for line_num, raw_line in enumerate(handle, start=1):
                entry = raw_line.strip()

                # Only non-empty, non-comment lines carry a key
                if entry and not entry.startswith("#"):
                    try:
                        self._parse_host_key_line(entry)
                    except Exception as e:
                        self._logger.warning(
                            f"Error parsing line {line_num} in {target_file}: {e}"
                        )
    except Exception as e:
        raise SSHException(
            f"Failed to load host keys from {target_file}: {e}"
        ) from e
remove(hostname, key=None)

Remove host key(s) for hostname.

Parameters:

Name Type Description Default
hostname str

Server hostname

required
key Optional[PKey]

Specific key to remove (if None, removes all keys for hostname)

None

Returns:

Type Description
bool

True if any keys were removed

Source code in spindlex/hostkeys/storage.py
def remove(self, hostname: str, key: Optional[PKey] = None) -> bool:
    """
    Delete one key, or every key, stored for a hostname.

    Args:
        hostname: Server hostname
        key: Specific key to remove (if None, removes all keys for hostname)

    Returns:
        True if any keys were removed
    """
    if hostname not in self._keys:
        return False

    if key is None:
        # No specific key given: forget the hostname entirely
        del self._keys[hostname]
        return True

    stored = self._keys[hostname]
    try:
        stored.remove(key)
    except ValueError:
        # The requested key is not stored for this hostname
        return False

    # Drop the hostname once its last key is gone
    if not stored:
        del self._keys[hostname]
    return True
save()

Save host keys to storage file.

Raises:

Type Description
SSHException

If saving fails

Source code in spindlex/hostkeys/storage.py
def save(self) -> None:
    """
    Save host keys to storage file.

    The file is written to ``<filename>.tmp`` first and then moved over
    the target with ``os.replace``. A key that cannot be serialized is
    logged and skipped; a failure while writing aborts the save and
    leaves the existing file untouched.

    NOTE(review): only the keys held in memory are written, so any
    other content of an existing file is not carried over.

    Raises:
        SSHException: If saving fails
    """
    temp_filename = self._filename + ".tmp"
    try:
        # Create directory if it doesn't exist
        dirname = os.path.dirname(self._filename)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

        with open(temp_filename, "w", encoding="utf-8") as f:
            f.write("# SSH known hosts file\n")
            f.write("# Generated by spindlex\n\n")

            for hostname, keys in self._keys.items():
                for key in keys:
                    # Only serialization is best-effort. The write itself
                    # stays outside this try so an I/O error aborts the
                    # save instead of being logged and followed by
                    # os.replace() of a partial file.
                    try:
                        key_data = base64.b64encode(
                            key.get_public_key_bytes()
                        ).decode("ascii")
                        entry = f"{hostname} {key.algorithm_name} {key_data}\n"
                    except Exception as e:
                        self._logger.warning(
                            f"Failed to save key for {hostname}: {e}"
                        )
                        continue
                    f.write(entry)

        # Atomic replace
        os.replace(temp_filename, self._filename)

    except Exception as e:
        if os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except OSError:
                pass  # Ignore errors if file already gone or inaccessible
        raise SSHException(
            f"Failed to save host keys to {self._filename}: {e}"
        ) from e

MissingHostKeyPolicy

Bases: ABC

Abstract base class for host key policies.

Defines the interface for handling unknown host keys during SSH connection establishment.

Source code in spindlex/hostkeys/policy.py
class MissingHostKeyPolicy(ABC):
    """
    Interface for policies that decide what to do with an unknown host key.

    Concrete policies implement missing_host_key(); the class cannot be
    instantiated until that method is overridden.
    """

    @abstractmethod
    def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
        """
        Decide how to handle a host key that is not yet known.

        Returning normally accepts the key; raising rejects it.

        Args:
            client: SSH client instance
            hostname: Server hostname
            key: Server's host key

        Raises:
            BadHostKeyException: If host key should be rejected
        """
Methods:
missing_host_key(client, hostname, key) abstractmethod

Handle unknown host key.

Parameters:

Name Type Description Default
client Any

SSH client instance

required
hostname str

Server hostname

required
key Any

Server's host key

required

Raises:

Type Description
BadHostKeyException

If host key should be rejected

Source code in spindlex/hostkeys/policy.py
@abstractmethod
def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
    """
    Decide how to handle a host key that is not yet known.

    This base implementation does nothing; subclasses provide the
    actual decision.

    Args:
        client: SSH client instance
        hostname: Server hostname
        key: Server's host key

    Raises:
        BadHostKeyException: If host key should be rejected
    """

RejectPolicy

Bases: MissingHostKeyPolicy

Reject all unknown host keys.

This is the secure default policy that rejects any unknown host keys to prevent man-in-the-middle attacks.

Source code in spindlex/hostkeys/policy.py
class RejectPolicy(MissingHostKeyPolicy):
    """
    Policy that refuses every host key it is asked about.

    Intended as the secure default: an unknown host key is never
    accepted, which prevents man-in-the-middle attacks.
    """

    def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
        """
        Refuse the unknown host key by raising.

        Args:
            client: SSH client instance (not used)
            hostname: Server hostname
            key: Server's host key

        Raises:
            BadHostKeyException: Always raised for unknown keys
        """
        rejection = BadHostKeyException(hostname, key)
        raise rejection
Methods:
missing_host_key(client, hostname, key)

Reject unknown host key.

Parameters:

Name Type Description Default
client Any

SSH client instance

required
hostname str

Server hostname

required
key Any

Server's host key

required

Raises:

Type Description
BadHostKeyException

Always raised for unknown keys

Source code in spindlex/hostkeys/policy.py
def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
    """
    Refuse the unknown host key by raising.

    Args:
        client: SSH client instance (not used)
        hostname: Server hostname
        key: Server's host key

    Raises:
        BadHostKeyException: Always raised for unknown keys
    """
    rejection = BadHostKeyException(hostname, key)
    raise rejection

WarningPolicy

Bases: MissingHostKeyPolicy

Warn and persist unknown host keys (Trust On First Use).

Logs a warning about unknown host keys and persists them to storage so subsequent connections are verified. This is TOFU behavior: the first connection is trusted unconditionally, with no MITM protection for that initial handshake. Use with caution in untrusted networks.

Source code in spindlex/hostkeys/policy.py
class WarningPolicy(MissingHostKeyPolicy):
    """
    Warn and persist unknown host keys (Trust On First Use).

    Logs a warning about unknown host keys and persists them to storage
    so subsequent connections are verified. This is TOFU behavior: the
    first connection is trusted unconditionally, with no MITM protection
    for that initial handshake. Use with caution in untrusted networks.
    """

    def __init__(self) -> None:
        """Initialize warning policy with logger."""
        self._logger = logging.getLogger(__name__)

    def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
        """
        Log warning and accept unknown host key (TOFU - stores on first use).

        The key is added to the client's ``_host_key_storage`` (when the
        client has one) and the storage is saved immediately. Errors
        raised by the storage are not caught here.

        Args:
            client: SSH client instance
            hostname: Server hostname
            key: Server's host key
        """
        self._logger.warning(
            f"Unknown host key for {hostname}: {key.algorithm_name} {key.get_fingerprint()}"
        )
        storage = getattr(client, "_host_key_storage", None)
        # Test against None explicitly: a storage object that defines
        # __len__/__bool__ can be falsy while empty and must still
        # receive the key.
        if storage is not None:
            storage.add(hostname, key)
            storage.save()
        else:
            # Same diagnostic AutoAddPolicy emits when nothing can be stored
            self._logger.debug("No host key storage available on client")
Methods:
__init__()

Initialize warning policy with logger.

Source code in spindlex/hostkeys/policy.py
def __init__(self) -> None:
    """Create the policy and attach this module's logger to it."""
    module_logger = logging.getLogger(__name__)
    self._logger = module_logger
missing_host_key(client, hostname, key)

Log warning and accept unknown host key (TOFU - stores on first use).

Parameters:

Name Type Description Default
client Any

SSH client instance

required
hostname str

Server hostname

required
key Any

Server's host key

required
Source code in spindlex/hostkeys/policy.py
def missing_host_key(self, client: Any, hostname: str, key: Any) -> None:
    """
    Log warning and accept unknown host key (TOFU - stores on first use).

    The key is added to the client's ``_host_key_storage`` (when the
    client has one) and the storage is saved immediately. Errors raised
    by the storage are not caught here.

    Args:
        client: SSH client instance
        hostname: Server hostname
        key: Server's host key
    """
    self._logger.warning(
        f"Unknown host key for {hostname}: {key.algorithm_name} {key.get_fingerprint()}"
    )
    storage = getattr(client, "_host_key_storage", None)
    # Test against None explicitly: a storage object that defines
    # __len__/__bool__ can be falsy while empty and must still receive
    # the key.
    if storage is not None:
        storage.add(hostname, key)
        storage.save()
    else:
        # Same diagnostic AutoAddPolicy emits when nothing can be stored
        self._logger.debug("No host key storage available on client")