Skip to content

Protocol API

SpindleX's protocol layer implements SSHv2 and SFTPv3 message structures and constants.

SSH Messages

spindlex.protocol.messages

SSH Protocol Message Implementation

Implements SSH protocol message parsing, serialization, and validation according to RFC 4251-4254 specifications.

Classes

ChannelCloseMessage

Bases: Message

SSH channel close message (MSG_CHANNEL_CLOSE).

Source code in spindlex/protocol/messages.py
class ChannelCloseMessage(Message):
    """SSH channel close message (MSG_CHANNEL_CLOSE)."""

    def __init__(self, recipient_channel: int) -> None:
        """
        Initialize channel close message.

        Args:
            recipient_channel: Recipient's channel number
        """
        super().__init__(MSG_CHANNEL_CLOSE)
        self.recipient_channel = recipient_channel

        # Build message data
        self.add_uint32(recipient_channel)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelCloseMessage":
        """Unpack channel close message data."""
        recipient_channel, _ = read_uint32(data, 0)
        return cls(recipient_channel)
Methods:
__init__(recipient_channel)

Initialize channel close message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int) -> None:
    """
    Initialize channel close message.

    Args:
        recipient_channel: Recipient's channel number
    """
    super().__init__(MSG_CHANNEL_CLOSE)
    self.recipient_channel = recipient_channel

    # Build message data
    self.add_uint32(recipient_channel)

ChannelDataMessage

Bases: Message

SSH channel data message (MSG_CHANNEL_DATA).

Source code in spindlex/protocol/messages.py
class ChannelDataMessage(Message):
    """SSH channel data message (MSG_CHANNEL_DATA)."""

    def __init__(self, recipient_channel: int, data: bytes) -> None:
        """
        Initialize channel data message.

        Args:
            recipient_channel: Recipient's channel number
            data: Data to send
        """
        super().__init__(MSG_CHANNEL_DATA)
        self.recipient_channel = recipient_channel
        self.data = data

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelDataMessage":
        """Unpack channel data message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        message_data, offset = read_string(data, offset)

        return cls(recipient_channel, message_data)
Methods:
__init__(recipient_channel, data)

Initialize channel data message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
data bytes

Data to send

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int, data: bytes) -> None:
    """
    Initialize channel data message.

    Args:
        recipient_channel: Recipient's channel number
        data: Data to send
    """
    super().__init__(MSG_CHANNEL_DATA)
    self.recipient_channel = recipient_channel
    self.data = data

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_string(data)

ChannelEOFMessage

Bases: Message

SSH channel EOF message (MSG_CHANNEL_EOF).

Source code in spindlex/protocol/messages.py
class ChannelEOFMessage(Message):
    """SSH channel EOF message (MSG_CHANNEL_EOF)."""

    def __init__(self, recipient_channel: int) -> None:
        """
        Initialize channel EOF message.

        Args:
            recipient_channel: Recipient's channel number
        """
        super().__init__(MSG_CHANNEL_EOF)
        self.recipient_channel = recipient_channel

        # Build message data
        self.add_uint32(recipient_channel)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelEOFMessage":
        """Unpack channel EOF message data."""
        recipient_channel, _ = read_uint32(data, 0)
        return cls(recipient_channel)
Methods:
__init__(recipient_channel)

Initialize channel EOF message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int) -> None:
    """
    Initialize channel EOF message.

    Args:
        recipient_channel: Recipient's channel number
    """
    super().__init__(MSG_CHANNEL_EOF)
    self.recipient_channel = recipient_channel

    # Build message data
    self.add_uint32(recipient_channel)

ChannelExtendedDataMessage

Bases: Message

SSH channel extended data message (MSG_CHANNEL_EXTENDED_DATA).

Source code in spindlex/protocol/messages.py
class ChannelExtendedDataMessage(Message):
    """SSH channel extended data message (MSG_CHANNEL_EXTENDED_DATA)."""

    def __init__(self, recipient_channel: int, data_type: int, data: bytes) -> None:
        """
        Initialize channel extended data message.

        Args:
            recipient_channel: Recipient's channel number
            data_type: Extended data type code
            data: Extended data
        """
        super().__init__(MSG_CHANNEL_EXTENDED_DATA)
        self.recipient_channel = recipient_channel
        self.data_type = data_type
        self.data = data

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_uint32(data_type)
        self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelExtendedDataMessage":
        """Unpack channel extended data message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        data_type, offset = read_uint32(data, offset)
        message_data, offset = read_string(data, offset)

        return cls(recipient_channel, data_type, message_data)
Methods:
__init__(recipient_channel, data_type, data)

Initialize channel extended data message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
data_type int

Extended data type code

required
data bytes

Extended data

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int, data_type: int, data: bytes) -> None:
    """
    Initialize channel extended data message.

    Args:
        recipient_channel: Recipient's channel number
        data_type: Extended data type code
        data: Extended data
    """
    super().__init__(MSG_CHANNEL_EXTENDED_DATA)
    self.recipient_channel = recipient_channel
    self.data_type = data_type
    self.data = data

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_uint32(data_type)
    self.add_string(data)

ChannelFailureMessage

Bases: Message

SSH channel failure message (MSG_CHANNEL_FAILURE).

Source code in spindlex/protocol/messages.py
class ChannelFailureMessage(Message):
    """SSH channel failure message (MSG_CHANNEL_FAILURE)."""

    def __init__(self, recipient_channel: int) -> None:
        """
        Initialize channel failure message.

        Args:
            recipient_channel: Recipient's channel number
        """
        super().__init__(MSG_CHANNEL_FAILURE)
        self.recipient_channel = recipient_channel

        # Build message data
        self.add_uint32(recipient_channel)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelFailureMessage":
        """Unpack channel failure message data."""
        recipient_channel, _ = read_uint32(data, 0)
        return cls(recipient_channel)
Methods:
__init__(recipient_channel)

Initialize channel failure message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int) -> None:
    """
    Initialize channel failure message.

    Args:
        recipient_channel: Recipient's channel number
    """
    super().__init__(MSG_CHANNEL_FAILURE)
    self.recipient_channel = recipient_channel

    # Build message data
    self.add_uint32(recipient_channel)

ChannelOpenConfirmationMessage

Bases: Message

SSH channel open confirmation message (MSG_CHANNEL_OPEN_CONFIRMATION).

Source code in spindlex/protocol/messages.py
class ChannelOpenConfirmationMessage(Message):
    """SSH channel open confirmation message (MSG_CHANNEL_OPEN_CONFIRMATION)."""

    def __init__(
        self,
        recipient_channel: int,
        sender_channel: int,
        initial_window_size: int,
        maximum_packet_size: int,
        type_specific_data: bytes = b"",
    ) -> None:
        """
        Initialize channel open confirmation message.

        Args:
            recipient_channel: Recipient's channel number
            sender_channel: Sender's channel number
            initial_window_size: Initial window size
            maximum_packet_size: Maximum packet size
            type_specific_data: Channel type specific data
        """
        super().__init__(MSG_CHANNEL_OPEN_CONFIRMATION)
        self.recipient_channel = recipient_channel
        self.sender_channel = sender_channel
        self.initial_window_size = initial_window_size
        self.maximum_packet_size = maximum_packet_size
        self.type_specific_data = type_specific_data

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_uint32(sender_channel)
        self.add_uint32(initial_window_size)
        self.add_uint32(maximum_packet_size)
        if type_specific_data:
            self._data.extend(type_specific_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelOpenConfirmationMessage":
        """Unpack channel open confirmation message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        sender_channel, offset = read_uint32(data, offset)
        initial_window_size, offset = read_uint32(data, offset)
        maximum_packet_size, offset = read_uint32(data, offset)

        type_specific_data = data[offset:] if offset < len(data) else b""

        return cls(
            recipient_channel,
            sender_channel,
            initial_window_size,
            maximum_packet_size,
            type_specific_data,
        )
Methods:
__init__(recipient_channel, sender_channel, initial_window_size, maximum_packet_size, type_specific_data=b'')

Initialize channel open confirmation message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
sender_channel int

Sender's channel number

required
initial_window_size int

Initial window size

required
maximum_packet_size int

Maximum packet size

required
type_specific_data bytes

Channel type specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(
    self,
    recipient_channel: int,
    sender_channel: int,
    initial_window_size: int,
    maximum_packet_size: int,
    type_specific_data: bytes = b"",
) -> None:
    """
    Initialize channel open confirmation message.

    Args:
        recipient_channel: Recipient's channel number
        sender_channel: Sender's channel number
        initial_window_size: Initial window size
        maximum_packet_size: Maximum packet size
        type_specific_data: Channel type specific data
    """
    super().__init__(MSG_CHANNEL_OPEN_CONFIRMATION)
    self.recipient_channel = recipient_channel
    self.sender_channel = sender_channel
    self.initial_window_size = initial_window_size
    self.maximum_packet_size = maximum_packet_size
    self.type_specific_data = type_specific_data

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_uint32(sender_channel)
    self.add_uint32(initial_window_size)
    self.add_uint32(maximum_packet_size)
    if type_specific_data:
        self._data.extend(type_specific_data)

ChannelOpenFailureMessage

Bases: Message

SSH channel open failure message (MSG_CHANNEL_OPEN_FAILURE).

Source code in spindlex/protocol/messages.py
class ChannelOpenFailureMessage(Message):
    """SSH channel open failure message (MSG_CHANNEL_OPEN_FAILURE)."""

    def __init__(
        self,
        recipient_channel: int,
        reason_code: int,
        description: str = "",
        language: str = "",
    ) -> None:
        """
        Initialize channel open failure message.

        Args:
            recipient_channel: Recipient's channel number
            reason_code: Failure reason code
            description: Human-readable description
            language: Language tag (RFC 3066)
        """
        super().__init__(MSG_CHANNEL_OPEN_FAILURE)
        self.recipient_channel = recipient_channel
        self.reason_code = reason_code
        self.description = description
        self.language = language

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_uint32(reason_code)
        self.add_string(description)
        self.add_string(language)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelOpenFailureMessage":
        """Unpack channel open failure message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        reason_code, offset = read_uint32(data, offset)
        description_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)

        description = description_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        language = language_bytes.decode(SSH_STRING_ENCODING, errors="replace")

        return cls(recipient_channel, reason_code, description, language)
Methods:
__init__(recipient_channel, reason_code, description='', language='')

Initialize channel open failure message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
reason_code int

Failure reason code

required
description str

Human-readable description

''
language str

Language tag (RFC 3066)

''
Source code in spindlex/protocol/messages.py
def __init__(
    self,
    recipient_channel: int,
    reason_code: int,
    description: str = "",
    language: str = "",
) -> None:
    """
    Initialize channel open failure message.

    Args:
        recipient_channel: Recipient's channel number
        reason_code: Failure reason code
        description: Human-readable description
        language: Language tag (RFC 3066)
    """
    super().__init__(MSG_CHANNEL_OPEN_FAILURE)
    self.recipient_channel = recipient_channel
    self.reason_code = reason_code
    self.description = description
    self.language = language

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_uint32(reason_code)
    self.add_string(description)
    self.add_string(language)

ChannelOpenMessage

Bases: Message

SSH channel open message (MSG_CHANNEL_OPEN).

Source code in spindlex/protocol/messages.py
class ChannelOpenMessage(Message):
    """SSH channel open message (MSG_CHANNEL_OPEN)."""

    def __init__(
        self,
        channel_type: str,
        sender_channel: int,
        initial_window_size: int,
        maximum_packet_size: int,
        type_specific_data: bytes = b"",
    ) -> None:
        """
        Initialize channel open message.

        Args:
            channel_type: Type of channel to open
            sender_channel: Sender's channel number
            initial_window_size: Initial window size
            maximum_packet_size: Maximum packet size
            type_specific_data: Channel type specific data
        """
        super().__init__(MSG_CHANNEL_OPEN)
        self.channel_type = channel_type
        self.sender_channel = sender_channel
        self.initial_window_size = initial_window_size
        self.maximum_packet_size = maximum_packet_size
        self.type_specific_data = type_specific_data

        # Build message data
        self.add_string(channel_type)
        self.add_uint32(sender_channel)
        self.add_uint32(initial_window_size)
        self.add_uint32(maximum_packet_size)
        if type_specific_data:
            self._data.extend(type_specific_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelOpenMessage":
        """Unpack channel open message data."""
        offset = 0
        channel_type_bytes, offset = read_string(data, offset)
        sender_channel, offset = read_uint32(data, offset)
        initial_window_size, offset = read_uint32(data, offset)
        maximum_packet_size, offset = read_uint32(data, offset)

        channel_type = channel_type_bytes.decode(SSH_STRING_ENCODING)
        type_specific_data = data[offset:] if offset < len(data) else b""

        return cls(
            channel_type,
            sender_channel,
            initial_window_size,
            maximum_packet_size,
            type_specific_data,
        )

    def validate(self) -> bool:
        """
        Validate channel open message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        super().validate()

        # Validate channel type
        if not self.channel_type:
            raise ProtocolException("Channel type cannot be empty")

        # Validate window and packet sizes
        if self.initial_window_size <= 0:
            raise ProtocolException("Initial window size must be positive")
        if self.maximum_packet_size <= 0:
            raise ProtocolException("Maximum packet size must be positive")
        if self.maximum_packet_size > MAX_PACKET_SIZE:
            raise ProtocolException(
                f"Maximum packet size too large: {self.maximum_packet_size}"
            )

        return True
Methods:
__init__(channel_type, sender_channel, initial_window_size, maximum_packet_size, type_specific_data=b'')

Initialize channel open message.

Parameters:

Name Type Description Default
channel_type str

Type of channel to open

required
sender_channel int

Sender's channel number

required
initial_window_size int

Initial window size

required
maximum_packet_size int

Maximum packet size

required
type_specific_data bytes

Channel type specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(
    self,
    channel_type: str,
    sender_channel: int,
    initial_window_size: int,
    maximum_packet_size: int,
    type_specific_data: bytes = b"",
) -> None:
    """
    Initialize channel open message.

    Args:
        channel_type: Type of channel to open
        sender_channel: Sender's channel number
        initial_window_size: Initial window size
        maximum_packet_size: Maximum packet size
        type_specific_data: Channel type specific data
    """
    super().__init__(MSG_CHANNEL_OPEN)
    self.channel_type = channel_type
    self.sender_channel = sender_channel
    self.initial_window_size = initial_window_size
    self.maximum_packet_size = maximum_packet_size
    self.type_specific_data = type_specific_data

    # Build message data
    self.add_string(channel_type)
    self.add_uint32(sender_channel)
    self.add_uint32(initial_window_size)
    self.add_uint32(maximum_packet_size)
    if type_specific_data:
        self._data.extend(type_specific_data)
validate()

Validate channel open message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/messages.py
def validate(self) -> bool:
    """
    Validate channel open message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    super().validate()

    # Validate channel type
    if not self.channel_type:
        raise ProtocolException("Channel type cannot be empty")

    # Validate window and packet sizes
    if self.initial_window_size <= 0:
        raise ProtocolException("Initial window size must be positive")
    if self.maximum_packet_size <= 0:
        raise ProtocolException("Maximum packet size must be positive")
    if self.maximum_packet_size > MAX_PACKET_SIZE:
        raise ProtocolException(
            f"Maximum packet size too large: {self.maximum_packet_size}"
        )

    return True

ChannelRequestMessage

Bases: Message

SSH channel request message (MSG_CHANNEL_REQUEST).

Source code in spindlex/protocol/messages.py
class ChannelRequestMessage(Message):
    """SSH channel request message (MSG_CHANNEL_REQUEST)."""

    def __init__(
        self,
        recipient_channel: int,
        request_type: str,
        want_reply: bool,
        request_data: bytes = b"",
    ) -> None:
        """
        Initialize channel request message.

        Args:
            recipient_channel: Recipient's channel number
            request_type: Type of request
            want_reply: Whether a reply is wanted
            request_data: Request-specific data
        """
        super().__init__(MSG_CHANNEL_REQUEST)
        self.recipient_channel = recipient_channel
        self.request_type = request_type
        self.want_reply = want_reply
        self.request_data = request_data

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_string(request_type)
        self.add_boolean(want_reply)
        if request_data:
            self._data.extend(request_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelRequestMessage":
        """Unpack channel request message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        request_type_bytes, offset = read_string(data, offset)
        want_reply, offset = read_boolean(data, offset)

        request_type = request_type_bytes.decode(SSH_STRING_ENCODING)
        request_data = data[offset:] if offset < len(data) else b""

        return cls(recipient_channel, request_type, want_reply, request_data)
Methods:
__init__(recipient_channel, request_type, want_reply, request_data=b'')

Initialize channel request message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
request_type str

Type of request

required
want_reply bool

Whether a reply is wanted

required
request_data bytes

Request-specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(
    self,
    recipient_channel: int,
    request_type: str,
    want_reply: bool,
    request_data: bytes = b"",
) -> None:
    """
    Initialize channel request message.

    Args:
        recipient_channel: Recipient's channel number
        request_type: Type of request
        want_reply: Whether a reply is wanted
        request_data: Request-specific data
    """
    super().__init__(MSG_CHANNEL_REQUEST)
    self.recipient_channel = recipient_channel
    self.request_type = request_type
    self.want_reply = want_reply
    self.request_data = request_data

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_string(request_type)
    self.add_boolean(want_reply)
    if request_data:
        self._data.extend(request_data)

ChannelSuccessMessage

Bases: Message

SSH channel success message (MSG_CHANNEL_SUCCESS).

Source code in spindlex/protocol/messages.py
class ChannelSuccessMessage(Message):
    """SSH channel success message (MSG_CHANNEL_SUCCESS)."""

    def __init__(self, recipient_channel: int) -> None:
        """
        Initialize channel success message.

        Args:
            recipient_channel: Recipient's channel number
        """
        super().__init__(MSG_CHANNEL_SUCCESS)
        self.recipient_channel = recipient_channel

        # Build message data
        self.add_uint32(recipient_channel)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelSuccessMessage":
        """Unpack channel success message data."""
        recipient_channel, _ = read_uint32(data, 0)
        return cls(recipient_channel)
Methods:
__init__(recipient_channel)

Initialize channel success message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int) -> None:
    """
    Initialize channel success message.

    Args:
        recipient_channel: Recipient's channel number
    """
    super().__init__(MSG_CHANNEL_SUCCESS)
    self.recipient_channel = recipient_channel

    # Build message data
    self.add_uint32(recipient_channel)

ChannelWindowAdjustMessage

Bases: Message

SSH channel window adjust message (MSG_CHANNEL_WINDOW_ADJUST).

Source code in spindlex/protocol/messages.py
class ChannelWindowAdjustMessage(Message):
    """SSH channel window adjust message (MSG_CHANNEL_WINDOW_ADJUST)."""

    def __init__(self, recipient_channel: int, bytes_to_add: int) -> None:
        """
        Initialize channel window adjust message.

        Args:
            recipient_channel: Recipient's channel number
            bytes_to_add: Number of bytes to add to window
        """
        super().__init__(MSG_CHANNEL_WINDOW_ADJUST)
        self.recipient_channel = recipient_channel
        self.bytes_to_add = bytes_to_add

        # Build message data
        self.add_uint32(recipient_channel)
        self.add_uint32(bytes_to_add)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ChannelWindowAdjustMessage":
        """Unpack channel window adjust message data."""
        offset = 0
        recipient_channel, offset = read_uint32(data, offset)
        bytes_to_add, offset = read_uint32(data, offset)

        return cls(recipient_channel, bytes_to_add)
Methods:
__init__(recipient_channel, bytes_to_add)

Initialize channel window adjust message.

Parameters:

Name Type Description Default
recipient_channel int

Recipient's channel number

required
bytes_to_add int

Number of bytes to add to window

required
Source code in spindlex/protocol/messages.py
def __init__(self, recipient_channel: int, bytes_to_add: int) -> None:
    """
    Initialize channel window adjust message.

    Args:
        recipient_channel: Recipient's channel number
        bytes_to_add: Number of bytes to add to window
    """
    super().__init__(MSG_CHANNEL_WINDOW_ADJUST)
    self.recipient_channel = recipient_channel
    self.bytes_to_add = bytes_to_add

    # Build message data
    self.add_uint32(recipient_channel)
    self.add_uint32(bytes_to_add)

DebugMessage

Bases: Message

SSH debug message (MSG_DEBUG).

Source code in spindlex/protocol/messages.py
class DebugMessage(Message):
    """SSH debug message (MSG_DEBUG)."""

    def __init__(self, always_display: bool, message: str, language: str = "") -> None:
        """
        Initialize debug message.

        Args:
            always_display: Whether message should always be displayed
            message: Debug message text
            language: Language tag (RFC 3066)
        """
        super().__init__(MSG_DEBUG)
        self.always_display = always_display
        self.message = message
        self.language = language

        # Build message data
        self.add_boolean(always_display)
        self.add_string(message)
        self.add_string(language)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "DebugMessage":
        """Unpack debug message data."""
        offset = 0
        always_display, offset = read_boolean(data, offset)
        message_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)

        message = message_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        language = language_bytes.decode(SSH_STRING_ENCODING, errors="replace")

        return cls(always_display, message, language)
Methods:
__init__(always_display, message, language='')

Initialize debug message.

Parameters:

Name Type Description Default
always_display bool

Whether message should always be displayed

required
message str

Debug message text

required
language str

Language tag (RFC 3066)

''
Source code in spindlex/protocol/messages.py
def __init__(self, always_display: bool, message: str, language: str = "") -> None:
    """
    Initialize debug message.

    Args:
        always_display: Whether message should always be displayed
        message: Debug message text
        language: Language tag (RFC 3066)
    """
    super().__init__(MSG_DEBUG)
    self.always_display = always_display
    self.message = message
    self.language = language

    # Build message data
    self.add_boolean(always_display)
    self.add_string(message)
    self.add_string(language)

DisconnectMessage

Bases: Message

SSH disconnect message (MSG_DISCONNECT).

Source code in spindlex/protocol/messages.py
class DisconnectMessage(Message):
    """SSH disconnect message (MSG_DISCONNECT)."""

    def __init__(
        self, reason_code: int, description: str = "", language: str = ""
    ) -> None:
        """
        Initialize disconnect message.

        Args:
            reason_code: Disconnect reason code
            description: Human-readable description
            language: Language tag (RFC 3066)
        """
        super().__init__(MSG_DISCONNECT)
        self.reason_code = reason_code
        self.description = description
        self.language = language

        # Build message data
        self.add_uint32(reason_code)
        self.add_string(description)
        self.add_string(language)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "DisconnectMessage":
        """Unpack disconnect message data."""
        offset = 0
        reason_code, offset = read_uint32(data, offset)
        description_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)

        description = description_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        language = language_bytes.decode(SSH_STRING_ENCODING, errors="replace")

        return cls(reason_code, description, language)
Methods:
__init__(reason_code, description='', language='')

Initialize disconnect message.

Parameters:

Name Type Description Default
reason_code int

Disconnect reason code

required
description str

Human-readable description

''
language str

Language tag (RFC 3066)

''
Source code in spindlex/protocol/messages.py
def __init__(
    self, reason_code: int, description: str = "", language: str = ""
) -> None:
    """
    Initialize disconnect message.

    Args:
        reason_code: Disconnect reason code
        description: Human-readable description
        language: Language tag (RFC 3066)
    """
    super().__init__(MSG_DISCONNECT)
    self.reason_code = reason_code
    self.description = description
    self.language = language

    # Build message data
    self.add_uint32(reason_code)
    self.add_string(description)
    self.add_string(language)

GlobalRequestMessage

Bases: Message

SSH global request message (MSG_GLOBAL_REQUEST).

Source code in spindlex/protocol/messages.py
class GlobalRequestMessage(Message):
    """SSH global request message (MSG_GLOBAL_REQUEST)."""

    def __init__(
        self, request_name: str, want_reply: bool, request_data: bytes = b""
    ) -> None:
        """
        Initialize global request message.

        Args:
            request_name: Name of the request
            want_reply: Whether a reply is wanted
            request_data: Request-specific data
        """
        super().__init__(MSG_GLOBAL_REQUEST)
        self.request_name = request_name
        self.want_reply = want_reply
        self.request_data = request_data

        # Build message data
        self.add_string(request_name)
        self.add_boolean(want_reply)
        if request_data:
            self._data.extend(request_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "GlobalRequestMessage":
        """Unpack global request message data."""
        offset = 0
        request_name_bytes, offset = read_string(data, offset)
        want_reply, offset = read_boolean(data, offset)

        request_name = request_name_bytes.decode(SSH_STRING_ENCODING)
        request_data = data[offset:] if offset < len(data) else b""

        return cls(request_name, want_reply, request_data)
Methods:
__init__(request_name, want_reply, request_data=b'')

Initialize global request message.

Parameters:

Name Type Description Default
request_name str

Name of the request

required
want_reply bool

Whether a reply is wanted

required
request_data bytes

Request-specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(
    self, request_name: str, want_reply: bool, request_data: bytes = b""
) -> None:
    """
    Initialize global request message.

    Args:
        request_name: Name of the request
        want_reply: Whether a reply is wanted
        request_data: Request-specific data
    """
    super().__init__(MSG_GLOBAL_REQUEST)
    self.request_name = request_name
    self.want_reply = want_reply
    self.request_data = request_data

    # Build message data
    self.add_string(request_name)
    self.add_boolean(want_reply)
    if request_data:
        self._data.extend(request_data)

IgnoreMessage

Bases: Message

SSH ignore message (MSG_IGNORE).

Source code in spindlex/protocol/messages.py
class IgnoreMessage(Message):
    """SSH ignore message (MSG_IGNORE)."""

    def __init__(self, data: bytes = b"") -> None:
        """
        Initialize ignore message.

        Args:
            data: Arbitrary data to ignore
        """
        super().__init__(MSG_IGNORE)
        self.data = data

        # Build message data
        self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "IgnoreMessage":
        """Unpack ignore message data."""
        ignore_data, _ = read_string(data, 0)
        return cls(ignore_data)
Methods:
__init__(data=b'')

Initialize ignore message.

Parameters:

Name Type Description Default
data bytes

Arbitrary data to ignore

b''
Source code in spindlex/protocol/messages.py
def __init__(self, data: bytes = b"") -> None:
    """
    Initialize ignore message.

    Args:
        data: Arbitrary data to ignore
    """
    super().__init__(MSG_IGNORE)
    self.data = data

    # Build message data
    self.add_string(data)

KexDHInitMessage

Bases: Message

SSH Diffie-Hellman key exchange init message (MSG_KEXDH_INIT).

Source code in spindlex/protocol/messages.py
class KexDHInitMessage(Message):
    """SSH Diffie-Hellman key exchange init message (MSG_KEXDH_INIT)."""

    def __init__(self, e: int) -> None:
        """
        Initialize DH init message.

        Args:
            e: Client's DH public key
        """
        super().__init__(MSG_KEXDH_INIT)
        self.e = e

        # Build message data
        self.add_mpint(e)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "KexDHInitMessage":
        """Unpack DH init message data."""
        e, _ = read_mpint(data, 0)
        return cls(e)
Methods:
__init__(e)

Initialize DH init message.

Parameters:

Name Type Description Default
e int

Client's DH public key

required
Source code in spindlex/protocol/messages.py
def __init__(self, e: int) -> None:
    """
    Initialize DH init message.

    Args:
        e: Client's DH public key
    """
    super().__init__(MSG_KEXDH_INIT)
    self.e = e

    # Build message data
    self.add_mpint(e)

KexDHReplyMessage

Bases: Message

SSH Diffie-Hellman key exchange reply message (MSG_KEXDH_REPLY).

Source code in spindlex/protocol/messages.py
class KexDHReplyMessage(Message):
    """SSH Diffie-Hellman key exchange reply message (MSG_KEXDH_REPLY)."""

    def __init__(self, host_key: bytes, f: int, signature: bytes) -> None:
        """
        Initialize DH reply message.

        Args:
            host_key: Server's host key
            f: Server's DH public key
            signature: Signature of exchange hash
        """
        super().__init__(MSG_KEXDH_REPLY)
        self.host_key = host_key
        self.f = f
        self.signature = signature

        # Build message data
        self.add_string(host_key)
        self.add_mpint(f)
        self.add_string(signature)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "KexDHReplyMessage":
        """Unpack DH reply message data."""
        offset = 0
        host_key, offset = read_string(data, offset)
        f, offset = read_mpint(data, offset)
        signature, offset = read_string(data, offset)

        return cls(host_key, f, signature)
Methods:
__init__(host_key, f, signature)

Initialize DH reply message.

Parameters:

Name Type Description Default
host_key bytes

Server's host key

required
f int

Server's DH public key

required
signature bytes

Signature of exchange hash

required
Source code in spindlex/protocol/messages.py
def __init__(self, host_key: bytes, f: int, signature: bytes) -> None:
    """
    Initialize DH reply message.

    Args:
        host_key: Server's host key
        f: Server's DH public key
        signature: Signature of exchange hash
    """
    super().__init__(MSG_KEXDH_REPLY)
    self.host_key = host_key
    self.f = f
    self.signature = signature

    # Build message data
    self.add_string(host_key)
    self.add_mpint(f)
    self.add_string(signature)

KexInitMessage

Bases: Message

SSH key exchange init message (MSG_KEXINIT).

Source code in spindlex/protocol/messages.py
class KexInitMessage(Message):
    """SSH key exchange init message (MSG_KEXINIT)."""

    def __init__(
        self,
        cookie: bytes,
        kex_algorithms: list[str],
        server_host_key_algorithms: list[str],
        encryption_algorithms_client_to_server: list[str],
        encryption_algorithms_server_to_client: list[str],
        mac_algorithms_client_to_server: list[str],
        mac_algorithms_server_to_client: list[str],
        compression_algorithms_client_to_server: list[str],
        compression_algorithms_server_to_client: list[str],
        languages_client_to_server: Optional[list[str]] = None,
        languages_server_to_client: Optional[list[str]] = None,
        first_kex_packet_follows: bool = False,
    ) -> None:
        """
        Initialize KEX init message.

        Args:
            cookie: Random 16-byte cookie
            kex_algorithms: Key exchange algorithms
            server_host_key_algorithms: Server host key algorithms
            encryption_algorithms_client_to_server: Encryption algorithms (C->S)
            encryption_algorithms_server_to_client: Encryption algorithms (S->C)
            mac_algorithms_client_to_server: MAC algorithms (C->S)
            mac_algorithms_server_to_client: MAC algorithms (S->C)
            compression_algorithms_client_to_server: Compression algorithms (C->S)
            compression_algorithms_server_to_client: Compression algorithms (S->C)
            languages_client_to_server: Language tags (C->S)
            languages_server_to_client: Language tags (S->C)
            first_kex_packet_follows: Whether first KEX packet follows
        """
        super().__init__(MSG_KEXINIT)

        if len(cookie) != KEX_COOKIE_SIZE:
            raise ProtocolException(f"Invalid cookie size: {len(cookie)}")

        self.cookie = cookie
        self.kex_algorithms = kex_algorithms
        self.server_host_key_algorithms = server_host_key_algorithms
        self.encryption_algorithms_client_to_server = (
            encryption_algorithms_client_to_server
        )
        self.encryption_algorithms_server_to_client = (
            encryption_algorithms_server_to_client
        )
        self.mac_algorithms_client_to_server = mac_algorithms_client_to_server
        self.mac_algorithms_server_to_client = mac_algorithms_server_to_client
        self.compression_algorithms_client_to_server = (
            compression_algorithms_client_to_server
        )
        self.compression_algorithms_server_to_client = (
            compression_algorithms_server_to_client
        )
        self.languages_client_to_server = languages_client_to_server or []
        self.languages_server_to_client = languages_server_to_client or []
        self.first_kex_packet_follows = first_kex_packet_follows

        # Build message data
        self._data.extend(cookie)
        self.add_string(",".join(kex_algorithms))
        self.add_string(",".join(server_host_key_algorithms))
        self.add_string(",".join(encryption_algorithms_client_to_server))
        self.add_string(",".join(encryption_algorithms_server_to_client))
        self.add_string(",".join(mac_algorithms_client_to_server))
        self.add_string(",".join(mac_algorithms_server_to_client))
        self.add_string(",".join(compression_algorithms_client_to_server))
        self.add_string(",".join(compression_algorithms_server_to_client))
        self.add_string(",".join(self.languages_client_to_server))
        self.add_string(",".join(self.languages_server_to_client))
        self.add_boolean(first_kex_packet_follows)
        self.add_uint32(0)  # Reserved for future extension

    @classmethod
    def _unpack_data(cls, data: bytes) -> "KexInitMessage":
        """Unpack KEX init message data."""
        offset = 0

        # Read cookie
        if len(data) < KEX_COOKIE_SIZE:
            raise ProtocolException("Invalid KEXINIT message: missing cookie")
        cookie = data[offset : offset + KEX_COOKIE_SIZE]
        offset += KEX_COOKIE_SIZE

        # Read algorithm lists
        kex_algs_bytes, offset = read_string(data, offset)
        host_key_algs_bytes, offset = read_string(data, offset)
        enc_c2s_bytes, offset = read_string(data, offset)
        enc_s2c_bytes, offset = read_string(data, offset)
        mac_c2s_bytes, offset = read_string(data, offset)
        mac_s2c_bytes, offset = read_string(data, offset)
        comp_c2s_bytes, offset = read_string(data, offset)
        comp_s2c_bytes, offset = read_string(data, offset)
        lang_c2s_bytes, offset = read_string(data, offset)
        lang_s2c_bytes, offset = read_string(data, offset)

        first_kex_follows, offset = read_boolean(data, offset)
        reserved, offset = read_uint32(data, offset)

        # Parse algorithm lists
        def parse_list(data: bytes) -> list[str]:
            s = data.decode(SSH_STRING_ENCODING)
            return [alg.strip() for alg in s.split(",") if alg.strip()]

        return cls(
            cookie=cookie,
            kex_algorithms=parse_list(kex_algs_bytes),
            server_host_key_algorithms=parse_list(host_key_algs_bytes),
            encryption_algorithms_client_to_server=parse_list(enc_c2s_bytes),
            encryption_algorithms_server_to_client=parse_list(enc_s2c_bytes),
            mac_algorithms_client_to_server=parse_list(mac_c2s_bytes),
            mac_algorithms_server_to_client=parse_list(mac_s2c_bytes),
            compression_algorithms_client_to_server=parse_list(comp_c2s_bytes),
            compression_algorithms_server_to_client=parse_list(comp_s2c_bytes),
            languages_client_to_server=parse_list(lang_c2s_bytes),
            languages_server_to_client=parse_list(lang_s2c_bytes),
            first_kex_packet_follows=first_kex_follows,
        )

    def validate(self) -> bool:
        """
        Validate KEX init message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        super().validate()

        # Validate required algorithm lists are not empty
        if not self.kex_algorithms:
            raise ProtocolException("KEX algorithms list cannot be empty")
        if not self.server_host_key_algorithms:
            raise ProtocolException("Server host key algorithms list cannot be empty")
        if not self.encryption_algorithms_client_to_server:
            raise ProtocolException(
                "Client-to-server encryption algorithms list cannot be empty"
            )
        if not self.encryption_algorithms_server_to_client:
            raise ProtocolException(
                "Server-to-client encryption algorithms list cannot be empty"
            )

        return True
Methods:
__init__(cookie, kex_algorithms, server_host_key_algorithms, encryption_algorithms_client_to_server, encryption_algorithms_server_to_client, mac_algorithms_client_to_server, mac_algorithms_server_to_client, compression_algorithms_client_to_server, compression_algorithms_server_to_client, languages_client_to_server=None, languages_server_to_client=None, first_kex_packet_follows=False)

Initialize KEX init message.

Parameters:

Name Type Description Default
cookie bytes

Random 16-byte cookie

required
kex_algorithms list[str]

Key exchange algorithms

required
server_host_key_algorithms list[str]

Server host key algorithms

required
encryption_algorithms_client_to_server list[str]

Encryption algorithms (C->S)

required
encryption_algorithms_server_to_client list[str]

Encryption algorithms (S->C)

required
mac_algorithms_client_to_server list[str]

MAC algorithms (C->S)

required
mac_algorithms_server_to_client list[str]

MAC algorithms (S->C)

required
compression_algorithms_client_to_server list[str]

Compression algorithms (C->S)

required
compression_algorithms_server_to_client list[str]

Compression algorithms (S->C)

required
languages_client_to_server Optional[list[str]]

Language tags (C->S)

None
languages_server_to_client Optional[list[str]]

Language tags (S->C)

None
first_kex_packet_follows bool

Whether first KEX packet follows

False
Source code in spindlex/protocol/messages.py
def __init__(
    self,
    cookie: bytes,
    kex_algorithms: list[str],
    server_host_key_algorithms: list[str],
    encryption_algorithms_client_to_server: list[str],
    encryption_algorithms_server_to_client: list[str],
    mac_algorithms_client_to_server: list[str],
    mac_algorithms_server_to_client: list[str],
    compression_algorithms_client_to_server: list[str],
    compression_algorithms_server_to_client: list[str],
    languages_client_to_server: Optional[list[str]] = None,
    languages_server_to_client: Optional[list[str]] = None,
    first_kex_packet_follows: bool = False,
) -> None:
    """
    Initialize KEX init message.

    Args:
        cookie: Random 16-byte cookie
        kex_algorithms: Key exchange algorithms
        server_host_key_algorithms: Server host key algorithms
        encryption_algorithms_client_to_server: Encryption algorithms (C->S)
        encryption_algorithms_server_to_client: Encryption algorithms (S->C)
        mac_algorithms_client_to_server: MAC algorithms (C->S)
        mac_algorithms_server_to_client: MAC algorithms (S->C)
        compression_algorithms_client_to_server: Compression algorithms (C->S)
        compression_algorithms_server_to_client: Compression algorithms (S->C)
        languages_client_to_server: Language tags (C->S)
        languages_server_to_client: Language tags (S->C)
        first_kex_packet_follows: Whether first KEX packet follows
    """
    super().__init__(MSG_KEXINIT)

    if len(cookie) != KEX_COOKIE_SIZE:
        raise ProtocolException(f"Invalid cookie size: {len(cookie)}")

    self.cookie = cookie
    self.kex_algorithms = kex_algorithms
    self.server_host_key_algorithms = server_host_key_algorithms
    self.encryption_algorithms_client_to_server = (
        encryption_algorithms_client_to_server
    )
    self.encryption_algorithms_server_to_client = (
        encryption_algorithms_server_to_client
    )
    self.mac_algorithms_client_to_server = mac_algorithms_client_to_server
    self.mac_algorithms_server_to_client = mac_algorithms_server_to_client
    self.compression_algorithms_client_to_server = (
        compression_algorithms_client_to_server
    )
    self.compression_algorithms_server_to_client = (
        compression_algorithms_server_to_client
    )
    self.languages_client_to_server = languages_client_to_server or []
    self.languages_server_to_client = languages_server_to_client or []
    self.first_kex_packet_follows = first_kex_packet_follows

    # Build message data
    self._data.extend(cookie)
    self.add_string(",".join(kex_algorithms))
    self.add_string(",".join(server_host_key_algorithms))
    self.add_string(",".join(encryption_algorithms_client_to_server))
    self.add_string(",".join(encryption_algorithms_server_to_client))
    self.add_string(",".join(mac_algorithms_client_to_server))
    self.add_string(",".join(mac_algorithms_server_to_client))
    self.add_string(",".join(compression_algorithms_client_to_server))
    self.add_string(",".join(compression_algorithms_server_to_client))
    self.add_string(",".join(self.languages_client_to_server))
    self.add_string(",".join(self.languages_server_to_client))
    self.add_boolean(first_kex_packet_follows)
    self.add_uint32(0)  # Reserved for future extension
validate()

Validate KEX init message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/messages.py
def validate(self) -> bool:
    """
    Validate KEX init message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    super().validate()

    # Validate required algorithm lists are not empty
    if not self.kex_algorithms:
        raise ProtocolException("KEX algorithms list cannot be empty")
    if not self.server_host_key_algorithms:
        raise ProtocolException("Server host key algorithms list cannot be empty")
    if not self.encryption_algorithms_client_to_server:
        raise ProtocolException(
            "Client-to-server encryption algorithms list cannot be empty"
        )
    if not self.encryption_algorithms_server_to_client:
        raise ProtocolException(
            "Server-to-client encryption algorithms list cannot be empty"
        )

    return True

Message

Base SSH protocol message class.

Provides message serialization, deserialization, and validation functionality for SSH protocol messages.

Source code in spindlex/protocol/messages.py
class Message:
    """
    Base SSH protocol message class.

    Provides message serialization, deserialization, and validation
    functionality for SSH protocol messages.
    """

    def __init__(self, msg_type: int) -> None:
        """
        Initialize message with type.

        Args:
            msg_type: SSH message type code

        Raises:
            ProtocolException: If message type is invalid
        """
        if not validate_message_type(msg_type):
            raise ProtocolException(f"Invalid message type: {msg_type}")

        self.msg_type = msg_type
        self._data = bytearray()

    def pack(self) -> bytes:
        """
        Serialize message to bytes.

        Returns:
            Serialized message data including message type

        Raises:
            ProtocolException: If serialization fails
        """
        try:
            # Start with message type
            result = write_byte(self.msg_type)

            # Add message-specific data
            result += bytes(self._data)

            return result
        except Exception as e:
            raise ProtocolException(f"Failed to pack message: {e}") from e

    @classmethod
    def unpack(cls, data: bytes) -> "Message":
        """
        Deserialize message from bytes.

        Args:
            data: Serialized message data

        Returns:
            Deserialized message instance

        Raises:
            ProtocolException: If deserialization fails
        """
        if len(data) < 1:
            raise ProtocolException("Message data too short")

        msg_type, offset = read_byte(data, 0)

        # If called on a specific subclass, unpack that subclass directly
        if cls != Message:
            return cls._unpack_data(data[offset:])

        # Create appropriate message class based on type
        message_classes = {
            MSG_DISCONNECT: DisconnectMessage,
            MSG_IGNORE: IgnoreMessage,
            MSG_UNIMPLEMENTED: UnimplementedMessage,
            MSG_DEBUG: DebugMessage,
            MSG_SERVICE_REQUEST: ServiceRequestMessage,
            MSG_SERVICE_ACCEPT: ServiceAcceptMessage,
            MSG_KEXINIT: KexInitMessage,
            MSG_NEWKEYS: NewKeysMessage,
            MSG_USERAUTH_REQUEST: UserAuthRequestMessage,
            MSG_USERAUTH_FAILURE: UserAuthFailureMessage,
            MSG_USERAUTH_SUCCESS: UserAuthSuccessMessage,
            MSG_USERAUTH_BANNER: UserAuthBannerMessage,
            # Types 60 and 61 are INTENTIONALLY omitted from this table (see constants.py).
            # Four auth methods share type 60 and two share type 61 (RFC 4252 / 4256 /
            # draft-ietf-secsh-gss).  The wire type alone is ambiguous - each auth handler
            # MUST receive these as a raw Message and re-interpret the payload using the
            # known active auth method as context before casting to a concrete class.
            # MSG_USERAUTH_PK_OK / MSG_USERAUTH_INFO_REQUEST / MSG_USERAUTH_GSSAPI_RESPONSE
            MSG_GLOBAL_REQUEST: GlobalRequestMessage,
            MSG_REQUEST_SUCCESS: RequestSuccessMessage,
            MSG_REQUEST_FAILURE: RequestFailureMessage,
            MSG_CHANNEL_OPEN: ChannelOpenMessage,
            MSG_CHANNEL_OPEN_CONFIRMATION: ChannelOpenConfirmationMessage,
            MSG_CHANNEL_OPEN_FAILURE: ChannelOpenFailureMessage,
            MSG_CHANNEL_WINDOW_ADJUST: ChannelWindowAdjustMessage,
            MSG_CHANNEL_DATA: ChannelDataMessage,
            MSG_CHANNEL_EXTENDED_DATA: ChannelExtendedDataMessage,
            MSG_CHANNEL_EOF: ChannelEOFMessage,
            MSG_CHANNEL_CLOSE: ChannelCloseMessage,
            MSG_CHANNEL_REQUEST: ChannelRequestMessage,
            MSG_CHANNEL_SUCCESS: ChannelSuccessMessage,
            MSG_CHANNEL_FAILURE: ChannelFailureMessage,
        }

        from typing import cast

        message_class = cast(type["Message"], message_classes.get(msg_type, Message))

        if message_class == Message:
            # Generic message
            msg = Message(msg_type)
            msg._data = bytearray(data[offset:])
            return msg
        else:
            # Specific message class
            return message_class._unpack_data(data[offset:])

    @classmethod
    def _unpack_data(cls, data: bytes) -> "Message":
        """
        Unpack message-specific data. Override in subclasses.

        Args:
            data: Message data without type byte

        Returns:
            Message instance
        """
        raise NotImplementedError("Subclasses must implement _unpack_data")

    def add_byte(self, value: int) -> None:
        """Add single byte to message."""
        self._data.extend(write_byte(value))

    def add_boolean(self, value: bool) -> None:
        """Add boolean to message."""
        self._data.extend(write_boolean(value))

    def add_uint32(self, value: int) -> None:
        """Add 32-bit unsigned integer to message."""
        self._data.extend(write_uint32(value))

    def add_uint64(self, value: int) -> None:
        """Add 64-bit unsigned integer to message."""
        self._data.extend(write_uint64(value))

    def add_string(self, value: Union[str, bytes]) -> None:
        """Add string to message."""
        self._data.extend(write_string(value))

    def add_mpint(self, value: int) -> None:
        """Add multiple precision integer to message."""
        self._data.extend(write_mpint(value))

    def validate(self) -> bool:
        """
        Validate message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        # Base validation - subclasses can override
        if not validate_message_type(self.msg_type):
            raise ProtocolException(f"Invalid message type: {self.msg_type}")

        # Validate message size
        if len(self._data) > MAX_MESSAGE_SIZE:
            raise ProtocolException(f"Message too large: {len(self._data)} bytes")

        return True

    def __str__(self) -> str:
        """String representation of message."""
        return (
            f"{self.__class__.__name__}(type={self.msg_type}, size={len(self._data)})"
        )

    def __repr__(self) -> str:
        """Detailed string representation of message."""
        return f"{self.__class__.__name__}(msg_type={self.msg_type}, data_size={len(self._data)})"
Methods:
__init__(msg_type)

Initialize message with type.

Parameters:

Name Type Description Default
msg_type int

SSH message type code

required

Raises:

Type Description
ProtocolException

If message type is invalid

Source code in spindlex/protocol/messages.py
def __init__(self, msg_type: int) -> None:
    """
    Initialize message with type.

    Args:
        msg_type: SSH message type code

    Raises:
        ProtocolException: If message type is invalid
    """
    if not validate_message_type(msg_type):
        raise ProtocolException(f"Invalid message type: {msg_type}")

    self.msg_type = msg_type
    self._data = bytearray()
__repr__()

Detailed string representation of message.

Source code in spindlex/protocol/messages.py
def __repr__(self) -> str:
    """Detailed string representation of message."""
    return f"{self.__class__.__name__}(msg_type={self.msg_type}, data_size={len(self._data)})"
__str__()

String representation of message.

Source code in spindlex/protocol/messages.py
def __str__(self) -> str:
    """String representation of message."""
    return (
        f"{self.__class__.__name__}(type={self.msg_type}, size={len(self._data)})"
    )
add_boolean(value)

Add boolean to message.

Source code in spindlex/protocol/messages.py
def add_boolean(self, value: bool) -> None:
    """Add boolean to message."""
    self._data.extend(write_boolean(value))
add_byte(value)

Add single byte to message.

Source code in spindlex/protocol/messages.py
def add_byte(self, value: int) -> None:
    """Add single byte to message."""
    self._data.extend(write_byte(value))
add_mpint(value)

Add multiple precision integer to message.

Source code in spindlex/protocol/messages.py
def add_mpint(self, value: int) -> None:
    """Add multiple precision integer to message."""
    self._data.extend(write_mpint(value))
add_string(value)

Add string to message.

Source code in spindlex/protocol/messages.py
def add_string(self, value: Union[str, bytes]) -> None:
    """Add string to message."""
    self._data.extend(write_string(value))
add_uint32(value)

Add 32-bit unsigned integer to message.

Source code in spindlex/protocol/messages.py
def add_uint32(self, value: int) -> None:
    """Add 32-bit unsigned integer to message."""
    self._data.extend(write_uint32(value))
add_uint64(value)

Add 64-bit unsigned integer to message.

Source code in spindlex/protocol/messages.py
def add_uint64(self, value: int) -> None:
    """Add 64-bit unsigned integer to message."""
    self._data.extend(write_uint64(value))
pack()

Serialize message to bytes.

Returns:

Type Description
bytes

Serialized message data including message type

Raises:

Type Description
ProtocolException

If serialization fails

Source code in spindlex/protocol/messages.py
def pack(self) -> bytes:
    """
    Serialize message to bytes.

    Returns:
        Serialized message data including message type

    Raises:
        ProtocolException: If serialization fails
    """
    try:
        # Start with message type
        result = write_byte(self.msg_type)

        # Add message-specific data
        result += bytes(self._data)

        return result
    except Exception as e:
        raise ProtocolException(f"Failed to pack message: {e}") from e
unpack(data) classmethod

Deserialize message from bytes.

Parameters:

Name Type Description Default
data bytes

Serialized message data

required

Returns:

Type Description
Message

Deserialized message instance

Raises:

Type Description
ProtocolException

If deserialization fails

Source code in spindlex/protocol/messages.py
@classmethod
def unpack(cls, data: bytes) -> "Message":
    """
    Deserialize message from bytes.

    Args:
        data: Serialized message data

    Returns:
        Deserialized message instance

    Raises:
        ProtocolException: If deserialization fails
    """
    if len(data) < 1:
        raise ProtocolException("Message data too short")

    msg_type, offset = read_byte(data, 0)

    # If called on a specific subclass, unpack that subclass directly
    if cls != Message:
        return cls._unpack_data(data[offset:])

    # Create appropriate message class based on type
    message_classes = {
        MSG_DISCONNECT: DisconnectMessage,
        MSG_IGNORE: IgnoreMessage,
        MSG_UNIMPLEMENTED: UnimplementedMessage,
        MSG_DEBUG: DebugMessage,
        MSG_SERVICE_REQUEST: ServiceRequestMessage,
        MSG_SERVICE_ACCEPT: ServiceAcceptMessage,
        MSG_KEXINIT: KexInitMessage,
        MSG_NEWKEYS: NewKeysMessage,
        MSG_USERAUTH_REQUEST: UserAuthRequestMessage,
        MSG_USERAUTH_FAILURE: UserAuthFailureMessage,
        MSG_USERAUTH_SUCCESS: UserAuthSuccessMessage,
        MSG_USERAUTH_BANNER: UserAuthBannerMessage,
        # Types 60 and 61 are INTENTIONALLY omitted from this table (see constants.py).
        # Four auth methods share type 60 and two share type 61 (RFC 4252 / 4256 /
        # draft-ietf-secsh-gss).  The wire type alone is ambiguous - each auth handler
        # MUST receive these as a raw Message and re-interpret the payload using the
        # known active auth method as context before casting to a concrete class.
        # MSG_USERAUTH_PK_OK / MSG_USERAUTH_INFO_REQUEST / MSG_USERAUTH_GSSAPI_RESPONSE
        MSG_GLOBAL_REQUEST: GlobalRequestMessage,
        MSG_REQUEST_SUCCESS: RequestSuccessMessage,
        MSG_REQUEST_FAILURE: RequestFailureMessage,
        MSG_CHANNEL_OPEN: ChannelOpenMessage,
        MSG_CHANNEL_OPEN_CONFIRMATION: ChannelOpenConfirmationMessage,
        MSG_CHANNEL_OPEN_FAILURE: ChannelOpenFailureMessage,
        MSG_CHANNEL_WINDOW_ADJUST: ChannelWindowAdjustMessage,
        MSG_CHANNEL_DATA: ChannelDataMessage,
        MSG_CHANNEL_EXTENDED_DATA: ChannelExtendedDataMessage,
        MSG_CHANNEL_EOF: ChannelEOFMessage,
        MSG_CHANNEL_CLOSE: ChannelCloseMessage,
        MSG_CHANNEL_REQUEST: ChannelRequestMessage,
        MSG_CHANNEL_SUCCESS: ChannelSuccessMessage,
        MSG_CHANNEL_FAILURE: ChannelFailureMessage,
    }

    from typing import cast

    message_class = cast(type["Message"], message_classes.get(msg_type, Message))

    if message_class == Message:
        # Generic message
        msg = Message(msg_type)
        msg._data = bytearray(data[offset:])
        return msg
    else:
        # Specific message class
        return message_class._unpack_data(data[offset:])
validate()

Validate message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/messages.py
def validate(self) -> bool:
    """
    Validate message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    # Base validation - subclasses can override
    if not validate_message_type(self.msg_type):
        raise ProtocolException(f"Invalid message type: {self.msg_type}")

    # Validate message size
    if len(self._data) > MAX_MESSAGE_SIZE:
        raise ProtocolException(f"Message too large: {len(self._data)} bytes")

    return True

NewKeysMessage

Bases: Message

SSH new keys message (MSG_NEWKEYS).

Source code in spindlex/protocol/messages.py
class NewKeysMessage(Message):
    """SSH new keys message (MSG_NEWKEYS)."""

    def __init__(self) -> None:
        """Initialize new keys message."""
        super().__init__(MSG_NEWKEYS)
        # No additional data for new keys message

    @classmethod
    def _unpack_data(cls, data: bytes) -> "NewKeysMessage":
        """Unpack new keys message data."""
        return cls()
Methods:
__init__()

Initialize new keys message.

Source code in spindlex/protocol/messages.py
def __init__(self) -> None:
    """Initialize new keys message."""
    super().__init__(MSG_NEWKEYS)

RequestFailureMessage

Bases: Message

SSH request failure message (MSG_REQUEST_FAILURE).

Source code in spindlex/protocol/messages.py
class RequestFailureMessage(Message):
    """SSH request failure message (MSG_REQUEST_FAILURE)."""

    def __init__(self) -> None:
        """Initialize request failure message."""
        super().__init__(MSG_REQUEST_FAILURE)
        # No additional data for request failure message

    @classmethod
    def _unpack_data(cls, data: bytes) -> "RequestFailureMessage":
        """Unpack request failure message data."""
        return cls()
Methods:
__init__()

Initialize request failure message.

Source code in spindlex/protocol/messages.py
def __init__(self) -> None:
    """Initialize request failure message."""
    super().__init__(MSG_REQUEST_FAILURE)

RequestSuccessMessage

Bases: Message

SSH request success message (MSG_REQUEST_SUCCESS).

Source code in spindlex/protocol/messages.py
class RequestSuccessMessage(Message):
    """SSH request success message (MSG_REQUEST_SUCCESS)."""

    def __init__(self, response_data: bytes = b"") -> None:
        """
        Initialize request success message.

        Args:
            response_data: Response-specific data
        """
        super().__init__(MSG_REQUEST_SUCCESS)
        self.response_data = response_data

        # Build message data
        if response_data:
            self._data.extend(response_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "RequestSuccessMessage":
        """Unpack request success message data."""
        return cls(data)
Methods:
__init__(response_data=b'')

Initialize request success message.

Parameters:

Name Type Description Default
response_data bytes

Response-specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(self, response_data: bytes = b"") -> None:
    """
    Initialize request success message.

    Args:
        response_data: Response-specific data
    """
    super().__init__(MSG_REQUEST_SUCCESS)
    self.response_data = response_data

    # Build message data
    if response_data:
        self._data.extend(response_data)

ServiceAcceptMessage

Bases: Message

SSH service accept message (MSG_SERVICE_ACCEPT).

Source code in spindlex/protocol/messages.py
class ServiceAcceptMessage(Message):
    """SSH service accept message (MSG_SERVICE_ACCEPT)."""

    def __init__(self, service_name: str) -> None:
        """
        Initialize service accept message.

        Args:
            service_name: Name of accepted service
        """
        super().__init__(MSG_SERVICE_ACCEPT)
        self.service_name = service_name

        # Build message data
        self.add_string(service_name)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ServiceAcceptMessage":
        """Unpack service accept message data."""
        service_name_bytes, _ = read_string(data, 0)
        service_name = service_name_bytes.decode(SSH_STRING_ENCODING)
        return cls(service_name)
Methods:
__init__(service_name)

Initialize service accept message.

Parameters:

Name Type Description Default
service_name str

Name of accepted service

required
Source code in spindlex/protocol/messages.py
def __init__(self, service_name: str) -> None:
    """
    Initialize service accept message.

    Args:
        service_name: Name of accepted service
    """
    super().__init__(MSG_SERVICE_ACCEPT)
    self.service_name = service_name

    # Build message data
    self.add_string(service_name)

ServiceRequestMessage

Bases: Message

SSH service request message (MSG_SERVICE_REQUEST).

Source code in spindlex/protocol/messages.py
class ServiceRequestMessage(Message):
    """SSH service request message (MSG_SERVICE_REQUEST)."""

    def __init__(self, service_name: str) -> None:
        """
        Initialize service request message.

        Args:
            service_name: Name of requested service
        """
        super().__init__(MSG_SERVICE_REQUEST)
        self.service_name = service_name

        # Build message data
        self.add_string(service_name)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "ServiceRequestMessage":
        """Unpack service request message data."""
        service_name_bytes, _ = read_string(data, 0)
        service_name = service_name_bytes.decode(SSH_STRING_ENCODING)
        return cls(service_name)
Methods:
__init__(service_name)

Initialize service request message.

Parameters:

Name Type Description Default
service_name str

Name of requested service

required
Source code in spindlex/protocol/messages.py
def __init__(self, service_name: str) -> None:
    """
    Initialize service request message.

    Args:
        service_name: Name of requested service
    """
    super().__init__(MSG_SERVICE_REQUEST)
    self.service_name = service_name

    # Build message data
    self.add_string(service_name)

UnimplementedMessage

Bases: Message

SSH unimplemented message (MSG_UNIMPLEMENTED).

Source code in spindlex/protocol/messages.py
class UnimplementedMessage(Message):
    """SSH unimplemented message (MSG_UNIMPLEMENTED)."""

    def __init__(self, sequence_number: int) -> None:
        """
        Initialize unimplemented message.

        Args:
            sequence_number: Sequence number of unimplemented message
        """
        super().__init__(MSG_UNIMPLEMENTED)
        self.sequence_number = sequence_number

        # Build message data
        self.add_uint32(sequence_number)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UnimplementedMessage":
        """Unpack unimplemented message data."""
        sequence_number, _ = read_uint32(data, 0)
        return cls(sequence_number)
Methods:
__init__(sequence_number)

Initialize unimplemented message.

Parameters:

Name Type Description Default
sequence_number int

Sequence number of unimplemented message

required
Source code in spindlex/protocol/messages.py
def __init__(self, sequence_number: int) -> None:
    """
    Initialize unimplemented message.

    Args:
        sequence_number: Sequence number of unimplemented message
    """
    super().__init__(MSG_UNIMPLEMENTED)
    self.sequence_number = sequence_number

    # Build message data
    self.add_uint32(sequence_number)

UserAuthBannerMessage

Bases: Message

SSH user authentication banner message (MSG_USERAUTH_BANNER).

Source code in spindlex/protocol/messages.py
class UserAuthBannerMessage(Message):
    """SSH user authentication banner message (MSG_USERAUTH_BANNER)."""

    def __init__(self, message: str, language: str = "") -> None:
        """
        Initialize user auth banner message.

        Args:
            message: Banner message text
            language: Language tag (RFC 3066)
        """
        super().__init__(MSG_USERAUTH_BANNER)
        self.message = message
        self.language = language

        # Build message data
        self.add_string(message)
        self.add_string(language)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthBannerMessage":
        """Unpack user auth banner message data."""
        offset = 0
        message_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)

        message = message_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        language = language_bytes.decode(SSH_STRING_ENCODING, errors="replace")

        return cls(message, language)
Methods:
__init__(message, language='')

Initialize user auth banner message.

Parameters:

Name Type Description Default
message str

Banner message text

required
language str

Language tag (RFC 3066)

''
Source code in spindlex/protocol/messages.py
def __init__(self, message: str, language: str = "") -> None:
    """
    Initialize user auth banner message.

    Args:
        message: Banner message text
        language: Language tag (RFC 3066)
    """
    super().__init__(MSG_USERAUTH_BANNER)
    self.message = message
    self.language = language

    # Build message data
    self.add_string(message)
    self.add_string(language)

UserAuthFailureMessage

Bases: Message

SSH user authentication failure message (MSG_USERAUTH_FAILURE).

Source code in spindlex/protocol/messages.py
class UserAuthFailureMessage(Message):
    """SSH user authentication failure message (MSG_USERAUTH_FAILURE)."""

    def __init__(
        self, authentications: list[str], partial_success: bool = False
    ) -> None:
        """
        Initialize user auth failure message.

        Args:
            authentications: List of authentication methods that can continue
            partial_success: Whether partial success occurred
        """
        super().__init__(MSG_USERAUTH_FAILURE)
        self.authentications = authentications
        self.partial_success = partial_success

        # Build message data
        self.add_string(",".join(authentications))
        self.add_boolean(partial_success)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthFailureMessage":
        """Unpack user auth failure message data."""
        offset = 0
        auth_bytes, offset = read_string(data, offset)
        partial_success, offset = read_boolean(data, offset)

        auth_list = auth_bytes.decode(SSH_STRING_ENCODING)
        authentications = [
            auth.strip() for auth in auth_list.split(",") if auth.strip()
        ]

        return cls(authentications, partial_success)
Methods:
__init__(authentications, partial_success=False)

Initialize user auth failure message.

Parameters:

Name Type Description Default
authentications list[str]

List of authentication methods that can continue

required
partial_success bool

Whether partial success occurred

False
Source code in spindlex/protocol/messages.py
def __init__(
    self, authentications: list[str], partial_success: bool = False
) -> None:
    """
    Initialize user auth failure message.

    Args:
        authentications: List of authentication methods that can continue
        partial_success: Whether partial success occurred
    """
    super().__init__(MSG_USERAUTH_FAILURE)
    self.authentications = authentications
    self.partial_success = partial_success

    # Build message data
    self.add_string(",".join(authentications))
    self.add_boolean(partial_success)

UserAuthInfoRequestMessage

Bases: Message

SSH user authentication info request message (MSG_USERAUTH_INFO_REQUEST).

Source code in spindlex/protocol/messages.py
class UserAuthInfoRequestMessage(Message):
    """SSH user authentication info request message (MSG_USERAUTH_INFO_REQUEST)."""

    def __init__(
        self, name: str, instruction: str, language: str, prompts: list
    ) -> None:
        """
        Initialize user auth info request message.

        Args:
            name: Name of the authentication method
            instruction: Instructions for the user
            language: Language tag
            prompts: List of (prompt, echo) tuples
        """
        super().__init__(MSG_USERAUTH_INFO_REQUEST)
        self.name = name
        self.instruction = instruction
        self.language = language
        self.prompts = prompts

        # Build message data
        self.add_string(name)
        self.add_string(instruction)
        self.add_string(language)
        self.add_uint32(len(prompts))
        for prompt, echo in prompts:
            self.add_string(prompt)
            self.add_boolean(echo)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthInfoRequestMessage":
        """Unpack user auth info request message data."""
        offset = 0
        name_bytes, offset = read_string(data, offset)
        instruction_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)
        num_prompts, offset = read_uint32(data, offset)

        name = name_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        instruction = instruction_bytes.decode(SSH_STRING_ENCODING, errors="replace")
        language = language_bytes.decode(SSH_STRING_ENCODING, errors="replace")

        prompts = []
        for _ in range(num_prompts):
            prompt_bytes, offset = read_string(data, offset)
            echo, offset = read_boolean(data, offset)
            prompt_text = prompt_bytes.decode(SSH_STRING_ENCODING, errors="replace")
            prompts.append((prompt_text, echo))

        return cls(name, instruction, language, prompts)
Methods:
__init__(name, instruction, language, prompts)

Initialize user auth info request message.

Parameters:

Name Type Description Default
name str

Name of the authentication method

required
instruction str

Instructions for the user

required
language str

Language tag

required
prompts list

List of (prompt, echo) tuples

required
Source code in spindlex/protocol/messages.py
def __init__(
    self, name: str, instruction: str, language: str, prompts: list
) -> None:
    """
    Initialize user auth info request message.

    Args:
        name: Name of the authentication method
        instruction: Instructions for the user
        language: Language tag
        prompts: List of (prompt, echo) tuples
    """
    super().__init__(MSG_USERAUTH_INFO_REQUEST)
    self.name = name
    self.instruction = instruction
    self.language = language
    self.prompts = prompts

    # Build message data
    self.add_string(name)
    self.add_string(instruction)
    self.add_string(language)
    self.add_uint32(len(prompts))
    for prompt, echo in prompts:
        self.add_string(prompt)
        self.add_boolean(echo)

UserAuthInfoResponseMessage

Bases: Message

SSH user authentication info response message (MSG_USERAUTH_INFO_RESPONSE).

Source code in spindlex/protocol/messages.py
class UserAuthInfoResponseMessage(Message):
    """SSH user authentication info response message (MSG_USERAUTH_INFO_RESPONSE)."""

    def __init__(self, responses: list[str]) -> None:
        """
        Initialize user auth info response message.

        Args:
            responses: List of responses to prompts
        """
        super().__init__(MSG_USERAUTH_INFO_RESPONSE)
        self.responses = responses

        # Build message data
        self.add_uint32(len(responses))
        for response in responses:
            self.add_string(response)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthInfoResponseMessage":
        """Unpack user auth info response message data."""
        offset = 0
        num_responses, offset = read_uint32(data, offset)

        responses = []
        for _ in range(num_responses):
            response_bytes, offset = read_string(data, offset)
            response = response_bytes.decode(SSH_STRING_ENCODING, errors="replace")
            responses.append(response)

        return cls(responses)
Methods:
__init__(responses)

Initialize user auth info response message.

Parameters:

Name Type Description Default
responses list[str]

List of responses to prompts

required
Source code in spindlex/protocol/messages.py
def __init__(self, responses: list[str]) -> None:
    """
    Initialize user auth info response message.

    Args:
        responses: List of responses to prompts
    """
    super().__init__(MSG_USERAUTH_INFO_RESPONSE)
    self.responses = responses

    # Build message data
    self.add_uint32(len(responses))
    for response in responses:
        self.add_string(response)

UserAuthPKOKMessage

Bases: Message

SSH user authentication public key OK message (MSG_USERAUTH_PK_OK).

Source code in spindlex/protocol/messages.py
class UserAuthPKOKMessage(Message):
    """SSH user authentication public key OK message (MSG_USERAUTH_PK_OK)."""

    def __init__(self, algorithm: str, public_key: bytes) -> None:
        """
        Initialize user auth PK OK message.

        Args:
            algorithm: Public key algorithm name
            public_key: Public key blob
        """
        super().__init__(MSG_USERAUTH_PK_OK)
        self.algorithm = algorithm
        self.public_key = public_key

        # Build message data
        self.add_string(algorithm)
        self.add_string(public_key)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthPKOKMessage":
        """Unpack user auth PK OK message data."""
        offset = 0
        algorithm_bytes, offset = read_string(data, offset)
        public_key, offset = read_string(data, offset)

        algorithm = algorithm_bytes.decode(SSH_STRING_ENCODING)
        return cls(algorithm, public_key)
Methods:
__init__(algorithm, public_key)

Initialize user auth PK OK message.

Parameters:

Name Type Description Default
algorithm str

Public key algorithm name

required
public_key bytes

Public key blob

required
Source code in spindlex/protocol/messages.py
def __init__(self, algorithm: str, public_key: bytes) -> None:
    """
    Initialize user auth PK OK message.

    Args:
        algorithm: Public key algorithm name
        public_key: Public key blob
    """
    super().__init__(MSG_USERAUTH_PK_OK)
    self.algorithm = algorithm
    self.public_key = public_key

    # Build message data
    self.add_string(algorithm)
    self.add_string(public_key)

UserAuthRequestMessage

Bases: Message

SSH user authentication request message (MSG_USERAUTH_REQUEST).

Source code in spindlex/protocol/messages.py
class UserAuthRequestMessage(Message):
    """SSH user authentication request message (MSG_USERAUTH_REQUEST)."""

    def __init__(
        self, username: str, service: str, method: str, method_data: bytes = b""
    ) -> None:
        """
        Initialize user auth request message.

        Args:
            username: Username for authentication
            service: Service name
            method: Authentication method
            method_data: Method-specific data
        """
        super().__init__(MSG_USERAUTH_REQUEST)
        self.username = username
        self.service = service
        self.method = method
        self.method_data = method_data

        # Build message data
        self.add_string(username)
        self.add_string(service)
        self.add_string(method)
        if method_data:
            self._data.extend(method_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthRequestMessage":
        """Unpack user auth request message data."""
        offset = 0
        username_bytes, offset = read_string(data, offset)
        service_bytes, offset = read_string(data, offset)
        method_bytes, offset = read_string(data, offset)

        username = username_bytes.decode(SSH_STRING_ENCODING)
        service = service_bytes.decode(SSH_STRING_ENCODING)
        method = method_bytes.decode(SSH_STRING_ENCODING)
        method_data = data[offset:] if offset < len(data) else b""

        return cls(username, service, method, method_data)

    def validate(self) -> bool:
        """
        Validate user auth request message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        super().validate()

        # Validate required fields
        if not self.username:
            raise ProtocolException("Username cannot be empty")
        if not self.service:
            raise ProtocolException("Service name cannot be empty")
        if not self.method:
            raise ProtocolException("Authentication method cannot be empty")

        # Validate service name
        if self.service not in [SERVICE_USERAUTH, SERVICE_CONNECTION]:
            raise ProtocolException(f"Invalid service name: {self.service}")

        # Validate authentication method
        valid_methods = [
            AUTH_PASSWORD,
            AUTH_PUBLICKEY,
            AUTH_HOSTBASED,
            AUTH_KEYBOARD_INTERACTIVE,
            AUTH_GSSAPI_WITH_MIC,
        ]
        if self.method not in valid_methods:
            raise ProtocolException(f"Invalid authentication method: {self.method}")

        return True
Methods:
__init__(username, service, method, method_data=b'')

Initialize user auth request message.

Parameters:

Name Type Description Default
username str

Username for authentication

required
service str

Service name

required
method str

Authentication method

required
method_data bytes

Method-specific data

b''
Source code in spindlex/protocol/messages.py
def __init__(
    self, username: str, service: str, method: str, method_data: bytes = b""
) -> None:
    """
    Initialize user auth request message.

    Args:
        username: Username for authentication
        service: Service name
        method: Authentication method
        method_data: Method-specific data
    """
    super().__init__(MSG_USERAUTH_REQUEST)
    self.username = username
    self.service = service
    self.method = method
    self.method_data = method_data

    # Build message data
    self.add_string(username)
    self.add_string(service)
    self.add_string(method)
    if method_data:
        self._data.extend(method_data)
validate()

Validate user auth request message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/messages.py
def validate(self) -> bool:
    """
    Validate user auth request message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    super().validate()

    # Validate required fields
    if not self.username:
        raise ProtocolException("Username cannot be empty")
    if not self.service:
        raise ProtocolException("Service name cannot be empty")
    if not self.method:
        raise ProtocolException("Authentication method cannot be empty")

    # Validate service name
    if self.service not in [SERVICE_USERAUTH, SERVICE_CONNECTION]:
        raise ProtocolException(f"Invalid service name: {self.service}")

    # Validate authentication method
    valid_methods = [
        AUTH_PASSWORD,
        AUTH_PUBLICKEY,
        AUTH_HOSTBASED,
        AUTH_KEYBOARD_INTERACTIVE,
        AUTH_GSSAPI_WITH_MIC,
    ]
    if self.method not in valid_methods:
        raise ProtocolException(f"Invalid authentication method: {self.method}")

    return True

UserAuthSuccessMessage

Bases: Message

SSH user authentication success message (MSG_USERAUTH_SUCCESS).

Source code in spindlex/protocol/messages.py
class UserAuthSuccessMessage(Message):
    """SSH user authentication success message (MSG_USERAUTH_SUCCESS)."""

    def __init__(self) -> None:
        """Initialize user auth success message."""
        super().__init__(MSG_USERAUTH_SUCCESS)
        # No additional data for success message

    @classmethod
    def _unpack_data(cls, data: bytes) -> "UserAuthSuccessMessage":
        """Unpack user auth success message data."""
        return cls()
Methods:
__init__()

Initialize user auth success message.

Source code in spindlex/protocol/messages.py
def __init__(self) -> None:
    """Initialize user auth success message."""
    super().__init__(MSG_USERAUTH_SUCCESS)

Functions:

create_version_string(software_name='spindlex', software_version=None)

Create SSH version string for this implementation.

Parameters:

Name Type Description Default
software_name str

Name of SSH software

'spindlex'
software_version Optional[str]

Version of SSH software (defaults to package version)

None

Returns:

Type Description
str

Complete SSH version string

Source code in spindlex/protocol/constants.py
def create_version_string(
    software_name: str = "spindlex", software_version: Optional[str] = None
) -> str:
    """
    Create SSH version string for this implementation.

    Args:
        software_name: Name of SSH software
        software_version: Version of SSH software (defaults to package version)

    Returns:
        Complete SSH version string
    """
    if software_version is None:
        software_version = __version__
    return f"SSH-{SSH_PROTOCOL_VERSION_2}-{software_name}_{software_version}"

get_message_name(msg_type)

Get human-readable name for message type.

Parameters:

Name Type Description Default
msg_type int

Message type code

required

Returns:

Type Description
str

Message type name or "UNKNOWN" if not recognized

Source code in spindlex/protocol/constants.py
def get_message_name(msg_type: int) -> str:
    """
    Get human-readable name for message type.

    Args:
        msg_type: Message type code

    Returns:
        Message type name or "UNKNOWN" if not recognized
    """
    message_names = {
        MSG_DISCONNECT: "MSG_DISCONNECT",
        MSG_IGNORE: "MSG_IGNORE",
        MSG_UNIMPLEMENTED: "MSG_UNIMPLEMENTED",
        MSG_DEBUG: "MSG_DEBUG",
        MSG_SERVICE_REQUEST: "MSG_SERVICE_REQUEST",
        MSG_SERVICE_ACCEPT: "MSG_SERVICE_ACCEPT",
        MSG_KEXINIT: "MSG_KEXINIT",
        MSG_NEWKEYS: "MSG_NEWKEYS",
        MSG_KEXDH_INIT: "MSG_KEXDH_INIT",
        MSG_KEXDH_REPLY: "MSG_KEXDH_REPLY",
        MSG_KEX_ECDH_INIT: "MSG_KEX_ECDH_INIT",
        MSG_KEX_ECDH_REPLY: "MSG_KEX_ECDH_REPLY",
        MSG_USERAUTH_REQUEST: "MSG_USERAUTH_REQUEST",
        MSG_USERAUTH_FAILURE: "MSG_USERAUTH_FAILURE",
        MSG_USERAUTH_SUCCESS: "MSG_USERAUTH_SUCCESS",
        MSG_USERAUTH_BANNER: "MSG_USERAUTH_BANNER",
        MSG_USERAUTH_PK_OK: "MSG_USERAUTH_PK_OK",
        MSG_GLOBAL_REQUEST: "MSG_GLOBAL_REQUEST",
        MSG_REQUEST_SUCCESS: "MSG_REQUEST_SUCCESS",
        MSG_REQUEST_FAILURE: "MSG_REQUEST_FAILURE",
        MSG_CHANNEL_OPEN: "MSG_CHANNEL_OPEN",
        MSG_CHANNEL_OPEN_CONFIRMATION: "MSG_CHANNEL_OPEN_CONFIRMATION",
        MSG_CHANNEL_OPEN_FAILURE: "MSG_CHANNEL_OPEN_FAILURE",
        MSG_CHANNEL_WINDOW_ADJUST: "MSG_CHANNEL_WINDOW_ADJUST",
        MSG_CHANNEL_DATA: "MSG_CHANNEL_DATA",
        MSG_CHANNEL_EXTENDED_DATA: "MSG_CHANNEL_EXTENDED_DATA",
        MSG_CHANNEL_EOF: "MSG_CHANNEL_EOF",
        MSG_CHANNEL_CLOSE: "MSG_CHANNEL_CLOSE",
        MSG_CHANNEL_REQUEST: "MSG_CHANNEL_REQUEST",
        MSG_CHANNEL_SUCCESS: "MSG_CHANNEL_SUCCESS",
        MSG_CHANNEL_FAILURE: "MSG_CHANNEL_FAILURE",
    }

    return message_names.get(msg_type, f"UNKNOWN({msg_type})")

is_supported_version(protocol_version)

Check if protocol version is supported.

Parameters:

Name Type Description Default
protocol_version str

Protocol version string (e.g., "2.0")

required

Returns:

Type Description
bool

True if version is supported, False otherwise

Source code in spindlex/protocol/constants.py
def is_supported_version(protocol_version: str) -> bool:
    """
    Check if protocol version is supported.

    Args:
        protocol_version: Protocol version string (e.g., "2.0")

    Returns:
        True if version is supported, False otherwise
    """
    return protocol_version in SUPPORTED_PROTOCOL_VERSIONS

parse_version_string(version_line)

Parse SSH version string.

Parameters:

Name Type Description Default
version_line str

SSH version line (e.g., "SSH-2.0-OpenSSH_8.0")

required

Returns:

Type Description
tuple[str, str]

Tuple of (protocol_version, software_version)

Raises:

Type Description
ValueError

If version string is invalid

Source code in spindlex/protocol/constants.py
def parse_version_string(version_line: str) -> tuple[str, str]:
    """
    Parse SSH version string.

    Args:
        version_line: SSH version line (e.g., "SSH-2.0-OpenSSH_8.0")

    Returns:
        Tuple of (protocol_version, software_version)

    Raises:
        ValueError: If version string is invalid
    """
    if not version_line.startswith("SSH-"):
        raise ValueError(f"Invalid SSH version string: {version_line}")

    parts = version_line.split("-", 2)
    if len(parts) < 2:
        raise ValueError(f"Invalid SSH version string format: {version_line}")

    protocol_version = parts[1]
    software_version = parts[2] if len(parts) > 2 else ""

    return protocol_version, software_version

validate_message_type(msg_type)

Validate SSH message type.

Parameters:

Name Type Description Default
msg_type int

Message type code

required

Returns:

Type Description
bool

True if message type is valid, False otherwise

Source code in spindlex/protocol/constants.py
def validate_message_type(msg_type: int) -> bool:
    """
    Validate SSH message type.

    Args:
        msg_type: Message type code

    Returns:
        True if message type is valid, False otherwise
    """
    # Valid message type ranges according to RFC 4250
    return (
        (1 <= msg_type <= 19)  # Transport layer generic
        or (20 <= msg_type <= 29)  # Algorithm negotiation
        or (30 <= msg_type <= 41)  # Key exchange method specific
        or (50 <= msg_type <= 59)  # User authentication generic
        or (60 <= msg_type <= 79)  # User authentication method specific
        or (80 <= msg_type <= 89)  # Connection protocol generic
        or (90 <= msg_type <= 127)  # Channel related messages
        or (128 <= msg_type <= 191)  # Reserved for client protocols
        or (192 <= msg_type <= 255)  # Local extensions
    )

SFTP Messages

spindlex.protocol.sftp_messages

SFTP Protocol Message Implementation

Implements SFTP protocol message parsing, serialization, and validation according to RFC 4254 and draft-ietf-secsh-filexfer specifications.

Classes

SFTPAttributes

SFTP file attributes.

Represents file/directory attributes in SFTP protocol.

Source code in spindlex/protocol/sftp_messages.py
class SFTPAttributes:
    """
    SFTP file attributes.

    Represents file/directory attributes in SFTP protocol.
    """

    def __init__(self) -> None:
        """Initialize empty attributes."""
        self.flags = 0
        self.size: Optional[int] = None
        self.uid: Optional[int] = None
        self.gid: Optional[int] = None
        self.permissions: Optional[int] = None
        self.atime: Optional[int] = None
        self.mtime: Optional[int] = None
        self.extended: dict[str, str] = {}

    @property
    def st_mode(self) -> Optional[int]:
        """Get file mode (permissions)."""
        return self.permissions

    @st_mode.setter
    def st_mode(self, value: int) -> None:
        """Set file mode (permissions)."""
        self.permissions = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_PERMISSIONS

    @property
    def st_size(self) -> Optional[int]:
        """Get file size."""
        return self.size

    @st_size.setter
    def st_size(self, value: int) -> None:
        """Set file size."""
        self.size = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_SIZE

    @property
    def st_uid(self) -> Optional[int]:
        """Get user ID."""
        return self.uid

    @st_uid.setter
    def st_uid(self, value: int) -> None:
        """Set user ID."""
        self.uid = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_UIDGID

    @property
    def st_gid(self) -> Optional[int]:
        """Get group ID."""
        return self.gid

    @st_gid.setter
    def st_gid(self, value: int) -> None:
        """Set group ID."""
        self.gid = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_UIDGID

    @property
    def st_atime(self) -> Optional[int]:
        """Get access time."""
        return self.atime

    @st_atime.setter
    def st_atime(self, value: int) -> None:
        """Set access time."""
        self.atime = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_ACMODTIME

    @property
    def st_mtime(self) -> Optional[int]:
        """Get modification time."""
        return self.mtime

    @st_mtime.setter
    def st_mtime(self, value: int) -> None:
        """Set modification time."""
        self.mtime = value
        if value is not None:
            self.flags |= SSH_FILEXFER_ATTR_ACMODTIME

    def pack(self) -> bytes:
        """
        Serialize attributes to bytes.

        Returns:
            Serialized attributes
        """
        data = write_uint32(self.flags)

        if self.flags & SSH_FILEXFER_ATTR_SIZE:
            data += write_uint64(self.size or 0)

        if self.flags & SSH_FILEXFER_ATTR_UIDGID:
            data += write_uint32(self.uid or 0)
            data += write_uint32(self.gid or 0)

        if self.flags & SSH_FILEXFER_ATTR_PERMISSIONS:
            data += write_uint32(self.permissions or 0)

        if self.flags & SSH_FILEXFER_ATTR_ACMODTIME:
            data += write_uint32(self.atime or 0)
            data += write_uint32(self.mtime or 0)

        if self.flags & SSH_FILEXFER_ATTR_EXTENDED:
            data += write_uint32(len(self.extended))
            for key, value in self.extended.items():
                data += write_string(key)
                data += write_string(value)

        return data

    @classmethod
    def unpack(cls, data: bytes, offset: int = 0) -> tuple["SFTPAttributes", int]:
        """
        Deserialize attributes from bytes.

        Args:
            data: Serialized attributes data
            offset: Starting offset in data

        Returns:
            Tuple of (attributes, new_offset)
        """
        attrs = cls()

        attrs.flags, offset = read_uint32(data, offset)

        if attrs.flags & SSH_FILEXFER_ATTR_SIZE:
            attrs.size, offset = read_uint64(data, offset)

        if attrs.flags & SSH_FILEXFER_ATTR_UIDGID:
            attrs.uid, offset = read_uint32(data, offset)
            attrs.gid, offset = read_uint32(data, offset)

        if attrs.flags & SSH_FILEXFER_ATTR_PERMISSIONS:
            attrs.permissions, offset = read_uint32(data, offset)

        if attrs.flags & SSH_FILEXFER_ATTR_ACMODTIME:
            attrs.atime, offset = read_uint32(data, offset)
            attrs.mtime, offset = read_uint32(data, offset)

        if attrs.flags & SSH_FILEXFER_ATTR_EXTENDED:
            count, offset = read_uint32(data, offset)
            for _ in range(count):
                key_bytes, offset = read_string(data, offset)
                value_bytes, offset = read_string(data, offset)
                key = key_bytes.decode("utf-8")
                value = value_bytes.decode("utf-8")
                attrs.extended[key] = value

        return attrs, offset

    def is_dir(self) -> bool:
        """Check if attributes represent a directory."""
        if self.permissions is None:
            return False
        return stat.S_ISDIR(self.permissions)

    def is_file(self) -> bool:
        """Check if attributes represent a regular file."""
        if self.permissions is None:
            return False
        return stat.S_ISREG(self.permissions)

    def is_symlink(self) -> bool:
        """Check if attributes represent a symbolic link."""
        if self.permissions is None:
            return False
        return stat.S_ISLNK(self.permissions)
Attributes
st_atime property writable

Get access time.

st_gid property writable

Get group ID.

st_mode property writable

Get file mode (permissions).

st_mtime property writable

Get modification time.

st_size property writable

Get file size.

st_uid property writable

Get user ID.

Methods:
__init__()

Initialize empty attributes.

Source code in spindlex/protocol/sftp_messages.py
def __init__(self) -> None:
    """Initialize empty attributes."""
    self.flags = 0
    self.size: Optional[int] = None
    self.uid: Optional[int] = None
    self.gid: Optional[int] = None
    self.permissions: Optional[int] = None
    self.atime: Optional[int] = None
    self.mtime: Optional[int] = None
    self.extended: dict[str, str] = {}
is_dir()

Check if attributes represent a directory.

Source code in spindlex/protocol/sftp_messages.py
def is_dir(self) -> bool:
    """Check if attributes represent a directory."""
    if self.permissions is None:
        return False
    return stat.S_ISDIR(self.permissions)
is_file()

Check if attributes represent a regular file.

Source code in spindlex/protocol/sftp_messages.py
def is_file(self) -> bool:
    """Check if attributes represent a regular file."""
    if self.permissions is None:
        return False
    return stat.S_ISREG(self.permissions)

Check if attributes represent a symbolic link.

Source code in spindlex/protocol/sftp_messages.py
def is_symlink(self) -> bool:
    """Check if attributes represent a symbolic link."""
    if self.permissions is None:
        return False
    return stat.S_ISLNK(self.permissions)
pack()

Serialize attributes to bytes.

Returns:

Type Description
bytes

Serialized attributes

Source code in spindlex/protocol/sftp_messages.py
def pack(self) -> bytes:
    """
    Serialize attributes to bytes.

    Returns:
        Serialized attributes
    """
    data = write_uint32(self.flags)

    if self.flags & SSH_FILEXFER_ATTR_SIZE:
        data += write_uint64(self.size or 0)

    if self.flags & SSH_FILEXFER_ATTR_UIDGID:
        data += write_uint32(self.uid or 0)
        data += write_uint32(self.gid or 0)

    if self.flags & SSH_FILEXFER_ATTR_PERMISSIONS:
        data += write_uint32(self.permissions or 0)

    if self.flags & SSH_FILEXFER_ATTR_ACMODTIME:
        data += write_uint32(self.atime or 0)
        data += write_uint32(self.mtime or 0)

    if self.flags & SSH_FILEXFER_ATTR_EXTENDED:
        data += write_uint32(len(self.extended))
        for key, value in self.extended.items():
            data += write_string(key)
            data += write_string(value)

    return data
unpack(data, offset=0) classmethod

Deserialize attributes from bytes.

Parameters:

Name Type Description Default
data bytes

Serialized attributes data

required
offset int

Starting offset in data

0

Returns:

Type Description
tuple[SFTPAttributes, int]

Tuple of (attributes, new_offset)

Source code in spindlex/protocol/sftp_messages.py
@classmethod
def unpack(cls, data: bytes, offset: int = 0) -> tuple["SFTPAttributes", int]:
    """
    Deserialize attributes from bytes.

    Args:
        data: Serialized attributes data
        offset: Starting offset in data

    Returns:
        Tuple of (attributes, new_offset)
    """
    attrs = cls()

    attrs.flags, offset = read_uint32(data, offset)

    if attrs.flags & SSH_FILEXFER_ATTR_SIZE:
        attrs.size, offset = read_uint64(data, offset)

    if attrs.flags & SSH_FILEXFER_ATTR_UIDGID:
        attrs.uid, offset = read_uint32(data, offset)
        attrs.gid, offset = read_uint32(data, offset)

    if attrs.flags & SSH_FILEXFER_ATTR_PERMISSIONS:
        attrs.permissions, offset = read_uint32(data, offset)

    if attrs.flags & SSH_FILEXFER_ATTR_ACMODTIME:
        attrs.atime, offset = read_uint32(data, offset)
        attrs.mtime, offset = read_uint32(data, offset)

    if attrs.flags & SSH_FILEXFER_ATTR_EXTENDED:
        count, offset = read_uint32(data, offset)
        for _ in range(count):
            key_bytes, offset = read_string(data, offset)
            value_bytes, offset = read_string(data, offset)
            key = key_bytes.decode("utf-8")
            value = value_bytes.decode("utf-8")
            attrs.extended[key] = value

    return attrs, offset

SFTPAttrsMessage

Bases: SFTPMessage

SFTP attributes message (SSH_FXP_ATTRS).

Source code in spindlex/protocol/sftp_messages.py
class SFTPAttrsMessage(SFTPMessage):
    """SFTP attributes message (SSH_FXP_ATTRS)."""

    def __init__(self, request_id: int, attrs: SFTPAttributes) -> None:
        """
        Initialize SFTP attributes message.

        Args:
            request_id: Request ID this responds to
            attrs: File attributes
        """
        super().__init__(SSH_FXP_ATTRS, request_id)
        self.attrs = attrs

        # Build message data
        self._data.extend(attrs.pack())

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPAttrsMessage":
        """Unpack SFTP attributes message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        attrs, offset = SFTPAttributes.unpack(data, offset)

        return cls(request_id, attrs)
Methods:
__init__(request_id, attrs)

Initialize SFTP attributes message.

Parameters:

Name Type Description Default
request_id int

Request ID this responds to

required
attrs SFTPAttributes

File attributes

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, attrs: SFTPAttributes) -> None:
    """
    Initialize SFTP attributes message.

    Args:
        request_id: Request ID this responds to
        attrs: File attributes
    """
    super().__init__(SSH_FXP_ATTRS, request_id)
    self.attrs = attrs

    # Build message data
    self._data.extend(attrs.pack())

SFTPCloseMessage

Bases: SFTPMessage

SFTP close file message (SSH_FXP_CLOSE).

Source code in spindlex/protocol/sftp_messages.py
class SFTPCloseMessage(SFTPMessage):
    """SFTP close file message (SSH_FXP_CLOSE)."""

    def __init__(self, request_id: int, handle: bytes) -> None:
        """
        Initialize SFTP close message.

        Args:
            request_id: Request ID
            handle: File handle to close
        """
        super().__init__(SSH_FXP_CLOSE, request_id)
        self.handle = handle

        # Build message data
        self.add_string(handle)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPCloseMessage":
        """Unpack SFTP close message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        handle, offset = read_string(data, offset)

        return cls(request_id, handle)
Methods:
__init__(request_id, handle)

Initialize SFTP close message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
handle bytes

File handle to close

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, handle: bytes) -> None:
    """
    Initialize SFTP close message.

    Args:
        request_id: Request ID
        handle: File handle to close
    """
    super().__init__(SSH_FXP_CLOSE, request_id)
    self.handle = handle

    # Build message data
    self.add_string(handle)

SFTPDataMessage

Bases: SFTPMessage

SFTP data message (SSH_FXP_DATA).

Source code in spindlex/protocol/sftp_messages.py
class SFTPDataMessage(SFTPMessage):
    """SFTP data message (SSH_FXP_DATA)."""

    def __init__(self, request_id: int, data: bytes) -> None:
        """
        Initialize SFTP data message.

        Args:
            request_id: Request ID this data responds to
            data: File data
        """
        super().__init__(SSH_FXP_DATA, request_id)
        self.data = data

        # Build message data
        self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPDataMessage":
        """Unpack SFTP data message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        file_data, offset = read_string(data, offset)

        return cls(request_id, file_data)
Methods:
__init__(request_id, data)

Initialize SFTP data message.

Parameters:

Name Type Description Default
request_id int

Request ID this data responds to

required
data bytes

File data

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, data: bytes) -> None:
    """
    Initialize SFTP data message.

    Args:
        request_id: Request ID this data responds to
        data: File data
    """
    super().__init__(SSH_FXP_DATA, request_id)
    self.data = data

    # Build message data
    self.add_string(data)

SFTPExtendedMessage

Bases: SFTPMessage

SFTP extended message (SSH_FXP_EXTENDED).

Source code in spindlex/protocol/sftp_messages.py
class SFTPExtendedMessage(SFTPMessage):
    """SFTP extended message (SSH_FXP_EXTENDED)."""

    def __init__(
        self, request_id: int, extended_request: str, extended_data: bytes = b""
    ) -> None:
        """
        Initialize SFTP extended message.

        Args:
            request_id: Request ID
            extended_request: Extended request name
            extended_data: Extended request data
        """
        super().__init__(SSH_FXP_EXTENDED, request_id)
        self.extended_request = extended_request
        self.extended_data = extended_data

        # Build message data
        self.add_string(extended_request)
        if extended_data:
            self._data.extend(extended_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPExtendedMessage":
        """Unpack SFTP extended message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        extended_request_bytes, offset = read_string(data, offset)

        extended_request = extended_request_bytes.decode("utf-8")
        extended_data = data[offset:] if offset < len(data) else b""

        return cls(request_id, extended_request, extended_data)
Methods:
__init__(request_id, extended_request, extended_data=b'')

Initialize SFTP extended message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
extended_request str

Extended request name

required
extended_data bytes

Extended request data

b''
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, extended_request: str, extended_data: bytes = b""
) -> None:
    """
    Initialize SFTP extended message.

    Args:
        request_id: Request ID
        extended_request: Extended request name
        extended_data: Extended request data
    """
    super().__init__(SSH_FXP_EXTENDED, request_id)
    self.extended_request = extended_request
    self.extended_data = extended_data

    # Build message data
    self.add_string(extended_request)
    if extended_data:
        self._data.extend(extended_data)

SFTPExtendedReplyMessage

Bases: SFTPMessage

SFTP extended reply message (SSH_FXP_EXTENDED_REPLY).

Source code in spindlex/protocol/sftp_messages.py
class SFTPExtendedReplyMessage(SFTPMessage):
    """SFTP extended reply message (SSH_FXP_EXTENDED_REPLY)."""

    def __init__(self, request_id: int, extended_data: bytes = b"") -> None:
        """
        Initialize SFTP extended reply message.

        Args:
            request_id: Request ID this reply responds to
            extended_data: Extended reply data
        """
        super().__init__(SSH_FXP_EXTENDED_REPLY, request_id)
        self.extended_data = extended_data

        # Build message data
        if extended_data:
            self._data.extend(extended_data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPExtendedReplyMessage":
        """Unpack SFTP extended reply message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        extended_data = data[offset:] if offset < len(data) else b""

        return cls(request_id, extended_data)
Methods:
__init__(request_id, extended_data=b'')

Initialize SFTP extended reply message.

Parameters:

Name Type Description Default
request_id int

Request ID this reply responds to

required
extended_data bytes

Extended reply data

b''
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, extended_data: bytes = b"") -> None:
    """
    Initialize SFTP extended reply message.

    Args:
        request_id: Request ID this reply responds to
        extended_data: Extended reply data
    """
    super().__init__(SSH_FXP_EXTENDED_REPLY, request_id)
    self.extended_data = extended_data

    # Build message data
    if extended_data:
        self._data.extend(extended_data)

SFTPFStatMessage

Bases: SFTPMessage

SFTP fstat message (SSH_FXP_FSTAT).

Source code in spindlex/protocol/sftp_messages.py
class SFTPFStatMessage(SFTPMessage):
    """SFTP fstat message (SSH_FXP_FSTAT)."""

    def __init__(self, request_id: int, handle: bytes) -> None:
        """
        Initialize SFTP fstat message.

        Args:
            request_id: Request ID
            handle: File handle to get attributes for
        """
        super().__init__(SSH_FXP_FSTAT, request_id)
        self.handle = handle

        # Build message data
        self.add_string(handle)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPFStatMessage":
        """Unpack SFTP fstat message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        handle, offset = read_string(data, offset)

        return cls(request_id, handle)
Methods:
__init__(request_id, handle)

Initialize SFTP fstat message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
handle bytes

File handle to get attributes for

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, handle: bytes) -> None:
    """
    Initialize SFTP fstat message.

    Args:
        request_id: Request ID
        handle: File handle to get attributes for
    """
    super().__init__(SSH_FXP_FSTAT, request_id)
    self.handle = handle

    # Build message data
    self.add_string(handle)

SFTPHandleMessage

Bases: SFTPMessage

SFTP handle message (SSH_FXP_HANDLE).

Source code in spindlex/protocol/sftp_messages.py
class SFTPHandleMessage(SFTPMessage):
    """SFTP handle message (SSH_FXP_HANDLE)."""

    def __init__(self, request_id: int, handle: bytes) -> None:
        """
        Initialize SFTP handle message.

        Args:
            request_id: Request ID this handle responds to
            handle: File handle
        """
        super().__init__(SSH_FXP_HANDLE, request_id)
        self.handle = handle

        # Build message data
        self.add_string(handle)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPHandleMessage":
        """Unpack SFTP handle message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        handle, offset = read_string(data, offset)

        return cls(request_id, handle)
Methods:
__init__(request_id, handle)

Initialize SFTP handle message.

Parameters:

Name Type Description Default
request_id int

Request ID this handle responds to

required
handle bytes

File handle

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, handle: bytes) -> None:
    """
    Initialize SFTP handle message.

    Args:
        request_id: Request ID this handle responds to
        handle: File handle
    """
    super().__init__(SSH_FXP_HANDLE, request_id)
    self.handle = handle

    # Build message data
    self.add_string(handle)

SFTPInitMessage

Bases: SFTPMessage

SFTP initialization message (SSH_FXP_INIT).

Source code in spindlex/protocol/sftp_messages.py
class SFTPInitMessage(SFTPMessage):
    """SFTP initialization message (SSH_FXP_INIT)."""

    def __init__(self, version: int = SFTP_VERSION) -> None:
        """
        Initialize SFTP init message.

        Args:
            version: SFTP protocol version
        """
        super().__init__(SSH_FXP_INIT)
        self.version = version

        # Build message data
        self.add_uint32(version)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPInitMessage":
        """Unpack SFTP init message data."""
        version, _ = read_uint32(data, 0)
        return cls(version)
Methods:
__init__(version=SFTP_VERSION)

Initialize SFTP init message.

Parameters:

Name Type Description Default
version int

SFTP protocol version

SFTP_VERSION
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, version: int = SFTP_VERSION) -> None:
    """
    Initialize SFTP init message.

    Args:
        version: SFTP protocol version
    """
    super().__init__(SSH_FXP_INIT)
    self.version = version

    # Build message data
    self.add_uint32(version)

SFTPLStatMessage

Bases: SFTPMessage

SFTP lstat message (SSH_FXP_LSTAT).

Source code in spindlex/protocol/sftp_messages.py
class SFTPLStatMessage(SFTPMessage):
    """SFTP lstat message (SSH_FXP_LSTAT)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP lstat message.

        Args:
            request_id: Request ID
            path: Path to get attributes for (don't follow symlinks)
        """
        super().__init__(SSH_FXP_LSTAT, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPLStatMessage":
        """Unpack SFTP lstat message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP lstat message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Path to get attributes for (don't follow symlinks)

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP lstat message.

    Args:
        request_id: Request ID
        path: Path to get attributes for (don't follow symlinks)
    """
    super().__init__(SSH_FXP_LSTAT, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPLinkMessage

Bases: SFTPMessage

SFTP link message (SSH_FXP_LINK).

Source code in spindlex/protocol/sftp_messages.py
class SFTPLinkMessage(SFTPMessage):
    """SFTP link message (SSH_FXP_LINK)."""

    def __init__(self, request_id: int, linkpath: str, targetpath: str) -> None:
        """
        Initialize SFTP link message.

        Args:
            request_id: Request ID
            linkpath: Path where link should be created
            targetpath: Target path for the link
        """
        super().__init__(SSH_FXP_LINK, request_id)
        self.linkpath = linkpath
        self.targetpath = targetpath

        # Build message data
        self.add_string(linkpath)
        self.add_string(targetpath)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPLinkMessage":
        """Unpack SFTP link message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        linkpath_bytes, offset = read_string(data, offset)
        targetpath_bytes, offset = read_string(data, offset)

        linkpath = linkpath_bytes.decode("utf-8")
        targetpath = targetpath_bytes.decode("utf-8")
        return cls(request_id, linkpath, targetpath)
Methods:
__init__(request_id, linkpath, targetpath)

Initialize SFTP link message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
linkpath str

Path where link should be created

required
targetpath str

Target path for the link

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, linkpath: str, targetpath: str) -> None:
    """
    Initialize SFTP link message.

    Args:
        request_id: Request ID
        linkpath: Path where link should be created
        targetpath: Target path for the link
    """
    super().__init__(SSH_FXP_LINK, request_id)
    self.linkpath = linkpath
    self.targetpath = targetpath

    # Build message data
    self.add_string(linkpath)
    self.add_string(targetpath)

SFTPMessage

Base SFTP protocol message class.

Provides message serialization, deserialization, and validation functionality for SFTP protocol messages.

Source code in spindlex/protocol/sftp_messages.py
class SFTPMessage:
    """
    Base SFTP protocol message class.

    Provides message serialization, deserialization, and validation
    functionality for SFTP protocol messages.
    """

    def __init__(self, msg_type: int, request_id: Optional[int] = None) -> None:
        """
        Initialize SFTP message with type.

        Args:
            msg_type: SFTP message type code
            request_id: Request ID for request/response correlation

        Raises:
            ProtocolException: If message type is invalid
        """
        if not validate_sftp_message_type(msg_type):
            raise ProtocolException(f"Invalid SFTP message type: {msg_type}")

        self.msg_type = msg_type
        self.request_id = request_id
        self._data = bytearray()

    def pack(self) -> bytes:
        """
        Serialize SFTP message to bytes.

        Returns:
            Serialized message data with length prefix

        Raises:
            ProtocolException: If serialization fails
        """
        try:
            # Build message content
            content = write_byte(self.msg_type)

            # Add request ID for request messages
            if self.request_id is not None:
                content += write_uint32(self.request_id)

            # Add message-specific data
            content += bytes(self._data)

            # Prepend length
            return write_uint32(len(content)) + content
        except Exception as e:
            raise ProtocolException(f"Failed to pack SFTP message: {e}") from e

    @classmethod
    def unpack(cls, data: bytes) -> "SFTPMessage":
        """
        Deserialize SFTP message from bytes.

        Args:
            data: Serialized message data with length prefix

        Returns:
            Deserialized SFTP message instance

        Raises:
            ProtocolException: If deserialization fails
        """
        if len(data) < 5:  # Minimum: length(4) + type(1)
            raise ProtocolException("SFTP message data too short")

        # Read message length
        msg_length, offset = read_uint32(data, 0)

        if len(data) < 4 + msg_length:
            raise ProtocolException("Incomplete SFTP message")

        # Read message type
        msg_type, offset = read_byte(data, offset)

        # Create appropriate message class based on type
        message_classes = {
            SSH_FXP_INIT: SFTPInitMessage,
            SSH_FXP_VERSION: SFTPVersionMessage,
            SSH_FXP_OPEN: SFTPOpenMessage,
            SSH_FXP_CLOSE: SFTPCloseMessage,
            SSH_FXP_READ: SFTPReadMessage,
            SSH_FXP_WRITE: SFTPWriteMessage,
            SSH_FXP_STAT: SFTPStatMessage,
            SSH_FXP_LSTAT: SFTPLStatMessage,
            SSH_FXP_FSTAT: SFTPFStatMessage,
            SSH_FXP_SETSTAT: SFTPSetStatMessage,
            SSH_FXP_OPENDIR: SFTPOpenDirMessage,
            SSH_FXP_READDIR: SFTPReadDirMessage,
            SSH_FXP_REMOVE: SFTPRemoveMessage,
            SSH_FXP_MKDIR: SFTPMkdirMessage,
            SSH_FXP_RMDIR: SFTPRmdirMessage,
            SSH_FXP_REALPATH: SFTPRealPathMessage,
            SSH_FXP_RENAME: SFTPRenameMessage,
            SSH_FXP_READLINK: SFTPReadLinkMessage,
            SSH_FXP_SYMLINK: SFTPSymlinkMessage,
            SSH_FXP_LINK: SFTPLinkMessage,
            SSH_FXP_STATUS: SFTPStatusMessage,
            SSH_FXP_HANDLE: SFTPHandleMessage,
            SSH_FXP_DATA: SFTPDataMessage,
            SSH_FXP_NAME: SFTPNameMessage,
            SSH_FXP_ATTRS: SFTPAttrsMessage,
            SSH_FXP_EXTENDED: SFTPExtendedMessage,
            SSH_FXP_EXTENDED_REPLY: SFTPExtendedReplyMessage,
        }

        from typing import cast

        message_class = cast(
            type["SFTPMessage"], message_classes.get(msg_type, SFTPMessage)
        )

        if message_class == SFTPMessage:
            # Generic message
            msg = SFTPMessage(msg_type)
            msg._data = bytearray(data[offset : 4 + msg_length])
            return msg
        else:
            # Specific message class
            return message_class._unpack_data(data[offset : 4 + msg_length])

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPMessage":
        """
        Unpack message-specific data. Override in subclasses.

        Args:
            data: Message data without length prefix and type byte

        Returns:
            Message instance
        """
        raise NotImplementedError("Subclasses must implement _unpack_data")

    def add_uint32(self, value: int) -> None:
        """Add 32-bit unsigned integer to message."""
        self._data.extend(write_uint32(value))

    def add_uint64(self, value: int) -> None:
        """Add 64-bit unsigned integer to message."""
        self._data.extend(write_uint64(value))

    def add_string(self, value: Union[str, bytes]) -> None:
        """Add string to message."""
        self._data.extend(write_string(value))

    def add_byte(self, value: int) -> None:
        """Add single byte to message."""
        self._data.extend(write_byte(value))

    def validate(self) -> bool:
        """
        Validate SFTP message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        if not validate_sftp_message_type(self.msg_type):
            raise ProtocolException(f"Invalid SFTP message type: {self.msg_type}")

        # Validate message size
        if len(self._data) > SFTP_MAX_PACKET_SIZE:
            raise ProtocolException(f"SFTP message too large: {len(self._data)} bytes")

        return True
Methods:
__init__(msg_type, request_id=None)

Initialize SFTP message with type.

Parameters:

Name Type Description Default
msg_type int

SFTP message type code

required
request_id Optional[int]

Request ID for request/response correlation

None

Raises:

Type Description
ProtocolException

If message type is invalid

Source code in spindlex/protocol/sftp_messages.py
def __init__(self, msg_type: int, request_id: Optional[int] = None) -> None:
    """
    Initialize SFTP message with type.

    Args:
        msg_type: SFTP message type code
        request_id: Request ID for request/response correlation

    Raises:
        ProtocolException: If message type is invalid
    """
    if not validate_sftp_message_type(msg_type):
        raise ProtocolException(f"Invalid SFTP message type: {msg_type}")

    self.msg_type = msg_type
    self.request_id = request_id
    self._data = bytearray()
add_byte(value)

Add single byte to message.

Source code in spindlex/protocol/sftp_messages.py
def add_byte(self, value: int) -> None:
    """Add single byte to message."""
    self._data.extend(write_byte(value))
add_string(value)

Add string to message.

Source code in spindlex/protocol/sftp_messages.py
def add_string(self, value: Union[str, bytes]) -> None:
    """Add string to message."""
    self._data.extend(write_string(value))
add_uint32(value)

Add 32-bit unsigned integer to message.

Source code in spindlex/protocol/sftp_messages.py
def add_uint32(self, value: int) -> None:
    """Add 32-bit unsigned integer to message."""
    self._data.extend(write_uint32(value))
add_uint64(value)

Add 64-bit unsigned integer to message.

Source code in spindlex/protocol/sftp_messages.py
def add_uint64(self, value: int) -> None:
    """Add 64-bit unsigned integer to message."""
    self._data.extend(write_uint64(value))
pack()

Serialize SFTP message to bytes.

Returns:

Type Description
bytes

Serialized message data with length prefix

Raises:

Type Description
ProtocolException

If serialization fails

Source code in spindlex/protocol/sftp_messages.py
def pack(self) -> bytes:
    """
    Serialize SFTP message to bytes.

    Returns:
        Serialized message data with length prefix

    Raises:
        ProtocolException: If serialization fails
    """
    try:
        # Build message content
        content = write_byte(self.msg_type)

        # Add request ID for request messages
        if self.request_id is not None:
            content += write_uint32(self.request_id)

        # Add message-specific data
        content += bytes(self._data)

        # Prepend length
        return write_uint32(len(content)) + content
    except Exception as e:
        raise ProtocolException(f"Failed to pack SFTP message: {e}") from e
unpack(data) classmethod

Deserialize SFTP message from bytes.

Parameters:

Name Type Description Default
data bytes

Serialized message data with length prefix

required

Returns:

Type Description
SFTPMessage

Deserialized SFTP message instance

Raises:

Type Description
ProtocolException

If deserialization fails

Source code in spindlex/protocol/sftp_messages.py
@classmethod
def unpack(cls, data: bytes) -> "SFTPMessage":
    """
    Deserialize SFTP message from bytes.

    Args:
        data: Serialized message data with length prefix

    Returns:
        Deserialized SFTP message instance

    Raises:
        ProtocolException: If deserialization fails
    """
    if len(data) < 5:  # Minimum: length(4) + type(1)
        raise ProtocolException("SFTP message data too short")

    # Read message length
    msg_length, offset = read_uint32(data, 0)

    if len(data) < 4 + msg_length:
        raise ProtocolException("Incomplete SFTP message")

    # Read message type
    msg_type, offset = read_byte(data, offset)

    # Create appropriate message class based on type
    message_classes = {
        SSH_FXP_INIT: SFTPInitMessage,
        SSH_FXP_VERSION: SFTPVersionMessage,
        SSH_FXP_OPEN: SFTPOpenMessage,
        SSH_FXP_CLOSE: SFTPCloseMessage,
        SSH_FXP_READ: SFTPReadMessage,
        SSH_FXP_WRITE: SFTPWriteMessage,
        SSH_FXP_STAT: SFTPStatMessage,
        SSH_FXP_LSTAT: SFTPLStatMessage,
        SSH_FXP_FSTAT: SFTPFStatMessage,
        SSH_FXP_SETSTAT: SFTPSetStatMessage,
        SSH_FXP_OPENDIR: SFTPOpenDirMessage,
        SSH_FXP_READDIR: SFTPReadDirMessage,
        SSH_FXP_REMOVE: SFTPRemoveMessage,
        SSH_FXP_MKDIR: SFTPMkdirMessage,
        SSH_FXP_RMDIR: SFTPRmdirMessage,
        SSH_FXP_REALPATH: SFTPRealPathMessage,
        SSH_FXP_RENAME: SFTPRenameMessage,
        SSH_FXP_READLINK: SFTPReadLinkMessage,
        SSH_FXP_SYMLINK: SFTPSymlinkMessage,
        SSH_FXP_LINK: SFTPLinkMessage,
        SSH_FXP_STATUS: SFTPStatusMessage,
        SSH_FXP_HANDLE: SFTPHandleMessage,
        SSH_FXP_DATA: SFTPDataMessage,
        SSH_FXP_NAME: SFTPNameMessage,
        SSH_FXP_ATTRS: SFTPAttrsMessage,
        SSH_FXP_EXTENDED: SFTPExtendedMessage,
        SSH_FXP_EXTENDED_REPLY: SFTPExtendedReplyMessage,
    }

    from typing import cast

    message_class = cast(
        type["SFTPMessage"], message_classes.get(msg_type, SFTPMessage)
    )

    if message_class == SFTPMessage:
        # Generic message
        msg = SFTPMessage(msg_type)
        msg._data = bytearray(data[offset : 4 + msg_length])
        return msg
    else:
        # Specific message class
        return message_class._unpack_data(data[offset : 4 + msg_length])
validate()

Validate SFTP message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/sftp_messages.py
def validate(self) -> bool:
    """
    Validate SFTP message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    if not validate_sftp_message_type(self.msg_type):
        raise ProtocolException(f"Invalid SFTP message type: {self.msg_type}")

    # Validate message size
    if len(self._data) > SFTP_MAX_PACKET_SIZE:
        raise ProtocolException(f"SFTP message too large: {len(self._data)} bytes")

    return True

SFTPMkdirMessage

Bases: SFTPMessage

SFTP mkdir message (SSH_FXP_MKDIR).

Source code in spindlex/protocol/sftp_messages.py
class SFTPMkdirMessage(SFTPMessage):
    """SFTP mkdir message (SSH_FXP_MKDIR)."""

    def __init__(self, request_id: int, path: str, attrs: SFTPAttributes) -> None:
        """
        Initialize SFTP mkdir message.

        Args:
            request_id: Request ID
            path: Directory path to create
            attrs: Directory attributes
        """
        super().__init__(SSH_FXP_MKDIR, request_id)
        self.path = path
        self.attrs = attrs

        # Build message data
        self.add_string(path)
        self._data.extend(attrs.pack())

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPMkdirMessage":
        """Unpack SFTP mkdir message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)
        attrs, offset = SFTPAttributes.unpack(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path, attrs)
Methods:
__init__(request_id, path, attrs)

Initialize SFTP mkdir message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Directory path to create

required
attrs SFTPAttributes

Directory attributes

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str, attrs: SFTPAttributes) -> None:
    """
    Initialize SFTP mkdir message.

    Args:
        request_id: Request ID
        path: Directory path to create
        attrs: Directory attributes
    """
    super().__init__(SSH_FXP_MKDIR, request_id)
    self.path = path
    self.attrs = attrs

    # Build message data
    self.add_string(path)
    self._data.extend(attrs.pack())

SFTPNameMessage

Bases: SFTPMessage

SFTP name message (SSH_FXP_NAME).

Source code in spindlex/protocol/sftp_messages.py
class SFTPNameMessage(SFTPMessage):
    """SFTP name message (SSH_FXP_NAME)."""

    def __init__(
        self, request_id: int, names: list[tuple[str, str, SFTPAttributes]]
    ) -> None:
        """
        Initialize SFTP name message.

        Args:
            request_id: Request ID this responds to
            names: List of (filename, longname, attrs) tuples
        """
        super().__init__(SSH_FXP_NAME, request_id)
        self.names = names

        # Build message data
        self.add_uint32(len(names))
        for filename, longname, attrs in names:
            self.add_string(filename)
            self.add_string(longname)
            self._data.extend(attrs.pack())

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPNameMessage":
        """Unpack SFTP name message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        count, offset = read_uint32(data, offset)

        names = []
        for _ in range(count):
            filename_bytes, offset = read_string(data, offset)
            longname_bytes, offset = read_string(data, offset)
            attrs, offset = SFTPAttributes.unpack(data, offset)

            filename = filename_bytes.decode("utf-8")
            longname = longname_bytes.decode("utf-8")
            names.append((filename, longname, attrs))

        return cls(request_id, names)
Methods:
__init__(request_id, names)

Initialize SFTP name message.

Parameters:

Name Type Description Default
request_id int

Request ID this responds to

required
names list[tuple[str, str, SFTPAttributes]]

List of (filename, longname, attrs) tuples

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, names: list[tuple[str, str, SFTPAttributes]]
) -> None:
    """
    Initialize SFTP name message.

    Args:
        request_id: Request ID this responds to
        names: List of (filename, longname, attrs) tuples
    """
    super().__init__(SSH_FXP_NAME, request_id)
    self.names = names

    # Build message data
    self.add_uint32(len(names))
    for filename, longname, attrs in names:
        self.add_string(filename)
        self.add_string(longname)
        self._data.extend(attrs.pack())

SFTPOpenDirMessage

Bases: SFTPMessage

SFTP opendir message (SSH_FXP_OPENDIR).

Source code in spindlex/protocol/sftp_messages.py
class SFTPOpenDirMessage(SFTPMessage):
    """SFTP opendir message (SSH_FXP_OPENDIR)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP opendir message.

        Args:
            request_id: Request ID
            path: Directory path to open
        """
        super().__init__(SSH_FXP_OPENDIR, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPOpenDirMessage":
        """Unpack SFTP opendir message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP opendir message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Directory path to open

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP opendir message.

    Args:
        request_id: Request ID
        path: Directory path to open
    """
    super().__init__(SSH_FXP_OPENDIR, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPOpenMessage

Bases: SFTPMessage

SFTP open file message (SSH_FXP_OPEN).

Source code in spindlex/protocol/sftp_messages.py
class SFTPOpenMessage(SFTPMessage):
    """SFTP open file message (SSH_FXP_OPEN)."""

    def __init__(
        self, request_id: int, filename: str, pflags: int, attrs: SFTPAttributes
    ) -> None:
        """
        Initialize SFTP open message.

        Args:
            request_id: Request ID
            filename: Path to file to open
            pflags: Open flags
            attrs: File attributes
        """
        super().__init__(SSH_FXP_OPEN, request_id)
        self.filename = filename
        self.pflags = pflags
        self.attrs = attrs

        # Build message data
        self.add_string(filename)
        self.add_uint32(pflags)
        self._data.extend(attrs.pack())

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPOpenMessage":
        """Unpack SFTP open message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        filename_bytes, offset = read_string(data, offset)
        pflags, offset = read_uint32(data, offset)
        attrs, offset = SFTPAttributes.unpack(data, offset)

        filename = filename_bytes.decode("utf-8")
        return cls(request_id, filename, pflags, attrs)

    def validate(self) -> bool:
        """
        Validate SFTP open message content.

        Returns:
            True if message is valid

        Raises:
            ProtocolException: If message is invalid
        """
        super().validate()

        # Validate filename
        if not self.filename:
            raise ProtocolException("Filename cannot be empty")

        # Validate flags
        valid_flags = (
            SSH_FXF_READ
            | SSH_FXF_WRITE
            | SSH_FXF_APPEND
            | SSH_FXF_CREAT
            | SSH_FXF_TRUNC
            | SSH_FXF_EXCL
        )
        if self.pflags & ~valid_flags:
            raise ProtocolException(f"Invalid open flags: {self.pflags}")

        # Must have at least read or write flag
        if not (self.pflags & (SSH_FXF_READ | SSH_FXF_WRITE)):
            raise ProtocolException("Must specify read or write flag")

        return True
Methods:
__init__(request_id, filename, pflags, attrs)

Initialize SFTP open message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
filename str

Path to file to open

required
pflags int

Open flags

required
attrs SFTPAttributes

File attributes

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, filename: str, pflags: int, attrs: SFTPAttributes
) -> None:
    """
    Initialize SFTP open message.

    Args:
        request_id: Request ID
        filename: Path to file to open
        pflags: Open flags
        attrs: File attributes
    """
    super().__init__(SSH_FXP_OPEN, request_id)
    self.filename = filename
    self.pflags = pflags
    self.attrs = attrs

    # Build message data
    self.add_string(filename)
    self.add_uint32(pflags)
    self._data.extend(attrs.pack())
validate()

Validate SFTP open message content.

Returns:

Type Description
bool

True if message is valid

Raises:

Type Description
ProtocolException

If message is invalid

Source code in spindlex/protocol/sftp_messages.py
def validate(self) -> bool:
    """
    Validate SFTP open message content.

    Returns:
        True if message is valid

    Raises:
        ProtocolException: If message is invalid
    """
    super().validate()

    # Validate filename
    if not self.filename:
        raise ProtocolException("Filename cannot be empty")

    # Validate flags
    valid_flags = (
        SSH_FXF_READ
        | SSH_FXF_WRITE
        | SSH_FXF_APPEND
        | SSH_FXF_CREAT
        | SSH_FXF_TRUNC
        | SSH_FXF_EXCL
    )
    if self.pflags & ~valid_flags:
        raise ProtocolException(f"Invalid open flags: {self.pflags}")

    # Must have at least read or write flag
    if not (self.pflags & (SSH_FXF_READ | SSH_FXF_WRITE)):
        raise ProtocolException("Must specify read or write flag")

    return True

SFTPReadDirMessage

Bases: SFTPMessage

SFTP readdir message (SSH_FXP_READDIR).

Source code in spindlex/protocol/sftp_messages.py
class SFTPReadDirMessage(SFTPMessage):
    """SFTP readdir message (SSH_FXP_READDIR)."""

    def __init__(self, request_id: int, handle: bytes) -> None:
        """
        Initialize SFTP readdir message.

        Args:
            request_id: Request ID
            handle: Directory handle
        """
        super().__init__(SSH_FXP_READDIR, request_id)
        self.handle = handle

        # Build message data
        self.add_string(handle)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPReadDirMessage":
        """Unpack SFTP readdir message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        handle, offset = read_string(data, offset)

        return cls(request_id, handle)
Methods:
__init__(request_id, handle)

Initialize SFTP readdir message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
handle bytes

Directory handle

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, handle: bytes) -> None:
    """
    Initialize SFTP readdir message.

    Args:
        request_id: Request ID
        handle: Directory handle
    """
    super().__init__(SSH_FXP_READDIR, request_id)
    self.handle = handle

    # Build message data
    self.add_string(handle)

SFTPReadLinkMessage

Bases: SFTPMessage

SFTP readlink message (SSH_FXP_READLINK).

Source code in spindlex/protocol/sftp_messages.py
class SFTPReadLinkMessage(SFTPMessage):
    """SFTP readlink message (SSH_FXP_READLINK)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP readlink message.

        Args:
            request_id: Request ID
            path: Path to symbolic link
        """
        super().__init__(SSH_FXP_READLINK, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPReadLinkMessage":
        """Unpack SFTP readlink message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP readlink message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Path to symbolic link

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP readlink message.

    Args:
        request_id: Request ID
        path: Path to symbolic link
    """
    super().__init__(SSH_FXP_READLINK, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPReadMessage

Bases: SFTPMessage

SFTP read file message (SSH_FXP_READ).

Source code in spindlex/protocol/sftp_messages.py
class SFTPReadMessage(SFTPMessage):
    """SFTP read file message (SSH_FXP_READ)."""

    def __init__(
        self, request_id: int, handle: bytes, offset: int, length: int
    ) -> None:
        """
        Initialize SFTP read message.

        Args:
            request_id: Request ID
            handle: File handle
            offset: Byte offset to read from
            length: Number of bytes to read
        """
        super().__init__(SSH_FXP_READ, request_id)
        self.handle = handle
        self.offset = offset
        self.length = length

        # Build message data
        self.add_string(handle)
        self.add_uint64(offset)
        self.add_uint32(length)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPReadMessage":
        """Unpack SFTP read message data."""
        offset_pos = 0
        request_id, offset_pos = read_uint32(data, offset_pos)
        handle, offset_pos = read_string(data, offset_pos)
        file_offset, offset_pos = read_uint64(data, offset_pos)
        length, offset_pos = read_uint32(data, offset_pos)

        return cls(request_id, handle, file_offset, length)
Methods:
__init__(request_id, handle, offset, length)

Initialize SFTP read message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
handle bytes

File handle

required
offset int

Byte offset to read from

required
length int

Number of bytes to read

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, handle: bytes, offset: int, length: int
) -> None:
    """
    Initialize SFTP read message.

    Args:
        request_id: Request ID
        handle: File handle
        offset: Byte offset to read from
        length: Number of bytes to read
    """
    super().__init__(SSH_FXP_READ, request_id)
    self.handle = handle
    self.offset = offset
    self.length = length

    # Build message data
    self.add_string(handle)
    self.add_uint64(offset)
    self.add_uint32(length)

SFTPRealPathMessage

Bases: SFTPMessage

SFTP realpath message (SSH_FXP_REALPATH).

Source code in spindlex/protocol/sftp_messages.py
class SFTPRealPathMessage(SFTPMessage):
    """SFTP realpath message (SSH_FXP_REALPATH)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP realpath message.

        Args:
            request_id: Request ID
            path: Path to resolve
        """
        super().__init__(SSH_FXP_REALPATH, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPRealPathMessage":
        """Unpack SFTP realpath message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP realpath message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Path to resolve

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP realpath message.

    Args:
        request_id: Request ID
        path: Path to resolve
    """
    super().__init__(SSH_FXP_REALPATH, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPRemoveMessage

Bases: SFTPMessage

SFTP remove message (SSH_FXP_REMOVE).

Source code in spindlex/protocol/sftp_messages.py
class SFTPRemoveMessage(SFTPMessage):
    """SFTP remove message (SSH_FXP_REMOVE)."""

    def __init__(self, request_id: int, filename: str) -> None:
        """
        Initialize SFTP remove message.

        Args:
            request_id: Request ID
            filename: File to remove
        """
        super().__init__(SSH_FXP_REMOVE, request_id)
        self.filename = filename

        # Build message data
        self.add_string(filename)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPRemoveMessage":
        """Unpack SFTP remove message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        filename_bytes, offset = read_string(data, offset)

        filename = filename_bytes.decode("utf-8")
        return cls(request_id, filename)
Methods:
__init__(request_id, filename)

Initialize SFTP remove message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
filename str

File to remove

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, filename: str) -> None:
    """
    Initialize SFTP remove message.

    Args:
        request_id: Request ID
        filename: File to remove
    """
    super().__init__(SSH_FXP_REMOVE, request_id)
    self.filename = filename

    # Build message data
    self.add_string(filename)

SFTPRenameMessage

Bases: SFTPMessage

SFTP rename message (SSH_FXP_RENAME).

Source code in spindlex/protocol/sftp_messages.py
class SFTPRenameMessage(SFTPMessage):
    """SFTP rename message (SSH_FXP_RENAME)."""

    def __init__(self, request_id: int, oldpath: str, newpath: str) -> None:
        """
        Initialize SFTP rename message.

        Args:
            request_id: Request ID
            oldpath: Current path
            newpath: New path
        """
        super().__init__(SSH_FXP_RENAME, request_id)
        self.oldpath = oldpath
        self.newpath = newpath

        # Build message data
        self.add_string(oldpath)
        self.add_string(newpath)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPRenameMessage":
        """Unpack SFTP rename message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        oldpath_bytes, offset = read_string(data, offset)
        newpath_bytes, offset = read_string(data, offset)

        oldpath = oldpath_bytes.decode("utf-8")
        newpath = newpath_bytes.decode("utf-8")
        return cls(request_id, oldpath, newpath)
Methods:
__init__(request_id, oldpath, newpath)

Initialize SFTP rename message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
oldpath str

Current path

required
newpath str

New path

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, oldpath: str, newpath: str) -> None:
    """
    Initialize SFTP rename message.

    Args:
        request_id: Request ID
        oldpath: Current path
        newpath: New path
    """
    super().__init__(SSH_FXP_RENAME, request_id)
    self.oldpath = oldpath
    self.newpath = newpath

    # Build message data
    self.add_string(oldpath)
    self.add_string(newpath)

SFTPRmdirMessage

Bases: SFTPMessage

SFTP rmdir message (SSH_FXP_RMDIR).

Source code in spindlex/protocol/sftp_messages.py
class SFTPRmdirMessage(SFTPMessage):
    """SFTP rmdir message (SSH_FXP_RMDIR)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP rmdir message.

        Args:
            request_id: Request ID
            path: Directory path to remove
        """
        super().__init__(SSH_FXP_RMDIR, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPRmdirMessage":
        """Unpack SFTP rmdir message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP rmdir message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Directory path to remove

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP rmdir message.

    Args:
        request_id: Request ID
        path: Directory path to remove
    """
    super().__init__(SSH_FXP_RMDIR, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPSetStatMessage

Bases: SFTPMessage

SFTP setstat message (SSH_FXP_SETSTAT).

Source code in spindlex/protocol/sftp_messages.py
class SFTPSetStatMessage(SFTPMessage):
    """SFTP setstat message (SSH_FXP_SETSTAT)."""

    def __init__(self, request_id: int, path: str, attrs: SFTPAttributes) -> None:
        """
        Initialize SFTP setstat message.

        Args:
            request_id: Request ID
            path: Path to set attributes for
            attrs: New attributes
        """
        super().__init__(SSH_FXP_SETSTAT, request_id)
        self.path = path
        self.attrs = attrs

        # Build message data
        self.add_string(path)
        self._data.extend(attrs.pack())

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPSetStatMessage":
        """Unpack SFTP setstat message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)
        attrs, offset = SFTPAttributes.unpack(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path, attrs)
Methods:
__init__(request_id, path, attrs)

Initialize SFTP setstat message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Path to set attributes for

required
attrs SFTPAttributes

New attributes

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str, attrs: SFTPAttributes) -> None:
    """
    Initialize SFTP setstat message.

    Args:
        request_id: Request ID
        path: Path to set attributes for
        attrs: New attributes
    """
    super().__init__(SSH_FXP_SETSTAT, request_id)
    self.path = path
    self.attrs = attrs

    # Build message data
    self.add_string(path)
    self._data.extend(attrs.pack())

SFTPStatMessage

Bases: SFTPMessage

SFTP stat message (SSH_FXP_STAT).

Source code in spindlex/protocol/sftp_messages.py
class SFTPStatMessage(SFTPMessage):
    """SFTP stat message (SSH_FXP_STAT)."""

    def __init__(self, request_id: int, path: str) -> None:
        """
        Initialize SFTP stat message.

        Args:
            request_id: Request ID
            path: Path to get attributes for
        """
        super().__init__(SSH_FXP_STAT, request_id)
        self.path = path

        # Build message data
        self.add_string(path)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPStatMessage":
        """Unpack SFTP stat message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        path_bytes, offset = read_string(data, offset)

        path = path_bytes.decode("utf-8")
        return cls(request_id, path)
Methods:
__init__(request_id, path)

Initialize SFTP stat message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
path str

Path to get attributes for

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, path: str) -> None:
    """
    Initialize SFTP stat message.

    Args:
        request_id: Request ID
        path: Path to get attributes for
    """
    super().__init__(SSH_FXP_STAT, request_id)
    self.path = path

    # Build message data
    self.add_string(path)

SFTPStatusMessage

Bases: SFTPMessage

SFTP status message (SSH_FXP_STATUS).

Source code in spindlex/protocol/sftp_messages.py
class SFTPStatusMessage(SFTPMessage):
    """SFTP status message (SSH_FXP_STATUS)."""

    def __init__(
        self, request_id: int, status_code: int, message: str = "", language: str = ""
    ) -> None:
        """
        Initialize SFTP status message.

        Args:
            request_id: Request ID this status responds to
            status_code: SFTP status code
            message: Human-readable error message
            language: Language tag
        """
        super().__init__(SSH_FXP_STATUS, request_id)
        self.status_code = status_code
        self.message = message
        self.language = language

        # Build message data
        self.add_uint32(status_code)
        self.add_string(message)
        self.add_string(language)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPStatusMessage":
        """Unpack SFTP status message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        status_code, offset = read_uint32(data, offset)
        message_bytes, offset = read_string(data, offset)
        language_bytes, offset = read_string(data, offset)

        message = message_bytes.decode("utf-8", errors="replace")
        language = language_bytes.decode("utf-8", errors="replace")

        return cls(request_id, status_code, message, language)
Methods:
__init__(request_id, status_code, message='', language='')

Initialize SFTP status message.

Parameters:

Name Type Description Default
request_id int

Request ID this status responds to

required
status_code int

SFTP status code

required
message str

Human-readable error message

''
language str

Language tag

''
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, status_code: int, message: str = "", language: str = ""
) -> None:
    """
    Initialize SFTP status message.

    Args:
        request_id: Request ID this status responds to
        status_code: SFTP status code
        message: Human-readable error message
        language: Language tag
    """
    super().__init__(SSH_FXP_STATUS, request_id)
    self.status_code = status_code
    self.message = message
    self.language = language

    # Build message data
    self.add_uint32(status_code)
    self.add_string(message)
    self.add_string(language)

SFTPSymlinkMessage

Bases: SFTPMessage

SFTP symlink message (SSH_FXP_SYMLINK).

Source code in spindlex/protocol/sftp_messages.py
class SFTPSymlinkMessage(SFTPMessage):
    """SFTP symlink message (SSH_FXP_SYMLINK)."""

    def __init__(self, request_id: int, targetpath: str, linkpath: str) -> None:
        """
        Initialize SFTP symlink message.

        Args:
            request_id: Request ID
            targetpath: Target path for the link
            linkpath: Path where link should be created
        """
        super().__init__(SSH_FXP_SYMLINK, request_id)
        self.targetpath = targetpath
        self.linkpath = linkpath

        # Build message data (Note: SFTPv3 order is target, then link)
        self.add_string(targetpath)
        self.add_string(linkpath)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPSymlinkMessage":
        """Unpack SFTP symlink message data."""
        offset = 0
        request_id, offset = read_uint32(data, offset)
        targetpath_bytes, offset = read_string(data, offset)
        linkpath_bytes, offset = read_string(data, offset)

        targetpath = targetpath_bytes.decode("utf-8")
        linkpath = linkpath_bytes.decode("utf-8")
        return cls(request_id, targetpath, linkpath)
Methods:
__init__(request_id, targetpath, linkpath)

Initialize SFTP symlink message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
targetpath str

Target path for the link

required
linkpath str

Path where link should be created

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(self, request_id: int, targetpath: str, linkpath: str) -> None:
    """
    Initialize SFTP symlink message.

    Args:
        request_id: Request ID
        targetpath: Target path for the link
        linkpath: Path where link should be created
    """
    super().__init__(SSH_FXP_SYMLINK, request_id)
    self.targetpath = targetpath
    self.linkpath = linkpath

    # Build message data (Note: SFTPv3 order is target, then link)
    self.add_string(targetpath)
    self.add_string(linkpath)

SFTPVersionMessage

Bases: SFTPMessage

SFTP version message (SSH_FXP_VERSION).

Source code in spindlex/protocol/sftp_messages.py
class SFTPVersionMessage(SFTPMessage):
    """SFTP version message (SSH_FXP_VERSION)."""

    def __init__(
        self, version: int = SFTP_VERSION, extensions: Optional[dict[str, str]] = None
    ) -> None:
        """
        Initialize SFTP version message.

        Args:
            version: SFTP protocol version
            extensions: Optional extensions dictionary
        """
        super().__init__(SSH_FXP_VERSION)
        self.version = version
        self.extensions = extensions or {}

        # Build message data
        self.add_uint32(version)
        for name, data in self.extensions.items():
            self.add_string(name)
            self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPVersionMessage":
        """Unpack SFTP version message data."""
        offset = 0
        version, offset = read_uint32(data, offset)

        extensions = {}
        while offset < len(data):
            name_bytes, offset = read_string(data, offset)
            data_bytes, offset = read_string(data, offset)
            name = name_bytes.decode("utf-8")
            ext_data = data_bytes.decode("utf-8")
            extensions[name] = ext_data

        return cls(version, extensions)
Methods:
__init__(version=SFTP_VERSION, extensions=None)

Initialize SFTP version message.

Parameters:

Name Type Description Default
version int

SFTP protocol version

SFTP_VERSION
extensions Optional[dict[str, str]]

Optional extensions dictionary

None
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, version: int = SFTP_VERSION, extensions: Optional[dict[str, str]] = None
) -> None:
    """
    Initialize SFTP version message.

    Args:
        version: SFTP protocol version
        extensions: Optional extensions dictionary
    """
    super().__init__(SSH_FXP_VERSION)
    self.version = version
    self.extensions = extensions or {}

    # Build message data
    self.add_uint32(version)
    for name, data in self.extensions.items():
        self.add_string(name)
        self.add_string(data)

SFTPWriteMessage

Bases: SFTPMessage

SFTP write file message (SSH_FXP_WRITE).

Source code in spindlex/protocol/sftp_messages.py
class SFTPWriteMessage(SFTPMessage):
    """SFTP write file message (SSH_FXP_WRITE)."""

    def __init__(
        self, request_id: int, handle: bytes, offset: int, data: bytes
    ) -> None:
        """
        Initialize SFTP write message.

        Args:
            request_id: Request ID
            handle: File handle
            offset: Byte offset to write to
            data: Data to write
        """
        super().__init__(SSH_FXP_WRITE, request_id)
        self.handle = handle
        self.offset = offset
        self.data = data

        # Build message data
        self.add_string(handle)
        self.add_uint64(offset)
        self.add_string(data)

    @classmethod
    def _unpack_data(cls, data: bytes) -> "SFTPWriteMessage":
        """Unpack SFTP write message data."""
        offset_pos = 0
        request_id, offset_pos = read_uint32(data, offset_pos)
        handle, offset_pos = read_string(data, offset_pos)
        file_offset, offset_pos = read_uint64(data, offset_pos)
        write_data, offset_pos = read_string(data, offset_pos)

        return cls(request_id, handle, file_offset, write_data)
Methods:
__init__(request_id, handle, offset, data)

Initialize SFTP write message.

Parameters:

Name Type Description Default
request_id int

Request ID

required
handle bytes

File handle

required
offset int

Byte offset to write to

required
data bytes

Data to write

required
Source code in spindlex/protocol/sftp_messages.py
def __init__(
    self, request_id: int, handle: bytes, offset: int, data: bytes
) -> None:
    """
    Initialize SFTP write message.

    Args:
        request_id: Request ID
        handle: File handle
        offset: Byte offset to write to
        data: Data to write
    """
    super().__init__(SSH_FXP_WRITE, request_id)
    self.handle = handle
    self.offset = offset
    self.data = data

    # Build message data
    self.add_string(handle)
    self.add_uint64(offset)
    self.add_string(data)

Functions:

get_error_category(status_code)

Get error category for SFTP status code.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
str

Error category string

Source code in spindlex/protocol/sftp_constants.py
def get_error_category(status_code: int) -> str:
    """
    Get error category for SFTP status code.

    Args:
        status_code: SFTP status code

    Returns:
        Error category string
    """
    if status_code == SSH_FX_OK:
        return "success"
    elif is_file_not_found_error(status_code):
        return "not_found"
    elif is_permission_error(status_code):
        return "permission"
    elif status_code in (SSH_FX_NO_SPACE_ON_FILESYSTEM, SSH_FX_QUOTA_EXCEEDED):
        return "storage"
    elif status_code in (SSH_FX_NO_CONNECTION, SSH_FX_CONNECTION_LOST):
        return "connection"
    elif status_code in (
        SSH_FX_BAD_MESSAGE,
        SSH_FX_INVALID_HANDLE,
        SSH_FX_INVALID_FILENAME,
    ):
        return "protocol"
    else:
        return "general"

get_message_name(msg_type)

Get human-readable name for SFTP message type.

Parameters:

Name Type Description Default
msg_type int

SFTP message type code

required

Returns:

Type Description
str

Message type name or "UNKNOWN" if not recognized

Source code in spindlex/protocol/sftp_constants.py
def get_message_name(msg_type: int) -> str:
    """
    Get human-readable name for SFTP message type.

    Args:
        msg_type: SFTP message type code

    Returns:
        Message type name or "UNKNOWN" if not recognized
    """
    message_names = {
        SSH_FXP_INIT: "SSH_FXP_INIT",
        SSH_FXP_VERSION: "SSH_FXP_VERSION",
        SSH_FXP_OPEN: "SSH_FXP_OPEN",
        SSH_FXP_CLOSE: "SSH_FXP_CLOSE",
        SSH_FXP_READ: "SSH_FXP_READ",
        SSH_FXP_WRITE: "SSH_FXP_WRITE",
        SSH_FXP_LSTAT: "SSH_FXP_LSTAT",
        SSH_FXP_FSTAT: "SSH_FXP_FSTAT",
        SSH_FXP_SETSTAT: "SSH_FXP_SETSTAT",
        SSH_FXP_FSETSTAT: "SSH_FXP_FSETSTAT",
        SSH_FXP_OPENDIR: "SSH_FXP_OPENDIR",
        SSH_FXP_READDIR: "SSH_FXP_READDIR",
        SSH_FXP_REMOVE: "SSH_FXP_REMOVE",
        SSH_FXP_MKDIR: "SSH_FXP_MKDIR",
        SSH_FXP_RMDIR: "SSH_FXP_RMDIR",
        SSH_FXP_REALPATH: "SSH_FXP_REALPATH",
        SSH_FXP_STAT: "SSH_FXP_STAT",
        SSH_FXP_RENAME: "SSH_FXP_RENAME",
        SSH_FXP_READLINK: "SSH_FXP_READLINK",
        SSH_FXP_SYMLINK: "SSH_FXP_SYMLINK",
        SSH_FXP_LINK: "SSH_FXP_LINK",
        SSH_FXP_STATUS: "SSH_FXP_STATUS",
        SSH_FXP_HANDLE: "SSH_FXP_HANDLE",
        SSH_FXP_DATA: "SSH_FXP_DATA",
        SSH_FXP_NAME: "SSH_FXP_NAME",
        SSH_FXP_ATTRS: "SSH_FXP_ATTRS",
        SSH_FXP_EXTENDED: "SSH_FXP_EXTENDED",
        SSH_FXP_EXTENDED_REPLY: "SSH_FXP_EXTENDED_REPLY",
    }

    return message_names.get(msg_type, f"UNKNOWN({msg_type})")

get_status_message(status_code)

Get human-readable message for SFTP status code.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
str

Human-readable status message

Source code in spindlex/protocol/sftp_constants.py
def get_status_message(status_code: int) -> str:
    """
    Get human-readable message for SFTP status code.

    Args:
        status_code: SFTP status code

    Returns:
        Human-readable status message
    """
    return SFTP_STATUS_MESSAGES.get(status_code, f"Unknown status code: {status_code}")

is_error_status(status_code)

Check if SFTP status code indicates an error.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates an error

Source code in spindlex/protocol/sftp_constants.py
def is_error_status(status_code: int) -> bool:
    """
    Check if SFTP status code indicates an error.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates an error
    """
    return status_code != SSH_FX_OK

is_file_not_found_error(status_code)

Check if SFTP status code indicates file not found.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates file not found

Source code in spindlex/protocol/sftp_constants.py
def is_file_not_found_error(status_code: int) -> bool:
    """
    Check if SFTP status code indicates file not found.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates file not found
    """
    return status_code in (SSH_FX_NO_SUCH_FILE, SSH_FX_NO_SUCH_PATH)

is_permission_error(status_code)

Check if SFTP status code indicates permission error.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates permission error

Source code in spindlex/protocol/sftp_constants.py
def is_permission_error(status_code: int) -> bool:
    """
    Check if SFTP status code indicates permission error.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates permission error
    """
    return status_code in (SSH_FX_PERMISSION_DENIED, SSH_FX_WRITE_PROTECT)

is_success_status(status_code)

Check if SFTP status code indicates success.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates success

Source code in spindlex/protocol/sftp_constants.py
def is_success_status(status_code: int) -> bool:
    """
    Check if SFTP status code indicates success.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates success
    """
    return status_code == SSH_FX_OK

validate_sftp_message_type(msg_type)

Validate SFTP message type.

Parameters:

Name Type Description Default
msg_type int

SFTP message type code

required

Returns:

Type Description
bool

True if message type is valid

Source code in spindlex/protocol/sftp_constants.py
def validate_sftp_message_type(msg_type: int) -> bool:
    """
    Validate SFTP message type.

    Args:
        msg_type: SFTP message type code

    Returns:
        True if message type is valid
    """
    valid_types = {
        SSH_FXP_INIT,
        SSH_FXP_VERSION,
        SSH_FXP_OPEN,
        SSH_FXP_CLOSE,
        SSH_FXP_READ,
        SSH_FXP_WRITE,
        SSH_FXP_LSTAT,
        SSH_FXP_FSTAT,
        SSH_FXP_SETSTAT,
        SSH_FXP_FSETSTAT,
        SSH_FXP_OPENDIR,
        SSH_FXP_READDIR,
        SSH_FXP_REMOVE,
        SSH_FXP_MKDIR,
        SSH_FXP_RMDIR,
        SSH_FXP_REALPATH,
        SSH_FXP_STAT,
        SSH_FXP_RENAME,
        SSH_FXP_READLINK,
        SSH_FXP_SYMLINK,
        SSH_FXP_LINK,
        SSH_FXP_STATUS,
        SSH_FXP_HANDLE,
        SSH_FXP_DATA,
        SSH_FXP_NAME,
        SSH_FXP_ATTRS,
        SSH_FXP_EXTENDED,
        SSH_FXP_EXTENDED_REPLY,
    }
    return msg_type in valid_types

Protocol Constants

spindlex.protocol.constants

SSH Protocol Constants

Defines SSH protocol constants, message types, and error codes according to RFC 4251-4254 specifications.

Functions:

create_version_string(software_name='spindlex', software_version=None)

Create SSH version string for this implementation.

Parameters:

Name Type Description Default
software_name str

Name of SSH software

'spindlex'
software_version Optional[str]

Version of SSH software (defaults to package version)

None

Returns:

Type Description
str

Complete SSH version string

Source code in spindlex/protocol/constants.py
def create_version_string(
    software_name: str = "spindlex", software_version: Optional[str] = None
) -> str:
    """
    Create SSH version string for this implementation.

    Args:
        software_name: Name of SSH software
        software_version: Version of SSH software (defaults to package version)

    Returns:
        Complete SSH version string
    """
    if software_version is None:
        software_version = __version__
    return f"SSH-{SSH_PROTOCOL_VERSION_2}-{software_name}_{software_version}"

get_message_name(msg_type)

Get human-readable name for message type.

Parameters:

Name Type Description Default
msg_type int

Message type code

required

Returns:

Type Description
str

Message type name or "UNKNOWN" if not recognized

Source code in spindlex/protocol/constants.py
def get_message_name(msg_type: int) -> str:
    """
    Get human-readable name for message type.

    Args:
        msg_type: Message type code

    Returns:
        Message type name or "UNKNOWN" if not recognized
    """
    message_names = {
        MSG_DISCONNECT: "MSG_DISCONNECT",
        MSG_IGNORE: "MSG_IGNORE",
        MSG_UNIMPLEMENTED: "MSG_UNIMPLEMENTED",
        MSG_DEBUG: "MSG_DEBUG",
        MSG_SERVICE_REQUEST: "MSG_SERVICE_REQUEST",
        MSG_SERVICE_ACCEPT: "MSG_SERVICE_ACCEPT",
        MSG_KEXINIT: "MSG_KEXINIT",
        MSG_NEWKEYS: "MSG_NEWKEYS",
        MSG_KEXDH_INIT: "MSG_KEXDH_INIT",
        MSG_KEXDH_REPLY: "MSG_KEXDH_REPLY",
        MSG_KEX_ECDH_INIT: "MSG_KEX_ECDH_INIT",
        MSG_KEX_ECDH_REPLY: "MSG_KEX_ECDH_REPLY",
        MSG_USERAUTH_REQUEST: "MSG_USERAUTH_REQUEST",
        MSG_USERAUTH_FAILURE: "MSG_USERAUTH_FAILURE",
        MSG_USERAUTH_SUCCESS: "MSG_USERAUTH_SUCCESS",
        MSG_USERAUTH_BANNER: "MSG_USERAUTH_BANNER",
        MSG_USERAUTH_PK_OK: "MSG_USERAUTH_PK_OK",
        MSG_GLOBAL_REQUEST: "MSG_GLOBAL_REQUEST",
        MSG_REQUEST_SUCCESS: "MSG_REQUEST_SUCCESS",
        MSG_REQUEST_FAILURE: "MSG_REQUEST_FAILURE",
        MSG_CHANNEL_OPEN: "MSG_CHANNEL_OPEN",
        MSG_CHANNEL_OPEN_CONFIRMATION: "MSG_CHANNEL_OPEN_CONFIRMATION",
        MSG_CHANNEL_OPEN_FAILURE: "MSG_CHANNEL_OPEN_FAILURE",
        MSG_CHANNEL_WINDOW_ADJUST: "MSG_CHANNEL_WINDOW_ADJUST",
        MSG_CHANNEL_DATA: "MSG_CHANNEL_DATA",
        MSG_CHANNEL_EXTENDED_DATA: "MSG_CHANNEL_EXTENDED_DATA",
        MSG_CHANNEL_EOF: "MSG_CHANNEL_EOF",
        MSG_CHANNEL_CLOSE: "MSG_CHANNEL_CLOSE",
        MSG_CHANNEL_REQUEST: "MSG_CHANNEL_REQUEST",
        MSG_CHANNEL_SUCCESS: "MSG_CHANNEL_SUCCESS",
        MSG_CHANNEL_FAILURE: "MSG_CHANNEL_FAILURE",
    }

    return message_names.get(msg_type, f"UNKNOWN({msg_type})")

is_supported_version(protocol_version)

Check if protocol version is supported.

Parameters:

Name Type Description Default
protocol_version str

Protocol version string (e.g., "2.0")

required

Returns:

Type Description
bool

True if version is supported, False otherwise

Source code in spindlex/protocol/constants.py
def is_supported_version(protocol_version: str) -> bool:
    """
    Check if protocol version is supported.

    Args:
        protocol_version: Protocol version string (e.g., "2.0")

    Returns:
        True if version is supported, False otherwise
    """
    return protocol_version in SUPPORTED_PROTOCOL_VERSIONS

parse_version_string(version_line)

Parse SSH version string.

Parameters:

Name Type Description Default
version_line str

SSH version line (e.g., "SSH-2.0-OpenSSH_8.0")

required

Returns:

Type Description
tuple[str, str]

Tuple of (protocol_version, software_version)

Raises:

Type Description
ValueError

If version string is invalid

Source code in spindlex/protocol/constants.py
def parse_version_string(version_line: str) -> tuple[str, str]:
    """
    Parse SSH version string.

    Args:
        version_line: SSH version line (e.g., "SSH-2.0-OpenSSH_8.0")

    Returns:
        Tuple of (protocol_version, software_version)

    Raises:
        ValueError: If version string is invalid
    """
    if not version_line.startswith("SSH-"):
        raise ValueError(f"Invalid SSH version string: {version_line}")

    parts = version_line.split("-", 2)
    if len(parts) < 2:
        raise ValueError(f"Invalid SSH version string format: {version_line}")

    protocol_version = parts[1]
    software_version = parts[2] if len(parts) > 2 else ""

    return protocol_version, software_version

validate_message_type(msg_type)

Validate SSH message type.

Parameters:

Name Type Description Default
msg_type int

Message type code

required

Returns:

Type Description
bool

True if message type is valid, False otherwise

Source code in spindlex/protocol/constants.py
def validate_message_type(msg_type: int) -> bool:
    """
    Validate SSH message type.

    Args:
        msg_type: Message type code

    Returns:
        True if message type is valid, False otherwise
    """
    # Valid message type ranges according to RFC 4250
    return (
        (1 <= msg_type <= 19)  # Transport layer generic
        or (20 <= msg_type <= 29)  # Algorithm negotiation
        or (30 <= msg_type <= 41)  # Key exchange method specific
        or (50 <= msg_type <= 59)  # User authentication generic
        or (60 <= msg_type <= 79)  # User authentication method specific
        or (80 <= msg_type <= 89)  # Connection protocol generic
        or (90 <= msg_type <= 127)  # Channel related messages
        or (128 <= msg_type <= 191)  # Reserved for client protocols
        or (192 <= msg_type <= 255)  # Local extensions
    )

spindlex.protocol.sftp_constants

SFTP Protocol Constants

Defines SFTP protocol constants, message types, and error codes according to RFC 4254 and draft-ietf-secsh-filexfer specifications.

Functions:

get_error_category(status_code)

Get error category for SFTP status code.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
str

Error category string

Source code in spindlex/protocol/sftp_constants.py
def get_error_category(status_code: int) -> str:
    """
    Get error category for SFTP status code.

    Args:
        status_code: SFTP status code

    Returns:
        Error category string
    """
    if status_code == SSH_FX_OK:
        return "success"
    elif is_file_not_found_error(status_code):
        return "not_found"
    elif is_permission_error(status_code):
        return "permission"
    elif status_code in (SSH_FX_NO_SPACE_ON_FILESYSTEM, SSH_FX_QUOTA_EXCEEDED):
        return "storage"
    elif status_code in (SSH_FX_NO_CONNECTION, SSH_FX_CONNECTION_LOST):
        return "connection"
    elif status_code in (
        SSH_FX_BAD_MESSAGE,
        SSH_FX_INVALID_HANDLE,
        SSH_FX_INVALID_FILENAME,
    ):
        return "protocol"
    else:
        return "general"

get_message_name(msg_type)

Get human-readable name for SFTP message type.

Parameters:

Name Type Description Default
msg_type int

SFTP message type code

required

Returns:

Type Description
str

Message type name or "UNKNOWN" if not recognized

Source code in spindlex/protocol/sftp_constants.py
def get_message_name(msg_type: int) -> str:
    """
    Get human-readable name for SFTP message type.

    Args:
        msg_type: SFTP message type code

    Returns:
        Message type name or "UNKNOWN" if not recognized
    """
    message_names = {
        SSH_FXP_INIT: "SSH_FXP_INIT",
        SSH_FXP_VERSION: "SSH_FXP_VERSION",
        SSH_FXP_OPEN: "SSH_FXP_OPEN",
        SSH_FXP_CLOSE: "SSH_FXP_CLOSE",
        SSH_FXP_READ: "SSH_FXP_READ",
        SSH_FXP_WRITE: "SSH_FXP_WRITE",
        SSH_FXP_LSTAT: "SSH_FXP_LSTAT",
        SSH_FXP_FSTAT: "SSH_FXP_FSTAT",
        SSH_FXP_SETSTAT: "SSH_FXP_SETSTAT",
        SSH_FXP_FSETSTAT: "SSH_FXP_FSETSTAT",
        SSH_FXP_OPENDIR: "SSH_FXP_OPENDIR",
        SSH_FXP_READDIR: "SSH_FXP_READDIR",
        SSH_FXP_REMOVE: "SSH_FXP_REMOVE",
        SSH_FXP_MKDIR: "SSH_FXP_MKDIR",
        SSH_FXP_RMDIR: "SSH_FXP_RMDIR",
        SSH_FXP_REALPATH: "SSH_FXP_REALPATH",
        SSH_FXP_STAT: "SSH_FXP_STAT",
        SSH_FXP_RENAME: "SSH_FXP_RENAME",
        SSH_FXP_READLINK: "SSH_FXP_READLINK",
        SSH_FXP_SYMLINK: "SSH_FXP_SYMLINK",
        SSH_FXP_LINK: "SSH_FXP_LINK",
        SSH_FXP_STATUS: "SSH_FXP_STATUS",
        SSH_FXP_HANDLE: "SSH_FXP_HANDLE",
        SSH_FXP_DATA: "SSH_FXP_DATA",
        SSH_FXP_NAME: "SSH_FXP_NAME",
        SSH_FXP_ATTRS: "SSH_FXP_ATTRS",
        SSH_FXP_EXTENDED: "SSH_FXP_EXTENDED",
        SSH_FXP_EXTENDED_REPLY: "SSH_FXP_EXTENDED_REPLY",
    }

    return message_names.get(msg_type, f"UNKNOWN({msg_type})")

get_status_message(status_code)

Get human-readable message for SFTP status code.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
str

Human-readable status message

Source code in spindlex/protocol/sftp_constants.py
def get_status_message(status_code: int) -> str:
    """
    Get human-readable message for SFTP status code.

    Args:
        status_code: SFTP status code

    Returns:
        Human-readable status message
    """
    return SFTP_STATUS_MESSAGES.get(status_code, f"Unknown status code: {status_code}")

is_error_status(status_code)

Check if SFTP status code indicates an error.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates an error

Source code in spindlex/protocol/sftp_constants.py
def is_error_status(status_code: int) -> bool:
    """
    Check if SFTP status code indicates an error.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates an error
    """
    return status_code != SSH_FX_OK

is_file_not_found_error(status_code)

Check if SFTP status code indicates file not found.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates file not found

Source code in spindlex/protocol/sftp_constants.py
def is_file_not_found_error(status_code: int) -> bool:
    """
    Check if SFTP status code indicates file not found.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates file not found
    """
    return status_code in (SSH_FX_NO_SUCH_FILE, SSH_FX_NO_SUCH_PATH)

is_permission_error(status_code)

Check if SFTP status code indicates permission error.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates permission error

Source code in spindlex/protocol/sftp_constants.py
def is_permission_error(status_code: int) -> bool:
    """
    Check if SFTP status code indicates permission error.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates permission error
    """
    return status_code in (SSH_FX_PERMISSION_DENIED, SSH_FX_WRITE_PROTECT)

is_success_status(status_code)

Check if SFTP status code indicates success.

Parameters:

Name Type Description Default
status_code int

SFTP status code

required

Returns:

Type Description
bool

True if status indicates success

Source code in spindlex/protocol/sftp_constants.py
def is_success_status(status_code: int) -> bool:
    """
    Check if SFTP status code indicates success.

    Args:
        status_code: SFTP status code

    Returns:
        True if status indicates success
    """
    return status_code == SSH_FX_OK

validate_sftp_message_type(msg_type)

Validate SFTP message type.

Parameters:

Name Type Description Default
msg_type int

SFTP message type code

required

Returns:

Type Description
bool

True if message type is valid

Source code in spindlex/protocol/sftp_constants.py
def validate_sftp_message_type(msg_type: int) -> bool:
    """
    Validate SFTP message type.

    Args:
        msg_type: SFTP message type code

    Returns:
        True if message type is valid
    """
    valid_types = {
        SSH_FXP_INIT,
        SSH_FXP_VERSION,
        SSH_FXP_OPEN,
        SSH_FXP_CLOSE,
        SSH_FXP_READ,
        SSH_FXP_WRITE,
        SSH_FXP_LSTAT,
        SSH_FXP_FSTAT,
        SSH_FXP_SETSTAT,
        SSH_FXP_FSETSTAT,
        SSH_FXP_OPENDIR,
        SSH_FXP_READDIR,
        SSH_FXP_REMOVE,
        SSH_FXP_MKDIR,
        SSH_FXP_RMDIR,
        SSH_FXP_REALPATH,
        SSH_FXP_STAT,
        SSH_FXP_RENAME,
        SSH_FXP_READLINK,
        SSH_FXP_SYMLINK,
        SSH_FXP_LINK,
        SSH_FXP_STATUS,
        SSH_FXP_HANDLE,
        SSH_FXP_DATA,
        SSH_FXP_NAME,
        SSH_FXP_ATTRS,
        SSH_FXP_EXTENDED,
        SSH_FXP_EXTENDED_REPLY,
    }
    return msg_type in valid_types

Protocol Utilities

spindlex.protocol.utils

SSH Protocol Utility Functions

Provides utility functions for SSH protocol message parsing, serialization, and validation.

Classes

Functions:

extract_message_from_packet(packet_data)

Extract message payload from SSH packet.

Parameters:

Name Type Description Default
packet_data bytes

Complete SSH packet

required

Returns:

Type Description
bytes

Message payload (without packet framing)

Raises:

Type Description
ProtocolException

If packet is invalid

Source code in spindlex/protocol/utils.py
def extract_message_from_packet(packet_data: bytes) -> bytes:
    """
    Extract message payload from SSH packet.

    Args:
        packet_data: Complete SSH packet

    Returns:
        Message payload (without packet framing)

    Raises:
        ProtocolException: If packet is invalid
    """
    validate_packet_structure(packet_data)

    # Read packet length and padding length
    packet_length = struct.unpack(">I", packet_data[:PACKET_LENGTH_SIZE])[0]
    padding_length = packet_data[PACKET_LENGTH_SIZE]

    # Extract payload
    payload_start = PACKET_LENGTH_SIZE + PADDING_LENGTH_SIZE
    payload_end = payload_start + packet_length - PADDING_LENGTH_SIZE - padding_length

    return packet_data[payload_start:payload_end]

read_boolean(data, offset)

Read boolean from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required

Returns:

Type Description
tuple[bool, int]

Tuple of (value, new_offset)

Raises:

Type Description
ProtocolException

If not enough data

Source code in spindlex/protocol/utils.py
def read_boolean(data: Union[bytes, bytearray], offset: int) -> tuple[bool, int]:
    """
    Read boolean from data.

    Args:
        data: Data buffer
        offset: Current offset

    Returns:
        Tuple of (value, new_offset)

    Raises:
        ProtocolException: If not enough data
    """
    value, new_offset = read_byte(data, offset)
    return bool(value), new_offset

read_byte(data, offset)

Read single byte from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required

Returns:

Type Description
tuple[int, int]

Tuple of (value, new_offset)

Raises:

Type Description
ProtocolException

If not enough data

Source code in spindlex/protocol/utils.py
def read_byte(data: Union[bytes, bytearray], offset: int) -> tuple[int, int]:
    """
    Read single byte from data.

    Args:
        data: Data buffer
        offset: Current offset

    Returns:
        Tuple of (value, new_offset)

    Raises:
        ProtocolException: If not enough data
    """
    if offset >= len(data):
        raise ProtocolException("Not enough data to read byte")

    return data[offset], offset + 1

read_mpint(data, offset)

Read multiple precision integer from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required

Returns:

Type Description
tuple[int, int]

Tuple of (integer_value, new_offset)

Raises:

Type Description
ProtocolException

If not enough data or invalid mpint

Source code in spindlex/protocol/utils.py
def read_mpint(data: Union[bytes, bytearray], offset: int) -> tuple[int, int]:
    """
    Read multiple precision integer from data.

    Args:
        data: Data buffer
        offset: Current offset

    Returns:
        Tuple of (integer_value, new_offset)

    Raises:
        ProtocolException: If not enough data or invalid mpint
    """
    string_data, new_offset = read_string(data, offset)

    if len(string_data) == 0:
        return 0, new_offset

    # Convert bytes to integer (big-endian, two's complement)
    value = int.from_bytes(string_data, byteorder="big", signed=True)
    return value, new_offset

read_string(data, offset, max_size=MAX_MESSAGE_SIZE)

Read string from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required
max_size int

Maximum allowed string length (use a larger value for SFTP payloads)

MAX_MESSAGE_SIZE

Returns:

Type Description
tuple[bytes, int]

Tuple of (string_bytes, new_offset)

Raises:

Type Description
ProtocolException

If not enough data or invalid string length

Source code in spindlex/protocol/utils.py
def read_string(
    data: Union[bytes, bytearray], offset: int, max_size: int = MAX_MESSAGE_SIZE
) -> tuple[bytes, int]:
    """
    Read string from data.

    Args:
        data: Data buffer
        offset: Current offset
        max_size: Maximum allowed string length (use a larger value for SFTP payloads)

    Returns:
        Tuple of (string_bytes, new_offset)

    Raises:
        ProtocolException: If not enough data or invalid string length
    """
    length, new_offset = read_uint32(data, offset)

    if new_offset + length > len(data):
        raise ProtocolException("Not enough data to read string")

    if length > max_size:
        raise ProtocolException(f"String too long: {length}")

    # Ensure result is bytes, even if data is bytearray
    string_data = bytes(data[new_offset : new_offset + length])
    return string_data, new_offset + length

read_uint32(data, offset)

Read 32-bit unsigned integer from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required

Returns:

Type Description
tuple[int, int]

Tuple of (value, new_offset)

Raises:

Type Description
ProtocolException

If not enough data

Source code in spindlex/protocol/utils.py
def read_uint32(data: Union[bytes, bytearray], offset: int) -> tuple[int, int]:
    """
    Read 32-bit unsigned integer from data.

    Args:
        data: Data buffer
        offset: Current offset

    Returns:
        Tuple of (value, new_offset)

    Raises:
        ProtocolException: If not enough data
    """
    if offset + 4 > len(data):
        raise ProtocolException("Not enough data to read uint32")

    value = struct.unpack(">I", data[offset : offset + 4])[0]
    return value, offset + 4

read_uint64(data, offset)

Read 64-bit unsigned integer from data.

Parameters:

Name Type Description Default
data Union[bytes, bytearray]

Data buffer

required
offset int

Current offset

required

Returns:

Type Description
tuple[int, int]

Tuple of (value, new_offset)

Raises:

Type Description
ProtocolException

If not enough data

Source code in spindlex/protocol/utils.py
def read_uint64(data: Union[bytes, bytearray], offset: int) -> tuple[int, int]:
    """
    Read 64-bit unsigned integer from data.

    Args:
        data: Data buffer
        offset: Current offset

    Returns:
        Tuple of (value, new_offset)

    Raises:
        ProtocolException: If not enough data
    """
    if offset + 8 > len(data):
        raise ProtocolException("Not enough data to read uint64")

    value = struct.unpack(">Q", data[offset : offset + 8])[0]
    return value, offset + 8

validate_packet_structure(packet_data)

Validate SSH packet structure.

Parameters:

Name Type Description Default
packet_data bytes

Raw packet data

required

Returns:

Type Description
bool

True if packet structure is valid

Raises:

Type Description
ProtocolException

If packet structure is invalid

Source code in spindlex/protocol/utils.py
def validate_packet_structure(packet_data: bytes) -> bool:
    """
    Validate SSH packet structure.

    Args:
        packet_data: Raw packet data

    Returns:
        True if packet structure is valid

    Raises:
        ProtocolException: If packet structure is invalid
    """
    # Minimum total = PACKET_LENGTH_SIZE(4) + minimum body(8):
    #   AEAD ciphers (chacha20-poly1305) require body % 8 == 0, so minimum body = 8.
    #   Standard ciphers have larger minimums enforced elsewhere.
    if len(packet_data) < PACKET_LENGTH_SIZE + 8:
        raise ProtocolException(f"Packet too small: {len(packet_data)}")

    if len(packet_data) > MAX_PACKET_SIZE:
        raise ProtocolException(f"Packet too large: {len(packet_data)}")

    # Read packet length
    packet_length = struct.unpack(">I", packet_data[:PACKET_LENGTH_SIZE])[0]

    # Validate packet length - minimum body is 8 (one AEAD block or standard minimum)
    if packet_length < 8:
        raise ProtocolException(f"Invalid packet length: {packet_length}")

    if packet_length > MAX_PACKET_SIZE - PACKET_LENGTH_SIZE:
        raise ProtocolException(f"Packet length too large: {packet_length}")

    # Check if we have the complete packet
    if len(packet_data) < packet_length + PACKET_LENGTH_SIZE:
        raise ProtocolException("Incomplete packet")

    # Read padding length
    padding_length = packet_data[PACKET_LENGTH_SIZE]

    # Validate padding length
    if padding_length < MIN_PADDING_SIZE:
        raise ProtocolException(f"Padding too small: {padding_length}")

    if padding_length > MAX_PADDING_SIZE:
        raise ProtocolException(f"Padding too large: {padding_length}")

    # Calculate payload length
    payload_length = packet_length - PADDING_LENGTH_SIZE - padding_length

    if payload_length < 1:  # At least message type byte
        raise ProtocolException(f"Invalid payload length: {payload_length}")

    return True

write_boolean(value)

Write boolean to bytes.

Parameters:

Name Type Description Default
value bool

Boolean value

required

Returns:

Type Description
bytes

Serialized boolean

Source code in spindlex/protocol/utils.py
def write_boolean(value: bool) -> bytes:
    """
    Write boolean to bytes.

    Args:
        value: Boolean value

    Returns:
        Serialized boolean
    """
    return write_byte(1 if value else 0)

write_byte(value)

Write single byte to bytes.

Parameters:

Name Type Description Default
value int

Byte value (0-255)

required

Returns:

Type Description
bytes

Serialized byte

Raises:

Type Description
ProtocolException

If value is out of range

Source code in spindlex/protocol/utils.py
def write_byte(value: int) -> bytes:
    """
    Write single byte to bytes.

    Args:
        value: Byte value (0-255)

    Returns:
        Serialized byte

    Raises:
        ProtocolException: If value is out of range
    """
    if not (0 <= value <= 255):
        raise ProtocolException(f"Byte value out of range: {value}")

    return bytes([value])

write_mpint(value)

Write multiple precision integer to bytes.

Parameters:

Name Type Description Default
value int

Integer value

required

Returns:

Type Description
bytes

Serialized mpint

Source code in spindlex/protocol/utils.py
def write_mpint(value: int) -> bytes:
    """
    Write multiple precision integer to bytes.

    Args:
        value: Integer value

    Returns:
        Serialized mpint
    """
    if value == 0:
        return write_string(b"")

    # Convert integer to bytes (big-endian, two's complement)
    # Calculate minimum number of bytes needed
    bit_length = value.bit_length()
    byte_length = (bit_length + 7) // 8
    if byte_length == 0:
        byte_length = 1

    try:
        value_bytes = value.to_bytes(byte_length, byteorder="big", signed=True)
    except OverflowError:
        # Handle edge case where we need one more byte (for sign bit)
        byte_length += 1
        value_bytes = value.to_bytes(byte_length, byteorder="big", signed=True)

    return write_string(value_bytes)

write_string(value, max_size=MAX_MESSAGE_SIZE)

Write string to bytes.

Parameters:

Name Type Description Default
value Union[str, bytes]

String or bytes to write

required
max_size int

Maximum allowed string length

MAX_MESSAGE_SIZE

Returns:

Type Description
bytes

Serialized string with length prefix

Raises:

Type Description
ProtocolException

If string is too long

Source code in spindlex/protocol/utils.py
def write_string(value: Union[str, bytes], max_size: int = MAX_MESSAGE_SIZE) -> bytes:
    """
    Write string to bytes.

    Args:
        value: String or bytes to write
        max_size: Maximum allowed string length

    Returns:
        Serialized string with length prefix

    Raises:
        ProtocolException: If string is too long
    """
    if isinstance(value, str):
        value = value.encode(SSH_STRING_ENCODING)

    # Ensure value is bytes (not bytearray)
    value_bytes = bytes(value)

    if len(value_bytes) > max_size:
        raise ProtocolException(f"String too long: {len(value_bytes)}")

    return write_uint32(len(value_bytes)) + value_bytes

write_uint32(value)

Write 32-bit unsigned integer to bytes.

Parameters:

Name Type Description Default
value int

Integer value

required

Returns:

Type Description
bytes

Serialized integer

Raises:

Type Description
ProtocolException

If value is out of range

Source code in spindlex/protocol/utils.py
def write_uint32(value: int) -> bytes:
    """
    Write 32-bit unsigned integer to bytes.

    Args:
        value: Integer value

    Returns:
        Serialized integer

    Raises:
        ProtocolException: If value is out of range
    """
    if not (0 <= value <= 0xFFFFFFFF):
        raise ProtocolException(f"uint32 value out of range: {value}")

    return struct.pack(">I", value)

write_uint64(value)

Write 64-bit unsigned integer to bytes.

Parameters:

Name Type Description Default
value int

Integer value

required

Returns:

Type Description
bytes

Serialized integer

Raises:

Type Description
ProtocolException

If value is out of range

Source code in spindlex/protocol/utils.py
def write_uint64(value: int) -> bytes:
    """
    Write 64-bit unsigned integer to bytes.

    Args:
        value: Integer value

    Returns:
        Serialized integer

    Raises:
        ProtocolException: If value is out of range
    """
    if not (0 <= value <= 0xFFFFFFFFFFFFFFFF):
        raise ProtocolException(f"uint64 value out of range: {value}")

    return struct.pack(">Q", value)