Skip to content

Client API

SpindleX provides high-level SSH and SFTP client implementations for both synchronous and asynchronous use.

SSH Client

spindlex.client.ssh_client

SSH Client Implementation

High-level SSH client for establishing connections, executing commands, and managing SSH sessions with comprehensive authentication support.

Classes

ChannelFile

File-like object for SSH channel streams.

Provides file-like interface for reading from and writing to SSH channels.

Source code in spindlex/client/ssh_client.py
class ChannelFile:
    """
    File-like object for SSH channel streams.

    Wraps a single SSH channel and exposes a small file-style API on top of
    it. The ``mode`` selects which direction/stream the object serves:
    ``"r"`` reads via ``channel.recv``, ``"stderr"`` reads via
    ``channel.recv_stderr`` and ``"w"`` writes via ``channel.send``.
    """

    def __init__(self, channel: Channel, mode: str = "r") -> None:
        """
        Initialize channel file.

        Args:
            channel: SSH channel instance
            mode: File mode ('r' to read the main stream, 'w' to write,
                'stderr' to read the stderr stream)
        """
        self._channel = channel
        self._mode = mode
        self._closed = False

    def read(self, size: int = -1) -> bytes:
        """
        Read data from channel.

        Args:
            size: Maximum number of bytes to read. Any value <= 0 reads
                until the channel reports EOF (an empty chunk).

        Returns:
            Read data

        Raises:
            ValueError: If the file is closed or was not opened in a
                readable mode ('r' or 'stderr').
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")

        # Resolve the receive function once, up front, so that BOTH the sized
        # path and the read-until-EOF path reject write-only files. Previously
        # the EOF path fell through to recv_stderr for any mode other than "r".
        if self._mode == "r":
            receive = self._channel.recv
        elif self._mode == "stderr":
            receive = self._channel.recv_stderr
        else:
            raise ValueError("File not opened for reading")

        if size > 0:
            return receive(size)

        # Read until EOF
        result = bytearray()
        while True:
            try:
                chunk = receive(8192)
            except ChannelException as e:
                errmsg = str(e).lower()
                # A timeout or close after some data arrived is treated as
                # end of stream: hand back what was collected so far.
                if ("timeout" in errmsg or "closed" in errmsg) and result:
                    return bytes(result)
                # Channel closed before any data: behave like an empty read.
                if "closed" in errmsg:
                    return b""
                raise
            if not chunk:
                break
            result.extend(chunk)
        return bytes(result)

    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to channel.

        Args:
            data: Data to write; ``str`` is encoded with
                ``SSH_STRING_ENCODING`` before sending

        Returns:
            Number of bytes written (whatever ``channel.send`` returns)

        Raises:
            ValueError: If the file is closed or not opened in mode 'w'.
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")

        if self._mode != "w":
            raise ValueError("File not opened for writing")

        if isinstance(data, str):
            data = data.encode(SSH_STRING_ENCODING)

        return self._channel.send(data)

    def get_exit_status(self) -> int:
        """
        Get command exit status.

        Returns:
            Exit status code, or -1 if not available
        """
        return self._channel.get_exit_status()

    def recv_exit_status(self) -> int:
        """
        Wait for and return command exit status.

        Returns:
            Exit status code
        """
        return self._channel.recv_exit_status()

    def __iter__(self) -> "ChannelFile":
        """
        Make object iterable for line-by-line reading.

        Returns:
            Self as iterator
        """
        return self

    def __next__(self) -> str:
        """
        Read next line from channel.

        Returns:
            Next line of data

        Raises:
            StopIteration: If EOF reached
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def readline(self) -> str:
        """
        Read a single line from the channel.

        Data is pulled one byte at a time so that nothing beyond the newline
        is consumed from the channel.

        Returns:
            The decoded line including its trailing newline, or whatever was
            read before EOF (empty string if nothing was read).
        """
        result = bytearray()
        while True:
            char = self.read(1)
            if not char:
                break
            result.extend(char)
            if char == b"\n":
                break
        return result.decode(SSH_STRING_ENCODING, errors="replace")

    @property
    def channel(self) -> "Channel":
        """Get underlying SSH channel."""
        return self._channel

    def close(self) -> None:
        """
        Close the file.

        Note that this also closes the underlying channel, so any other
        ChannelFile wrapping the same channel is affected as well.
        Calling close() more than once is a no-op.
        """
        if not self._closed:
            self._closed = True
            self._channel.close()

    def __enter__(self) -> "ChannelFile":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit."""
        self.close()
Attributes
channel property

Get underlying SSH channel.

Methods:
__enter__()

Context manager entry.

Source code in spindlex/client/ssh_client.py
def __enter__(self) -> "ChannelFile":
    """Enter a ``with`` block; the file object itself is the managed value."""
    return self
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in spindlex/client/ssh_client.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave a ``with`` block by closing the file; exceptions are not suppressed."""
    self.close()
__init__(channel, mode='r')

Initialize channel file.

Parameters:

Name Type Description Default
channel Channel

SSH channel instance

required
mode str

File mode: 'r' reads the channel's main output stream, 'w' writes to the channel, 'stderr' reads the channel's stderr stream

'r'
Source code in spindlex/client/ssh_client.py
def __init__(self, channel: Channel, mode: str = "r") -> None:
    """
    Bind this file object to a channel.

    Args:
        channel: SSH channel instance to wrap
        mode: File mode ('r' for read, 'w' for write, 'stderr' for stderr)
    """
    # A freshly created file is always open.
    self._closed = False
    self._mode = mode
    self._channel = channel
__iter__()

Make object iterable for line-by-line reading.

Returns:

Type Description
ChannelFile

Self as iterator

Source code in spindlex/client/ssh_client.py
def __iter__(self) -> "ChannelFile":
    """
    Support ``for line in file`` iteration.

    Returns:
        This object, which acts as its own iterator
    """
    return self
__next__()

Read next line from channel.

Returns:

Type Description
str

Next line of data

Raises:

Type Description
StopIteration

If EOF reached

Source code in spindlex/client/ssh_client.py
def __next__(self) -> str:
    """
    Produce the next line for iteration.

    Returns:
        The line returned by ``readline()``

    Raises:
        StopIteration: When ``readline()`` returns an empty string (EOF)
    """
    next_line = self.readline()
    if next_line:
        return next_line
    raise StopIteration
close()

Close the file.

Source code in spindlex/client/ssh_client.py
def close(self) -> None:
    """Mark the file closed and close the wrapped channel; repeat calls do nothing."""
    if self._closed:
        return
    self._closed = True
    self._channel.close()
get_exit_status()

Get command exit status.

Returns:

Type Description
int

Exit status code, or -1 if not available

Source code in spindlex/client/ssh_client.py
def get_exit_status(self) -> int:
    """
    Return the exit status currently known to the channel.

    Delegates directly to ``channel.get_exit_status()``.

    Returns:
        Exit status code, or -1 if not available
    """
    status = self._channel.get_exit_status()
    return status
read(size=-1)

Read data from channel.

Parameters:

Name Type Description Default
size int

Number of bytes to read (-1 for all until EOF)

-1

Returns:

Type Description
bytes

Read data

Source code in spindlex/client/ssh_client.py
def read(self, size: int = -1) -> bytes:
    """
    Read data from channel.

    Args:
        size: Maximum number of bytes to read. Any value <= 0 reads
            until the channel reports EOF (an empty chunk).

    Returns:
        Read data

    Raises:
        ValueError: If the file is closed or was not opened in a
            readable mode ('r' or 'stderr').
    """
    if self._closed:
        raise ValueError("I/O operation on closed file")

    # Resolve the receive function once, up front, so that BOTH the sized
    # path and the read-until-EOF path reject write-only files. Previously
    # the EOF path fell through to recv_stderr for any mode other than "r".
    if self._mode == "r":
        receive = self._channel.recv
    elif self._mode == "stderr":
        receive = self._channel.recv_stderr
    else:
        raise ValueError("File not opened for reading")

    if size > 0:
        return receive(size)

    # Read until EOF
    result = bytearray()
    while True:
        try:
            chunk = receive(8192)
        except ChannelException as e:
            errmsg = str(e).lower()
            # A timeout or close after some data arrived is treated as end
            # of stream: hand back what was collected so far.
            if ("timeout" in errmsg or "closed" in errmsg) and result:
                return bytes(result)
            # Channel closed before any data: behave like an empty read.
            if "closed" in errmsg:
                return b""
            raise
        if not chunk:
            break
        result.extend(chunk)
    return bytes(result)
readline()

Read a single line from the channel.

Returns:

Type Description
str

Read line

Source code in spindlex/client/ssh_client.py
def readline(self) -> str:
    """
    Read one line from the channel, a byte at a time.

    Reading stops after a newline byte or when ``read(1)`` returns no data.

    Returns:
        The bytes collected so far, decoded with ``SSH_STRING_ENCODING``
        (undecodable bytes are replaced)
    """
    collected = bytearray()
    byte = self.read(1)
    while byte:
        collected.extend(byte)
        if byte == b"\n":
            break
        byte = self.read(1)
    return collected.decode(SSH_STRING_ENCODING, errors="replace")
recv_exit_status()

Wait for and return command exit status.

Returns:

Type Description
int

Exit status code

Source code in spindlex/client/ssh_client.py
def recv_exit_status(self) -> int:
    """
    Return the command's exit status, waiting for it if necessary.

    Delegates directly to ``channel.recv_exit_status()``.

    Returns:
        Exit status code
    """
    status = self._channel.recv_exit_status()
    return status
write(data)

Write data to channel.

Parameters:

Name Type Description Default
data Union[str, bytes]

Data to write

required

Returns:

Type Description
int

Number of bytes written

Source code in spindlex/client/ssh_client.py
def write(self, data: Union[str, bytes]) -> int:
    """
    Send data over the channel.

    Args:
        data: Payload to send; text is encoded with ``SSH_STRING_ENCODING``

    Returns:
        Number of bytes written, as reported by ``channel.send``

    Raises:
        ValueError: If the file is closed or not opened in mode 'w'
    """
    if self._closed:
        raise ValueError("I/O operation on closed file")
    if self._mode != "w":
        raise ValueError("File not opened for writing")

    payload = data.encode(SSH_STRING_ENCODING) if isinstance(data, str) else data
    return self._channel.send(payload)

SSHClient

High-level SSH client interface.

Provides convenient methods for SSH operations including connection establishment, command execution, shell access, and SFTP operations.

Source code in spindlex/client/ssh_client.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
class SSHClient:
    """
    High-level SSH client interface.

    Provides convenient methods for SSH operations including connection
    establishment, command execution, shell access, and SFTP operations.
    """

    def __init__(self) -> None:
        """Create a disconnected client: no transport, port 22, RejectPolicy for unknown host keys."""
        # Connection state, filled in by connect()
        self._transport: Optional[Transport] = None
        self._hostname: Optional[str] = None
        self._port: int = 22
        self._connected_username: Optional[str] = None

        # Host key handling
        self._host_key_policy: MissingHostKeyPolicy = RejectPolicy()
        self._host_key_storage = HostKeyStorage()

        self._logger = logging.getLogger(__name__)

    @property
    def username(self) -> Optional[str]:
        """Return the username used to authenticate this connection, or None if not connected."""
        return self._connected_username

    def set_missing_host_key_policy(self, policy: MissingHostKeyPolicy) -> None:
        """
        Set policy for handling unknown host keys.

        Args:
            policy: Host key policy to use for unknown hosts
        """
        self._host_key_policy = policy

    def set_host_key_storage(self, storage: HostKeyStorage) -> None:
        """
        Set host key storage instance.

        Args:
            storage: Host key storage to use
        """
        self._host_key_storage = storage

    def load_host_keys(self, filename: str) -> None:
        """
        Load host keys from a file.

        Args:
            filename: Path to known_hosts file
        """
        if not self._host_key_storage:
            self._host_key_storage = HostKeyStorage(filename)
        else:
            self._host_key_storage.load(filename)

    def load_system_host_keys(self) -> None:
        """
        Load host keys from system default locations.
        """
        # Common locations
        import os

        paths = [
            os.path.expanduser("~/.ssh/known_hosts"),
            os.path.expanduser("~/.ssh/known_hosts2"),
            "/etc/ssh/ssh_known_hosts",
            "/etc/ssh/ssh_known_hosts2",
        ]
        for path in paths:
            if os.path.exists(path):
                self.load_host_keys(path)

    def save_host_keys(self, filename: str) -> None:
        """
        Save host keys to a file.

        Args:
            filename: Path to save known_hosts
        """
        if self._host_key_storage._filename != filename:
            new_storage = HostKeyStorage(filename)
            new_storage.copy_from(self._host_key_storage)
            self._host_key_storage = new_storage

        self._host_key_storage.save()

    def get_host_key_storage(self) -> HostKeyStorage:
        """
        Get host key storage instance.

        Returns:
            Current host key storage
        """
        return self._host_key_storage

    def connect(
        self,
        hostname: str,
        port: int = 22,
        username: Optional[str] = None,
        password: Optional[str] = None,
        pkey: Optional[PKey] = None,
        key_filename: Optional[str] = None,
        key_password: Optional[str] = None,
        timeout: Optional[float] = None,
        compress: bool = False,
        sock: Optional[socket.socket] = None,
        gss_auth: bool = False,
        gss_kex: bool = False,
        gss_deleg_creds: bool = True,
        gss_host: Optional[str] = None,
        rekey_bytes_limit: Optional[int] = None,
        rekey_time_limit: Optional[float] = None,
    ) -> None:
        """
        Connect to SSH server and authenticate.

        Authentication is only attempted when ``username`` is given.

        Args:
            hostname: Server hostname or IP address
            port: Server port (default 22)
            username: Username for authentication
            password: Password for authentication
            pkey: Private key for authentication
            key_filename: Path to private key file
            key_password: Passphrase for the private key file
            timeout: Connection timeout in seconds
            compress: Whether to enable compression (not yet supported)
            sock: Optional existing socket to use
            gss_auth: Whether to use GSSAPI authentication
            gss_kex: Whether to use GSSAPI key exchange (not yet supported)
            gss_deleg_creds: Whether to delegate GSSAPI credentials
            gss_host: GSSAPI hostname override
            rekey_bytes_limit: Number of bytes before rekeying (default: 1GB)
            rekey_time_limit: Seconds before rekeying (default: 1 hour)

        Raises:
            SSHException: If connection or authentication fails
            ConfigurationException: If an unsupported option is requested
        """
        if self._transport and self._transport.active:
            raise SSHException("Already connected")

        if compress:
            from ..exceptions import ConfigurationException

            raise ConfigurationException("Compression is not yet supported")

        if gss_kex:
            from ..exceptions import ConfigurationException

            raise ConfigurationException("GSSAPI key exchange is not yet supported")

        # Validate the port BEFORE recording any connection state, so a
        # rejected call leaves the client untouched.
        if not (0 < port <= 65535):
            raise SSHException(f"Invalid port number: {port}")

        self._hostname = hostname
        self._port = port
        self._connected_username = username

        # Socket opened by this call (as opposed to one supplied by the
        # caller); only this one is ours to close on failure.
        owned_sock: Optional[socket.socket] = None
        new_transport: Optional[Transport] = None

        try:
            if sock is None:
                # Create socket connection - create_connection resolves IPv4 and IPv6
                self._logger.debug(f"Connecting to {hostname}:{port}")
                try:
                    sock = socket.create_connection(
                        (hostname, port),
                        timeout=timeout if timeout else None,
                    )
                except OSError as e:
                    raise SSHException(f"Connection failed: {e}") from e
                owned_sock = sock

                # Enable TCP_NODELAY to reduce latency (Nagle's algorithm)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # Create transport
            new_transport = Transport(
                sock,
                rekey_bytes_limit=rekey_bytes_limit,
                rekey_time_limit=rekey_time_limit,
            )
            self._transport = new_transport

            # Set permanent timeout on transport
            if timeout:
                self._transport.set_timeout(timeout)

            # Start client transport (handshake and key exchange)
            self._transport.start_client(timeout)

            # Verify host key
            self._verify_host_key()

            # Authenticate if credentials provided
            if username:
                self._authenticate(
                    username,
                    password,
                    pkey,
                    key_filename,
                    key_password,
                    gss_auth,
                    gss_host,
                    gss_deleg_creds,
                )

            self._logger.info(f"Successfully connected to {hostname}:{port}")

        except Exception as e:
            # Cleanup on failure
            if self._transport:
                self._transport.close()
                self._transport = None

            # If we opened the socket but never handed it to a transport
            # (setsockopt or the Transport constructor failed), nothing else
            # will close it, so do it here to avoid leaking the descriptor.
            if new_transport is None and owned_sock is not None:
                try:
                    owned_sock.close()
                except OSError:
                    # Best effort: the original error is what matters.
                    pass

            # The username property must not report a user for a connection
            # that was never established.
            self._connected_username = None

            if isinstance(
                e, (SSHException, AuthenticationException, BadHostKeyException)
            ):
                raise
            raise SSHException(f"Connection failed: {e}") from e

    def _verify_host_key(self) -> None:
        """
        Verify server host key according to policy.

        Raises:
            BadHostKeyException: If host key verification fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        hostname = self._hostname or "unknown"

        try:
            # Get actual server host key from transport
            server_key = self._transport.get_server_host_key()

            if server_key is None:
                raise SSHException("No host key received from server")

            # Check if we have any known host keys for this hostname
            known_keys = self._host_key_storage.get_all(hostname)

            if not known_keys:
                # No known key - apply missing host key policy
                self._logger.debug(f"No known host key for {hostname}")

                try:
                    self._host_key_policy.missing_host_key(self, hostname, server_key)
                except (BadHostKeyException, SSHException):
                    raise
                except Exception as e:
                    # Policy had an unexpected error - fail closed
                    raise SSHException(f"Host key policy error: {e}") from e
            else:
                # Check if server key matches ANY stored key for this host
                self._logger.debug(f"Found known host key(s) for {hostname}")
                server_key_bytes = server_key.get_public_key_bytes()

                # Filter known keys by algorithm to avoid false positives for different key types
                known_keys_of_type = [
                    k
                    for k in known_keys
                    if k.algorithm_name == server_key.algorithm_name
                ]

                if known_keys_of_type:
                    if not any(
                        k.get_public_key_bytes() == server_key_bytes
                        for k in known_keys_of_type
                    ):
                        raise BadHostKeyException(
                            hostname, server_key, known_keys_of_type[0]
                        )
                else:
                    # We have keys for this host, but not of this type.
                    # Standard behavior is to treat it as a new (missing) host key.
                    self._logger.debug(
                        f"No known {server_key.algorithm_name} host key for {hostname}"
                    )
                    try:
                        self._host_key_policy.missing_host_key(
                            self, hostname, server_key
                        )
                    except (BadHostKeyException, SSHException):
                        raise
                    except Exception as e:
                        raise SSHException(f"Host key policy error: {e}") from e

        except (BadHostKeyException, SSHException):
            raise
        except Exception as e:
            self._logger.error(f"Host key verification error: {e}")
            raise SSHException(f"Host key verification failed: {e}") from e

    def auth_password(self, username: str, password: str) -> None:
        """
        Authenticate using password.

        Args:
            username: Username for authentication
            password: Password for authentication

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        if not self._transport.auth_password(username, password):
            raise AuthenticationException("Password authentication failed")

    def auth_publickey(
        self,
        username: str,
        pkey: Optional[PKey] = None,
        key_filename: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        """
        Authenticate using public key.

        Args:
            username: Username for authentication
            pkey: Private key instance
            key_filename: Path to private key file
            password: Optional password for encrypted private keys

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        if key_filename:
            pkey = PKey.from_private_key_file(key_filename, password)

        if pkey is None:
            raise AuthenticationException("No private key provided")

        if not self._transport.auth_publickey(username, pkey):
            raise AuthenticationException("Public key authentication failed")

    def auth_keyboard_interactive(
        self,
        username: str,
        handler: Optional[
            Callable[[str, str, list[tuple[str, bool]]], list[str]]
        ] = None,
    ) -> None:
        """
        Authenticate using keyboard-interactive method.

        Args:
            username: Username for authentication
            handler: Callback function to handle prompts.
                     If None, uses console_handler (terminal prompts).

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        if handler is None:
            handler = console_handler

        if not self._transport.auth_keyboard_interactive(username, handler):
            raise AuthenticationException("Keyboard-interactive authentication failed")

    def auth_gssapi(
        self,
        username: str,
        gss_host: Optional[str] = None,
        gss_deleg_creds: bool = False,
    ) -> None:
        """
        Authenticate using GSSAPI (Kerberos).

        Args:
            username: Username for authentication
            gss_host: GSSAPI hostname (optional)
            gss_deleg_creds: Whether to delegate credentials

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        if not self._transport.auth_gssapi(username, gss_host, gss_deleg_creds):
            raise AuthenticationException("GSSAPI authentication failed")

    def _authenticate(
        self,
        username: str,
        password: Optional[str] = None,
        pkey: Optional[PKey] = None,
        key_filename: Optional[str] = None,
        key_password: Optional[str] = None,
        gss_auth: bool = False,
        gss_host: Optional[str] = None,
        gss_deleg_creds: bool = False,
    ) -> None:
        """
        Authenticate with the server.

        Args:
            username: Username for authentication
            password: Password for authentication
            pkey: Private key for authentication
            key_filename: Path to private key file
            key_password: Password for private key file
            gss_auth: Whether to use GSSAPI authentication
            gss_host: GSSAPI hostname override
            gss_deleg_creds: Whether to delegate GSSAPI credentials

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise AuthenticationException("No transport available")

        authenticated = False

        # Try GSSAPI if requested
        if gss_auth and not authenticated:
            try:
                self.auth_gssapi(username, gss_host, gss_deleg_creds)
                authenticated = True
            except Exception as e:
                self._logger.debug(f"GSSAPI authentication failed: {e}")

        # Load key from file if provided
        if key_filename and not pkey and not authenticated:
            try:
                from ..crypto.pkey import PKey

                # Use key_password if provided, otherwise fall back to password
                # (backward compatibility for when password was used for both)
                effective_key_password = (
                    key_password if key_password is not None else password
                )
                pkey = PKey.from_private_key_file(key_filename, effective_key_password)
            except Exception as e:
                self._logger.debug(f"Failed to load key from {key_filename}: {e}")

        # Try public key authentication first if key provided
        if pkey and not authenticated:
            try:
                self._logger.debug(
                    f"Attempting public key authentication for {username}"
                )
                authenticated = self._transport.auth_publickey(username, pkey)
                if authenticated:
                    self._logger.info(
                        f"Public key authentication successful for {username}"
                    )
            except Exception as e:
                self._logger.debug(f"Public key authentication failed: {e}")

        # Try password authentication if password provided and not yet authenticated
        if password and not authenticated:
            try:
                self._logger.debug(f"Attempting password authentication for {username}")
                authenticated = self._transport.auth_password(username, password)
                if authenticated:
                    self._logger.info(
                        f"Password authentication successful for {username}"
                    )
            except Exception as e:
                self._logger.debug(f"Password authentication failed: {e}")

        if not authenticated:
            auth_methods = []
            if pkey:
                auth_methods.append("publickey")
            if password:
                auth_methods.append("password")

            raise AuthenticationException(
                f"Authentication failed for {username} using methods: {', '.join(auth_methods) if auth_methods else 'none'}"
            )

    def exec_command(
        self, command: str, bufsize: int = -1
    ) -> tuple[ChannelFile, ChannelFile, ChannelFile]:
        """
        Run a command on the remote server in a new session channel.

        All three returned file objects wrap the same channel.

        Args:
            command: Command to execute (must not be blank)
            bufsize: Buffer size for streams (unused, kept for compatibility)

        Returns:
            Tuple of (stdin, stdout, stderr) file-like objects

        Raises:
            SSHException: If not connected, the command is blank, or
                execution fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to server")

        if not command.strip():
            raise SSHException("Command cannot be empty")

        try:
            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")

            # One session channel carries the command and all three streams
            channel = transport.open_channel("session")
            channel.exec_command(command)

            streams = (
                ChannelFile(channel, "w"),
                ChannelFile(channel, "r"),
                ChannelFile(channel, "stderr"),
            )

            self._logger.debug(f"Command executed: {command}")

            return streams

        except SSHException:
            raise
        except Exception as e:
            raise SSHException(f"Failed to execute command '{command}': {e}") from e

    def invoke_shell(self) -> Channel:
        """
        Start interactive shell session.

        Opens a new "session" channel, requests a pseudo-terminal on it and
        then asks for a shell. If any step fails after the channel has been
        opened, the channel is closed on a best-effort basis so that it is
        not leaked.

        Returns:
            Channel object for shell interaction

        Raises:
            SSHException: If the client is not connected or shell
                invocation fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to server")

        channel = None
        try:
            if self._transport is None:
                raise SSHException("No transport available")
            # Open a new session channel
            channel = self._transport.open_channel("session")

            # Request a pseudo-terminal (usually needed for shell)
            channel.request_pty()

            # Invoke shell
            channel.invoke_shell()

            self._logger.debug("Interactive shell session started")

            return channel

        except Exception as e:
            # The caller never receives the channel on failure, so nobody
            # else can release it. Cleanup errors must not hide the real one.
            if channel is not None:
                try:
                    channel.close()
                except Exception as close_error:
                    self._logger.debug(
                        f"Error closing channel after failed shell request: {close_error}"
                    )
            if isinstance(e, SSHException):
                raise
            raise SSHException(f"Failed to invoke shell: {e}") from e

    def open_sftp(self) -> "SFTPClient":
        """
        Create an SFTP client on top of the current transport.

        SFTPClient is imported lazily, inside the call, rather than at
        module import time.

        Returns:
            SFTPClient instance for file operations

        Raises:
            SSHException: If the client is not connected or the SFTP
                session cannot be created
        """
        if not self.is_connected():
            raise SSHException("Not connected to SSH server")

        try:
            from .sftp_client import SFTPClient

            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")
            return SFTPClient(transport)
        except SSHException:
            # Already the documented error type; pass it through untouched.
            raise
        except Exception as e:
            raise SSHException(f"Failed to open SFTP session: {e}") from e

    def close(self) -> None:
        """
        Close SSH connection and cleanup resources.

        Port forwarding tunnels are closed first, then the transport. Errors
        raised during cleanup are logged as warnings and never propagated.
        Whatever happens, the client ends up in its disconnected state:
        no transport, no hostname, default port and no recorded username.
        """
        try:
            if self._transport:
                self._logger.debug(
                    f"Closing connection to {self._hostname}:{self._port}"
                )

                # Close all port forwarding tunnels
                try:
                    forwarding_manager = self._transport.get_port_forwarding_manager()
                    forwarding_manager.close_all_tunnels()
                except Exception as e:
                    self._logger.warning(f"Error closing port forwarding tunnels: {e}")

                self._transport.close()
                self._logger.info(f"Connection closed to {self._hostname}:{self._port}")
        except Exception as e:
            self._logger.warning(f"Error during connection cleanup: {e}")
        finally:
            # Reset every piece of per-connection state, including the
            # username recorded by connect(), so nothing stale survives.
            self._transport = None
            self._hostname = None
            self._port = 22
            self._connected_username = None

    def is_connected(self) -> bool:
        """
        Report whether a usable, authenticated connection exists.

        Returns:
            False when there is no transport; otherwise the result of
            ``transport.active and transport.authenticated``
        """
        transport = self._transport
        if transport is None:
            return False
        return transport.active and transport.authenticated

    def get_transport(self) -> Optional[Transport]:
        """
        Get underlying transport object.

        Plain accessor: the stored transport is returned as-is, without
        checking whether it is still active or authenticated.

        Returns:
            Transport instance or None if not connected
        """
        return self._transport

    def create_local_port_forward(
        self,
        local_port: int,
        remote_host: str,
        remote_port: int,
        local_host: str = "127.0.0.1",
    ) -> str:
        """
        Set up a local port forwarding tunnel.

        The work is delegated to the transport's port forwarding manager.

        Args:
            local_port: Local port to listen on
            remote_host: Remote host to connect to
            remote_port: Remote port to connect to
            local_host: Local interface to bind to

        Returns:
            Tunnel ID for management

        Raises:
            SSHException: If the client is not connected or tunnel
                creation fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to SSH server")

        try:
            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")
            manager = transport.get_port_forwarding_manager()
            tunnel_id = manager.create_local_tunnel(
                local_port, remote_host, remote_port, local_host
            )
            return tunnel_id
        except SSHException:
            # Already the documented error type; pass it through untouched.
            raise
        except Exception as e:
            raise SSHException(f"Failed to create local port forwarding: {e}") from e

    def create_remote_port_forward(
        self, remote_port: int, local_host: str, local_port: int, remote_host: str = ""
    ) -> str:
        """
        Set up a remote port forwarding tunnel.

        The work is delegated to the transport's port forwarding manager.

        Args:
            remote_port: Remote port to listen on
            local_host: Local host to connect to
            local_port: Local port to connect to
            remote_host: Remote interface to bind to

        Returns:
            Tunnel ID for management

        Raises:
            SSHException: If the client is not connected or tunnel
                creation fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to SSH server")

        try:
            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")
            manager = transport.get_port_forwarding_manager()
            tunnel_id = manager.create_remote_tunnel(
                remote_port, local_host, local_port, remote_host
            )
            return tunnel_id
        except SSHException:
            # Already the documented error type; pass it through untouched.
            raise
        except Exception as e:
            raise SSHException(f"Failed to create remote port forwarding: {e}") from e

    def close_port_forward(self, tunnel_id: str) -> None:
        """
        Shut down one port forwarding tunnel.

        The work is delegated to the transport's port forwarding manager.

        Args:
            tunnel_id: Tunnel identifier returned by create_*_port_forward

        Raises:
            SSHException: If the client is not connected or tunnel
                closure fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to SSH server")

        try:
            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")
            manager = transport.get_port_forwarding_manager()
            manager.close_tunnel(tunnel_id)
        except SSHException:
            # Already the documented error type; pass it through untouched.
            raise
        except Exception as e:
            raise SSHException(f"Failed to close port forwarding tunnel: {e}") from e

    def get_port_forwards(self) -> dict[str, Any]:
        """
        Describe every tunnel known to the port forwarding manager.

        Each tunnel object is summarised as a plain dict with the keys
        ``local_addr``, ``remote_addr``, ``tunnel_type``, ``active`` and
        ``connections`` (the number of entries in ``tunnel.connections``).

        Returns:
            Dictionary of tunnel ID to tunnel information

        Raises:
            SSHException: If the client is not connected or the operation
                fails
        """
        if not self.is_connected():
            raise SSHException("Not connected to SSH server")

        try:
            transport = self._transport
            if transport is None:
                raise SSHException("No transport available")
            manager = transport.get_port_forwarding_manager()
            tunnels = manager.get_all_tunnels()

            # Convert to serializable format
            return {
                tunnel_id: {
                    "local_addr": tunnel.local_addr,
                    "remote_addr": tunnel.remote_addr,
                    "tunnel_type": tunnel.tunnel_type,
                    "active": tunnel.active,
                    "connections": len(tunnel.connections),
                }
                for tunnel_id, tunnel in tunnels.items()
            }
        except SSHException:
            # Already the documented error type; pass it through untouched.
            raise
        except Exception as e:
            raise SSHException(f"Failed to get port forwarding tunnels: {e}") from e

    @property
    def is_active(self) -> bool:
        """
        Check if SSH connection is active.

        Unlike is_connected(), authentication state is not considered:
        only the presence of a transport and its ``active`` flag.
        """
        transport = self._transport
        if transport is None:
            return False
        return transport.active

    def get_host_keys(self) -> HostKeyStorage:
        """
        Get host key storage instance.

        Returns:
            The HostKeyStorage object currently held by this client (the
            object itself, not a copy)
        """
        return self._host_key_storage

    def __enter__(self) -> "SSHClient":
        """
        Context manager entry.

        Returns:
            This client itself; no connection is opened here.
        """
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Context manager exit with cleanup.

        Calls close() regardless of whether the with-block raised. The
        exception arguments are ignored and None is returned, so an
        exception from the with-block is not suppressed.
        """
        self.close()
Attributes
is_active property

Check if SSH connection is active.

username property

Return the username used to authenticate this connection, or None if not connected.

Methods:
__enter__()

Context manager entry.

Source code in spindlex/client/ssh_client.py
def __enter__(self) -> "SSHClient":
    """
    Context manager entry.

    Returns:
        This client itself; no connection is opened here.
    """
    return self
__exit__(exc_type, exc_val, exc_tb)

Context manager exit with cleanup.

Source code in spindlex/client/ssh_client.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """
    Context manager exit with cleanup.

    Calls close() regardless of whether the with-block raised. The
    exception arguments are ignored and None is returned, so an
    exception from the with-block is not suppressed.
    """
    self.close()
__init__()

Initialize SSH client with default settings.

Source code in spindlex/client/ssh_client.py
def __init__(self) -> None:
    """
    Initialize SSH client with default settings.

    Only attributes are set here; no network activity takes place.
    """
    # Transport of the current connection; None while disconnected.
    self._transport: Optional[Transport] = None
    # Target host and port of the current connection (22 is the default port).
    self._hostname: Optional[str] = None
    self._port: int = 22
    # Username recorded for the current connection, if any.
    self._connected_username: Optional[str] = None
    # Default policy for unknown host keys is RejectPolicy
    # (presumably rejects them; see that class to confirm).
    self._host_key_policy: MissingHostKeyPolicy = RejectPolicy()
    # Host key store, created with no arguments.
    self._host_key_storage = HostKeyStorage()
    self._logger = logging.getLogger(__name__)
auth_gssapi(username, gss_host=None, gss_deleg_creds=False)

Authenticate using GSSAPI (Kerberos).

Parameters:

Name Type Description Default
username str

Username for authentication

required
gss_host Optional[str]

GSSAPI hostname (optional)

None
gss_deleg_creds bool

Whether to delegate credentials

False

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/ssh_client.py
def auth_gssapi(
    self,
    username: str,
    gss_host: Optional[str] = None,
    gss_deleg_creds: bool = False,
) -> None:
    """
    Authenticate with GSSAPI (Kerberos) over the existing transport.

    Args:
        username: Username for authentication
        gss_host: GSSAPI hostname (optional)
        gss_deleg_creds: Whether to delegate credentials

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If authentication fails
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    authenticated = transport.auth_gssapi(username, gss_host, gss_deleg_creds)
    if not authenticated:
        raise AuthenticationException("GSSAPI authentication failed")
auth_keyboard_interactive(username, handler=None)

Authenticate using keyboard-interactive method.

Parameters:

Name Type Description Default
username str

Username for authentication

required
handler Optional[Callable[[str, str, list[tuple[str, bool]]], list[str]]]

Callback function to handle prompts. If None, uses console_handler (terminal prompts).

None

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/ssh_client.py
def auth_keyboard_interactive(
    self,
    username: str,
    handler: Optional[
        Callable[[str, str, list[tuple[str, bool]]], list[str]]
    ] = None,
) -> None:
    """
    Authenticate with the keyboard-interactive method over the existing
    transport.

    Args:
        username: Username for authentication
        handler: Callback function to handle prompts.
                 If None, uses console_handler (terminal prompts).

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If authentication fails
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    # Fall back to the console prompt handler when the caller gave none.
    prompt_handler = console_handler if handler is None else handler

    authenticated = transport.auth_keyboard_interactive(username, prompt_handler)
    if not authenticated:
        raise AuthenticationException("Keyboard-interactive authentication failed")
auth_password(username, password)

Authenticate using password.

Parameters:

Name Type Description Default
username str

Username for authentication

required
password str

Password for authentication

required

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/ssh_client.py
def auth_password(self, username: str, password: str) -> None:
    """
    Authenticate with a password over the existing transport.

    Args:
        username: Username for authentication
        password: Password for authentication

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If authentication fails
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    authenticated = transport.auth_password(username, password)
    if not authenticated:
        raise AuthenticationException("Password authentication failed")
auth_publickey(username, pkey=None, key_filename=None, password=None)

Authenticate using public key.

Parameters:

Name Type Description Default
username str

Username for authentication

required
pkey Optional[PKey]

Private key instance

None
key_filename Optional[str]

Path to private key file

None
password Optional[str]

Optional password for encrypted private keys

None

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/ssh_client.py
def auth_publickey(
    self,
    username: str,
    pkey: Optional[PKey] = None,
    key_filename: Optional[str] = None,
    password: Optional[str] = None,
) -> None:
    """
    Authenticate with a private key over the existing transport.

    When ``key_filename`` is given, the key is loaded from that file and
    replaces any ``pkey`` argument.

    Args:
        username: Username for authentication
        pkey: Private key instance
        key_filename: Path to private key file
        password: Optional password for encrypted private keys

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If no key is provided or authentication
            fails
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    if key_filename:
        pkey = PKey.from_private_key_file(key_filename, password)

    if pkey is None:
        raise AuthenticationException("No private key provided")

    authenticated = transport.auth_publickey(username, pkey)
    if not authenticated:
        raise AuthenticationException("Public key authentication failed")
close()

Close SSH connection and cleanup resources.

Source code in spindlex/client/ssh_client.py
def close(self) -> None:
    """
    Close SSH connection and cleanup resources.

    Port forwarding tunnels are closed first, then the transport. Errors
    raised during cleanup are logged as warnings and never propagated.
    Whatever happens, the client ends up in its disconnected state:
    no transport, no hostname, default port and no recorded username.
    """
    try:
        if self._transport:
            self._logger.debug(
                f"Closing connection to {self._hostname}:{self._port}"
            )

            # Close all port forwarding tunnels
            try:
                forwarding_manager = self._transport.get_port_forwarding_manager()
                forwarding_manager.close_all_tunnels()
            except Exception as e:
                self._logger.warning(f"Error closing port forwarding tunnels: {e}")

            self._transport.close()
            self._logger.info(f"Connection closed to {self._hostname}:{self._port}")
    except Exception as e:
        self._logger.warning(f"Error during connection cleanup: {e}")
    finally:
        # Reset every piece of per-connection state, including the
        # username recorded by connect(), so nothing stale survives.
        self._transport = None
        self._hostname = None
        self._port = 22
        self._connected_username = None
close_port_forward(tunnel_id)

Close port forwarding tunnel.

Parameters:

Name Type Description Default
tunnel_id str

Tunnel identifier returned by create_*_port_forward

required

Raises:

Type Description
SSHException

If tunnel closure fails

Source code in spindlex/client/ssh_client.py
def close_port_forward(self, tunnel_id: str) -> None:
    """
    Shut down one port forwarding tunnel.

    The work is delegated to the transport's port forwarding manager.

    Args:
        tunnel_id: Tunnel identifier returned by create_*_port_forward

    Raises:
        SSHException: If the client is not connected or tunnel closure
            fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to SSH server")

    try:
        transport = self._transport
        if transport is None:
            raise SSHException("No transport available")
        manager = transport.get_port_forwarding_manager()
        manager.close_tunnel(tunnel_id)
    except SSHException:
        # Already the documented error type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Failed to close port forwarding tunnel: {e}") from e
connect(hostname, port=22, username=None, password=None, pkey=None, key_filename=None, key_password=None, timeout=None, compress=False, sock=None, gss_auth=False, gss_kex=False, gss_deleg_creds=True, gss_host=None, rekey_bytes_limit=None, rekey_time_limit=None)

Connect to SSH server and authenticate.

Parameters:

Name Type Description Default
hostname str

Server hostname or IP address

required
port int

Server port (default 22)

22
username Optional[str]

Username for authentication

None
password Optional[str]

Password for authentication

None
pkey Optional[PKey]

Private key for authentication

None
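key_filename Optional[str]

Path to private key file

None
key_password Optional[str]

Optional password for the private key (passed through to authentication)

None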
timeout Optional[float]

Connection timeout in seconds

None
compress bool

Whether to enable compression

False
sock Optional[socket]

Optional existing socket to use

None
gss_auth bool

Whether to use GSSAPI authentication

False
gss_kex bool

Whether to use GSSAPI key exchange

False
gss_deleg_creds bool

Whether to delegate GSSAPI credentials

True
gss_host Optional[str]

GSSAPI hostname override

None
rekey_bytes_limit Optional[int]

Number of bytes before rekeying (default: 1GB)

None
rekey_time_limit Optional[float]

Seconds before rekeying (default: 1 hour)

None

Raises:

Type Description
SSHException

If connection or authentication fails

Source code in spindlex/client/ssh_client.py
def connect(
    self,
    hostname: str,
    port: int = 22,
    username: Optional[str] = None,
    password: Optional[str] = None,
    pkey: Optional[PKey] = None,
    key_filename: Optional[str] = None,
    key_password: Optional[str] = None,
    timeout: Optional[float] = None,
    compress: bool = False,
    sock: Optional[socket.socket] = None,
    gss_auth: bool = False,
    gss_kex: bool = False,
    gss_deleg_creds: bool = True,
    gss_host: Optional[str] = None,
    rekey_bytes_limit: Optional[int] = None,
    rekey_time_limit: Optional[float] = None,
) -> None:
    """
    Connect to SSH server and authenticate.

    Steps: open (or adopt) a socket, create the transport, run the client
    handshake, verify the host key, then authenticate. Authentication is
    attempted only when ``username`` is given. All arguments are validated
    before any connection state is recorded, and a failed attempt leaves
    no transport, no open socket created here and no recorded username.

    Args:
        hostname: Server hostname or IP address
        port: Server port (default 22)
        username: Username for authentication
        password: Password for authentication
        pkey: Private key for authentication
        key_filename: Path to private key file
        key_password: Optional password for the private key (passed
            through to the authentication step)
        timeout: Connection timeout in seconds
        compress: Whether to enable compression
        sock: Optional existing socket to use
        gss_auth: Whether to use GSSAPI authentication
        gss_kex: Whether to use GSSAPI key exchange
        gss_deleg_creds: Whether to delegate GSSAPI credentials
        gss_host: GSSAPI hostname override
        rekey_bytes_limit: Number of bytes before rekeying (default: 1GB)
        rekey_time_limit: Seconds before rekeying (default: 1 hour)

    Raises:
        SSHException: If already connected, the port is out of range, or
            connection or authentication fails
        ConfigurationException: If ``compress`` or ``gss_kex`` is
            requested (neither is supported yet)
    """
    if self._transport and self._transport.active:
        raise SSHException("Already connected")

    if compress:
        from ..exceptions import ConfigurationException

        raise ConfigurationException("Compression is not yet supported")

    if gss_kex:
        from ..exceptions import ConfigurationException

        raise ConfigurationException("GSSAPI key exchange is not yet supported")

    # Validate port before recording anything, so a rejected call does not
    # leave a bogus hostname/port/username on the client.
    if not (0 < port <= 65535):
        raise SSHException(f"Invalid port number: {port}")

    self._hostname = hostname
    self._port = port
    self._connected_username = username

    own_sock = None  # socket opened here, as opposed to caller-supplied
    transport = None  # set once Transport() has been constructed
    try:
        if sock is None:
            # Create socket connection - create_connection resolves IPv4 and IPv6
            self._logger.debug(f"Connecting to {hostname}:{port}")
            try:
                own_sock = socket.create_connection(
                    (hostname, port),
                    timeout=timeout if timeout else None,
                )
            except OSError as e:
                raise SSHException(f"Connection failed: {e}") from e

            # Enable TCP_NODELAY to reduce latency (Nagle's algorithm)
            own_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            sock = own_sock

        # Create transport
        transport = Transport(
            sock,
            rekey_bytes_limit=rekey_bytes_limit,
            rekey_time_limit=rekey_time_limit,
        )
        self._transport = transport

        # Set permanent timeout on transport
        if timeout:
            self._transport.set_timeout(timeout)

        # Start client transport (handshake and key exchange)
        self._transport.start_client(timeout)

        # Verify host key
        self._verify_host_key()

        # Authenticate if credentials provided
        if username:
            self._authenticate(
                username,
                password,
                pkey,
                key_filename,
                key_password,
                gss_auth,
                gss_host,
                gss_deleg_creds,
            )

        self._logger.info(f"Successfully connected to {hostname}:{port}")

    except Exception as e:
        # Cleanup on failure. Errors raised while cleaning up are logged
        # only, so they cannot replace the exception that caused the failure.
        if self._transport:
            try:
                self._transport.close()
            except Exception as close_error:
                self._logger.debug(
                    f"Error closing transport after failed connect: {close_error}"
                )
            self._transport = None

        # If the transport was never built, nothing else will close the
        # socket opened above. A caller-supplied socket is left alone.
        if transport is None and own_sock is not None:
            try:
                own_sock.close()
            except OSError as close_error:
                self._logger.debug(
                    f"Error closing socket after failed connect: {close_error}"
                )

        # Not connected, so no username is in effect.
        self._connected_username = None

        if isinstance(
            e, (SSHException, AuthenticationException, BadHostKeyException)
        ):
            raise
        raise SSHException(f"Connection failed: {e}") from e
create_local_port_forward(local_port, remote_host, remote_port, local_host='127.0.0.1')

Create local port forwarding tunnel.

Parameters:

Name Type Description Default
local_port int

Local port to listen on

required
remote_host str

Remote host to connect to

required
remote_port int

Remote port to connect to

required
local_host str

Local interface to bind to

'127.0.0.1'

Returns:

Type Description
str

Tunnel ID for management

Raises:

Type Description
SSHException

If tunnel creation fails

Source code in spindlex/client/ssh_client.py
def create_local_port_forward(
    self,
    local_port: int,
    remote_host: str,
    remote_port: int,
    local_host: str = "127.0.0.1",
) -> str:
    """
    Set up a local port forwarding tunnel.

    The work is delegated to the transport's port forwarding manager.

    Args:
        local_port: Local port to listen on
        remote_host: Remote host to connect to
        remote_port: Remote port to connect to
        local_host: Local interface to bind to

    Returns:
        Tunnel ID for management

    Raises:
        SSHException: If the client is not connected or tunnel creation
            fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to SSH server")

    try:
        transport = self._transport
        if transport is None:
            raise SSHException("No transport available")
        manager = transport.get_port_forwarding_manager()
        tunnel_id = manager.create_local_tunnel(
            local_port, remote_host, remote_port, local_host
        )
        return tunnel_id
    except SSHException:
        # Already the documented error type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Failed to create local port forwarding: {e}") from e
create_remote_port_forward(remote_port, local_host, local_port, remote_host='')

Create remote port forwarding tunnel.

Parameters:

Name Type Description Default
remote_port int

Remote port to listen on

required
local_host str

Local host to connect to

required
local_port int

Local port to connect to

required
remote_host str

Remote interface to bind to

''

Returns:

Type Description
str

Tunnel ID for management

Raises:

Type Description
SSHException

If tunnel creation fails

Source code in spindlex/client/ssh_client.py
def create_remote_port_forward(
    self, remote_port: int, local_host: str, local_port: int, remote_host: str = ""
) -> str:
    """
    Set up a remote port forwarding tunnel.

    The work is delegated to the transport's port forwarding manager.

    Args:
        remote_port: Remote port to listen on
        local_host: Local host to connect to
        local_port: Local port to connect to
        remote_host: Remote interface to bind to

    Returns:
        Tunnel ID for management

    Raises:
        SSHException: If the client is not connected or tunnel creation
            fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to SSH server")

    try:
        transport = self._transport
        if transport is None:
            raise SSHException("No transport available")
        manager = transport.get_port_forwarding_manager()
        tunnel_id = manager.create_remote_tunnel(
            remote_port, local_host, local_port, remote_host
        )
        return tunnel_id
    except SSHException:
        # Already the documented error type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Failed to create remote port forwarding: {e}") from e
exec_command(command, bufsize=-1)

Execute command on remote server.

Parameters:

Name Type Description Default
command str

Command to execute

required
bufsize int

Buffer size for streams (unused, kept for compatibility)

-1

Returns:

Type Description
tuple[ChannelFile, ChannelFile, ChannelFile]

Tuple of (stdin, stdout, stderr) file-like objects

Raises:

Type Description
SSHException

If command execution fails

Source code in spindlex/client/ssh_client.py
def exec_command(
    self, command: str, bufsize: int = -1
) -> tuple[ChannelFile, ChannelFile, ChannelFile]:
    """
    Execute command on remote server.

    Opens a new "session" channel, asks the server to run ``command`` on
    it and wraps the channel in three file-like objects. If any step
    fails after the channel has been opened, the channel is closed on a
    best-effort basis so that it is not leaked.

    Args:
        command: Command to execute
        bufsize: Buffer size for streams (unused, kept for compatibility)

    Returns:
        Tuple of (stdin, stdout, stderr) file-like objects

    Raises:
        SSHException: If the client is not connected, the command is
            blank, or command execution fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to server")

    if not command.strip():
        raise SSHException("Command cannot be empty")

    channel = None
    try:
        if self._transport is None:
            raise SSHException("No transport available")
        # Open a new session channel
        channel = self._transport.open_channel("session")

        # Execute the command
        channel.exec_command(command)

        # Create file-like objects for the streams
        stdin = ChannelFile(channel, "w")
        stdout = ChannelFile(channel, "r")
        stderr = ChannelFile(channel, "stderr")

        self._logger.debug(f"Command executed: {command}")

        return stdin, stdout, stderr

    except Exception as e:
        # The caller never receives the channel on failure, so nobody
        # else can release it. Cleanup errors must not hide the real one.
        if channel is not None:
            try:
                channel.close()
            except Exception as close_error:
                self._logger.debug(
                    f"Error closing channel after failed command: {close_error}"
                )
        if isinstance(e, SSHException):
            raise
        raise SSHException(f"Failed to execute command '{command}': {e}") from e
get_host_key_storage()

Get host key storage instance.

Returns:

Type Description
HostKeyStorage

Current host key storage

Source code in spindlex/client/ssh_client.py
def get_host_key_storage(self) -> HostKeyStorage:
    """
    Get host key storage instance.

    Plain accessor: the stored object itself is returned, not a copy.

    Returns:
        Current host key storage
    """
    return self._host_key_storage
get_host_keys()

Get host key storage instance.

Source code in spindlex/client/ssh_client.py
def get_host_keys(self) -> HostKeyStorage:
    """
    Get host key storage instance.

    Returns:
        The HostKeyStorage object currently held by this client (the
        object itself, not a copy)
    """
    return self._host_key_storage
get_port_forwards()

Get all active port forwarding tunnels.

Returns:

Type Description
dict[str, Any]

Dictionary of tunnel ID to tunnel information

Raises:

Type Description
SSHException

If operation fails

Source code in spindlex/client/ssh_client.py
def get_port_forwards(self) -> dict[str, Any]:
    """
    Describe every tunnel known to the port forwarding manager.

    Each tunnel object is summarised as a plain dict with the keys
    ``local_addr``, ``remote_addr``, ``tunnel_type``, ``active`` and
    ``connections`` (the number of entries in ``tunnel.connections``).

    Returns:
        Dictionary of tunnel ID to tunnel information

    Raises:
        SSHException: If the client is not connected or the operation
            fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to SSH server")

    try:
        transport = self._transport
        if transport is None:
            raise SSHException("No transport available")
        manager = transport.get_port_forwarding_manager()
        tunnels = manager.get_all_tunnels()

        # Convert to serializable format
        return {
            tunnel_id: {
                "local_addr": tunnel.local_addr,
                "remote_addr": tunnel.remote_addr,
                "tunnel_type": tunnel.tunnel_type,
                "active": tunnel.active,
                "connections": len(tunnel.connections),
            }
            for tunnel_id, tunnel in tunnels.items()
        }
    except SSHException:
        # Already the documented error type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Failed to get port forwarding tunnels: {e}") from e
get_transport()

Get underlying transport object.

Returns:

Type Description
Optional[Transport]

Transport instance or None if not connected

Source code in spindlex/client/ssh_client.py
def get_transport(self) -> Optional[Transport]:
    """
    Get underlying transport object.

    Plain accessor: the stored transport is returned as-is, without
    checking whether it is still active or authenticated.

    Returns:
        Transport instance or None if not connected
    """
    return self._transport
invoke_shell()

Start interactive shell session.

Returns:

Type Description
Channel

Channel object for shell interaction

Raises:

Type Description
SSHException

If shell invocation fails

Source code in spindlex/client/ssh_client.py
def invoke_shell(self) -> Channel:
    """
    Start interactive shell session.

    Opens a new "session" channel, requests a pseudo-terminal on it and
    then asks for a shell. If any step fails after the channel has been
    opened, the channel is closed on a best-effort basis so that it is
    not leaked.

    Returns:
        Channel object for shell interaction

    Raises:
        SSHException: If the client is not connected or shell invocation
            fails
    """
    if not self.is_connected():
        raise SSHException("Not connected to server")

    channel = None
    try:
        if self._transport is None:
            raise SSHException("No transport available")
        # Open a new session channel
        channel = self._transport.open_channel("session")

        # Request a pseudo-terminal (usually needed for shell)
        channel.request_pty()

        # Invoke shell
        channel.invoke_shell()

        self._logger.debug("Interactive shell session started")

        return channel

    except Exception as e:
        # The caller never receives the channel on failure, so nobody
        # else can release it. Cleanup errors must not hide the real one.
        if channel is not None:
            try:
                channel.close()
            except Exception as close_error:
                self._logger.debug(
                    f"Error closing channel after failed shell request: {close_error}"
                )
        if isinstance(e, SSHException):
            raise
        raise SSHException(f"Failed to invoke shell: {e}") from e
is_connected()

Check if client is connected and authenticated.

Returns:

Type Description
bool

True if connected and authenticated, False otherwise

Source code in spindlex/client/ssh_client.py
def is_connected(self) -> bool:
    """
    Report whether a usable, authenticated connection exists.

    Returns:
        False when there is no transport; otherwise the result of
        ``transport.active and transport.authenticated``
    """
    transport = self._transport
    if transport is None:
        return False
    return transport.active and transport.authenticated
load_host_keys(filename)

Load host keys from a file.

Parameters:

Name Type Description Default
filename str

Path to known_hosts file

required
Source code in spindlex/client/ssh_client.py
def load_host_keys(self, filename: str) -> None:
    """
    Read host keys from ``filename`` into this client's storage.

    A truthy existing storage object is reused and asked to load the
    file; otherwise a fresh HostKeyStorage is created for the file.

    Args:
        filename: Path to known_hosts file
    """
    storage = self._host_key_storage
    if storage:
        storage.load(filename)
    else:
        self._host_key_storage = HostKeyStorage(filename)
load_system_host_keys()

Load host keys from system default locations.

Source code in spindlex/client/ssh_client.py
def load_system_host_keys(self) -> None:
    """
    Load host keys from the usual per-user and system-wide files.

    Each candidate path that exists is passed to load_host_keys(), in
    the order listed below; missing files are skipped.
    """
    import os

    # Common locations: per-user files first, then system-wide ones.
    candidates = (
        os.path.expanduser("~/.ssh/known_hosts"),
        os.path.expanduser("~/.ssh/known_hosts2"),
        "/etc/ssh/ssh_known_hosts",
        "/etc/ssh/ssh_known_hosts2",
    )
    for candidate in filter(os.path.exists, candidates):
        self.load_host_keys(candidate)
open_sftp()

Open SFTP session.

Returns:

Type Description
SFTPClient

SFTPClient instance for file operations

Raises:

Type Description
SSHException

If SFTP session creation fails

Source code in spindlex/client/ssh_client.py
def open_sftp(self) -> "SFTPClient":
    """
    Create an SFTP client on top of the current transport.

    SFTPClient is imported lazily, inside the call, rather than at
    module import time.

    Returns:
        SFTPClient instance for file operations

    Raises:
        SSHException: If the client is not connected or the SFTP session
            cannot be created
    """
    if not self.is_connected():
        raise SSHException("Not connected to SSH server")

    try:
        from .sftp_client import SFTPClient

        transport = self._transport
        if transport is None:
            raise SSHException("No transport available")
        return SFTPClient(transport)
    except SSHException:
        # Already the documented error type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Failed to open SFTP session: {e}") from e
save_host_keys(filename)

Save host keys to a file.

Parameters:

Name Type Description Default
filename str

Path to save known_hosts

required
Source code in spindlex/client/ssh_client.py
def save_host_keys(self, filename: str) -> None:
    """
    Save host keys to a file.

    If the current storage is bound to a different file (or there is no
    storage at all), a new ``HostKeyStorage`` for ``filename`` is created,
    populated from the current storage when one exists, and installed on
    the client before saving.

    Args:
        filename: Path to save known_hosts
    """
    current = self._host_key_storage
    # load_host_keys() treats the storage as possibly unset, so do the same
    # here instead of failing with AttributeError on ``None._filename``.
    if current is None or current._filename != filename:
        target = HostKeyStorage(filename)
        if current is not None:
            target.copy_from(current)
        self._host_key_storage = target

    self._host_key_storage.save()
set_host_key_storage(storage)

Set host key storage instance.

Parameters:

Name Type Description Default
storage HostKeyStorage

Host key storage to use

required
Source code in spindlex/client/ssh_client.py
def set_host_key_storage(self, storage: HostKeyStorage) -> None:
    """
    Replace the client's host key storage.

    The given object is stored as-is; nothing is copied or loaded.

    Args:
        storage: Host key storage to use
    """
    self._host_key_storage = storage
set_missing_host_key_policy(policy)

Set policy for handling unknown host keys.

Parameters:

Name Type Description Default
policy MissingHostKeyPolicy

Host key policy to use for unknown hosts

required
Source code in spindlex/client/ssh_client.py
def set_missing_host_key_policy(self, policy: MissingHostKeyPolicy) -> None:
    """
    Install the policy object consulted for unknown host keys.

    Args:
        policy: Host key policy to use for unknown hosts
    """
    self._host_key_policy = policy

Functions:

spindlex.client.async_ssh_client

Async SSH Client Implementation

Provides asynchronous SSH client functionality for high-concurrency applications.

Classes

AsyncSSHClient

Async SSH client for establishing SSH connections and executing commands.

Provides asynchronous versions of all SSH client operations for use in async/await applications and high-concurrency scenarios.

Source code in spindlex/client/async_ssh_client.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
class AsyncSSHClient:
    """
    Async SSH client for establishing SSH connections and executing commands.

    Provides asynchronous versions of all SSH client operations for use
    in async/await applications and high-concurrency scenarios.
    """

    def __init__(self) -> None:
        """Create a disconnected client with default host-key handling."""
        # Connection state: nothing is open until connect() succeeds.
        self._transport: AsyncTransport | None = None

        # Endpoint details; connect() fills these in.
        self._hostname: str | None = None
        self._port: int = 22
        self._username: str | None = None

        # Default policy for hosts without a stored key is RejectPolicy;
        # replace it with set_missing_host_key_policy().
        self._host_key_policy: MissingHostKeyPolicy = RejectPolicy()
        self._host_key_storage = HostKeyStorage()

        self._logger = logging.getLogger(__name__)
        self._connected = False

    async def connect(
        self,
        hostname: str,
        port: int = 22,
        username: str | None = None,
        password: str | None = None,
        pkey: Any | None = None,
        key_filename: str | list[str] | None = None,
        key_password: str | None = None,
        timeout: float | None = None,
        compress: bool = False,
        sock: Any | None = None,
        gss_auth: bool = False,
        gss_kex: bool = False,
        gss_deleg_creds: bool = True,
        gss_host: str | None = None,
        rekey_bytes_limit: int | None = None,
        rekey_time_limit: int | None = None,
    ) -> None:
        """
        Connect to SSH server asynchronously with retry logic for transient errors.

        Up to three attempts are made. An attempt is retried only for
        connection-level failures (``OSError`` and subclasses, or an
        ``SSHException`` whose message names a reset/failed/closed
        connection); anything else is raised immediately. Whenever an
        attempt is abandoned, the transport is closed and the client's
        connection state is reset, so a failed ``connect()`` leaves the
        client reusable.

        Args:
            hostname: Server hostname or IP address
            port: Server port (default: 22)
            username: Username for authentication; when omitted no
                authentication is attempted here
            password: Password for authentication
            pkey: Private key for authentication
            key_filename: Path to private key file(s)
            key_password: Password for encrypted private key file(s)
            timeout: Connection timeout in seconds
            compress: Enable compression (not referenced in this method)
            sock: Optional existing socket or channel to use
            rekey_bytes_limit: Number of bytes before rekeying, passed
                through to ``AsyncTransport``
            rekey_time_limit: Seconds before rekeying, passed through to
                ``AsyncTransport``
            gss_auth: Use GSSAPI authentication
            gss_kex: Use GSSAPI key exchange (not referenced in this method)
            gss_deleg_creds: Delegate GSSAPI credentials
            gss_host: GSSAPI hostname override

        Raises:
            SSHException: If connection fails
            AuthenticationException: If authentication fails
        """
        if self._connected:
            raise SSHException("Already connected")

        # Validate port
        if not (0 < port <= 65535):
            raise SSHException(f"Invalid port number: {port}")

        max_retries = 3

        async def _abandon_attempt() -> None:
            """Close the half-open transport and undo connection state."""
            if self._transport:
                await self._transport.close()
                self._transport = None
            # _connected is set before host key verification and
            # authentication; without this reset a failed connect() would
            # leave the client permanently "Already connected".
            self._connected = False
            self._hostname = None
            self._port = 22
            self._username = None

        try:
            for attempt in range(max_retries):
                try:
                    if sock is None:
                        # Create socket connection
                        current_sock, reader, writer = await self._create_connection(
                            hostname, port, timeout
                        )
                    else:
                        # If sock is provided, we need to wrap it if it's a raw socket
                        # In asyncio, we usually need reader/writer.
                        # If it's a SpindleX Channel, it might need special handling.
                        current_sock = sock
                        if hasattr(
                            current_sock, "makefile"
                        ):  # Likely a socket-like object
                            reader, writer = await asyncio.open_connection(
                                sock=current_sock
                            )
                        else:
                            # Assume it's already a pair or handled by transport
                            reader, writer = None, None

                    # Create async transport
                    self._transport = AsyncTransport(
                        current_sock,
                        rekey_bytes_limit=rekey_bytes_limit,
                        rekey_time_limit=rekey_time_limit,
                    )

                    # Use connect_existing helper to set reader/writer safely
                    if reader and writer:
                        await self._transport.connect_existing(reader, writer)

                    # Start client transport
                    await self._transport.start_client(timeout)

                    # Store connection info before host key verification so hostname is available
                    self._hostname = hostname
                    self._port = port
                    self._username = username
                    self._connected = True

                    # Verify host key
                    self._verify_host_key()

                    # Perform authentication if credentials provided
                    if username:
                        await self._authenticate(
                            username,
                            password=password,
                            pkey=pkey,
                            key_filename=key_filename,
                            key_password=key_password,
                            gss_auth=gss_auth,
                            gss_host=gss_host,
                            gss_deleg_creds=gss_deleg_creds,
                        )

                    return  # Success

                except (
                    SSHException,
                    ConnectionResetError,
                    ConnectionRefusedError,
                    OSError,
                ) as e:
                    # Cleanup failed attempt
                    await _abandon_attempt()

                    # Only retry on transient connection-level errors.
                    # Avoid retrying on auth failure or host key mismatch.
                    # NOTE(review): _create_connection wraps socket errors in
                    # SSHException("Failed to connect to ..."), which matches
                    # none of the keywords below unless the OS error text
                    # does - confirm that refused connections are meant to
                    # skip the retry.
                    is_transient = isinstance(
                        e, (ConnectionResetError, ConnectionRefusedError, socket.error)
                    ) or (
                        isinstance(e, SSHException)
                        and any(
                            kw in str(e)
                            for kw in (
                                "Connection reset",
                                "Connection failed",
                                "Client start failed",
                                "Connection closed",
                            )
                        )
                    )

                    if not is_transient or attempt == max_retries - 1:
                        if isinstance(
                            e,
                            (
                                SSHException,
                                AuthenticationException,
                                BadHostKeyException,
                            ),
                        ):
                            raise
                        raise SSHException(f"Connection failed: {e}") from e

                    # Exponential backoff with jitter: 0.2s, 0.4s, 0.8s...
                    import random

                    wait = (0.2 * (2**attempt)) + (random.random() * 0.1)  # noqa: S311 # nosec  # fmt: skip
                    self._logger.debug(
                        f"Connection attempt {attempt + 1} failed: {e}. Retrying in {wait:.2f}s..."
                    )
                    await asyncio.sleep(wait)
        except Exception as e:
            # Covers errors the inner handler does not catch as well as the
            # final re-raise from it; cleanup is idempotent.
            await _abandon_attempt()

            if isinstance(
                e, (SSHException, AuthenticationException, BadHostKeyException)
            ):
                raise
            raise SSHException(f"Connection failed: {e}") from e

    def _verify_host_key(self) -> None:
        """
        Verify server host key according to policy.

        Raises:
            BadHostKeyException: If host key verification fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        hostname = self._hostname or "unknown"
        server_key = None

        try:
            # Get actual server host key from transport
            server_key = self._transport.get_server_host_key()

            if server_key is None:
                raise SSHException("No server host key received")

            # Check all stored keys for this hostname (MED-12)
            known_keys = self._host_key_storage.get_all(hostname)

            if not known_keys:
                # No known key - apply missing host key policy
                self._logger.debug(f"No known host key for {hostname}")

                try:
                    self._host_key_policy.missing_host_key(self, hostname, server_key)
                except BadHostKeyException:
                    # Policy rejected the key
                    raise
                except Exception as e:
                    # Policy had an error but didn't reject
                    self._logger.warning(f"Host key policy error: {e}")
            else:
                # We have known keys - check if any match the server key
                self._logger.debug(f"Found known host key(s) for {hostname}")

                server_key_bytes = server_key.get_public_key_bytes()

                # Filter known keys by algorithm to avoid false positives for different key types
                known_keys_of_type = [
                    k
                    for k in known_keys
                    if k.algorithm_name == server_key.algorithm_name
                ]

                if known_keys_of_type:
                    if not any(
                        k.get_public_key_bytes() == server_key_bytes
                        for k in known_keys_of_type
                    ):
                        raise BadHostKeyException(
                            hostname, server_key, known_keys_of_type[0]
                        )
                else:
                    # We have keys for this host, but not of this type.
                    # Standard behavior is to treat it as a new (missing) host key.
                    self._logger.debug(
                        f"No known {server_key.algorithm_name} host key for {hostname}"
                    )
                    try:
                        self._host_key_policy.missing_host_key(
                            self, hostname, server_key
                        )
                    except BadHostKeyException:
                        raise
                    except Exception as e:
                        self._logger.warning(f"Host key policy error: {e}")

        except BadHostKeyException:
            raise
        except SSHException:
            raise
        except Exception as e:
            self._logger.error(f"Host key verification error: {e}")
            raise BadHostKeyException(hostname, server_key)

    async def _create_connection(
        self, hostname: str, port: int, timeout: float | None
    ) -> tuple[socket.socket, asyncio.StreamReader, asyncio.StreamWriter]:
        """
        Open a TCP stream to the server and tune its socket.

        Args:
            hostname: Server hostname
            port: Server port
            timeout: Connection timeout

        Returns:
            Tuple of (socket, reader, writer); the socket is whatever the
            writer reports as its "socket" extra info and may be None

        Raises:
            SSHException: On timeout or any other connection failure
        """
        try:
            streams = await asyncio.wait_for(
                asyncio.open_connection(hostname, port), timeout=timeout
            )
            reader, writer = streams

            # Get the underlying socket and tune it for throughput.
            sock = writer.get_extra_info("socket")
            if sock is not None:
                tuning = (
                    # Disable Nagle's algorithm so small control packets are
                    # sent immediately rather than coalesced.
                    (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
                    # Request 1 MB kernel send/receive buffers.
                    (socket.SOL_SOCKET, socket.SO_SNDBUF, 1 << 20),
                    (socket.SOL_SOCKET, socket.SO_RCVBUF, 1 << 20),
                )
                for level, option, value in tuning:
                    sock.setsockopt(level, option, value)

            return sock, reader, writer

        except asyncio.TimeoutError as exc:
            raise SSHException(f"Connection timeout to {hostname}:{port}") from exc
        except Exception as exc:
            raise SSHException(f"Failed to connect to {hostname}:{port}: {exc}") from exc

    async def exec_command(
        self, command: str, bufsize: int = -1, timeout: float | None = None
    ) -> tuple[Any, Any, Any]:
        """
        Execute command on remote server asynchronously.

        Args:
            command: Command to execute
            bufsize: Buffer size for streams
            timeout: Command timeout in seconds

        Returns:
            Tuple of (stdin, stdout, stderr) streams

        Raises:
            SSHException: If command execution fails
        """
        if not self._connected or not self._transport:
            raise SSHException("Not connected")

        try:
            # Open channel
            channel = await self._transport.open_channel("session")

            # Execute command
            await channel.exec_command(command)

            # Return channel file objects
            return (
                channel.makefile("wb", bufsize),
                channel.makefile("rb", bufsize),
                channel.makefile_stderr("rb", bufsize),
            )

        except Exception as e:
            if isinstance(e, SSHException):
                raise
            raise SSHException(f"Command execution failed: {e}") from e

    async def invoke_shell(self) -> Any:
        """
        Start interactive shell asynchronously.

        Returns:
            Channel for shell interaction

        Raises:
            SSHException: If shell invocation fails
        """
        if not self._connected or not self._transport:
            raise SSHException("Not connected")

        try:
            # Open channel
            channel = await self._transport.open_channel("session")

            # Invoke shell
            await channel.invoke_shell()

            return channel

        except Exception as e:
            if isinstance(e, SSHException):
                raise
            raise SSHException(f"Shell invocation failed: {e}") from e

    async def open_sftp(self) -> AsyncSFTPClient:
        """
        Start the "sftp" subsystem on a new session channel.

        Returns:
            Async SFTP client instance, already initialized

        Raises:
            SSHException: If not connected or SFTP open fails
        """
        if not (self._connected and self._transport):
            raise SSHException("Not connected")

        try:
            # Open SFTP subsystem channel
            subsystem_channel = await self._transport.open_channel("session")
            await subsystem_channel.invoke_subsystem("sftp")

            # Wrap the channel and run the client's own initialization
            # before handing it to the caller.
            client = AsyncSFTPClient(subsystem_channel)
            await client._initialize()
            return client

        except SSHException:
            raise
        except Exception as exc:
            raise SSHException(f"SFTP open failed: {exc}") from exc

    async def auth_password(self, username: str, password: str) -> None:
        """
        Authenticate using password asynchronously.

        Args:
            username: Username for authentication
            password: Password for authentication

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        if not await self._transport.auth_password(username, password):
            raise AuthenticationException("Password authentication failed")

    async def auth_publickey(
        self,
        username: str,
        pkey: Any | None = None,
        key_filename: str | list[str] | None = None,
        password: str | None = None,
    ) -> None:
        """
        Authenticate using public key asynchronously.

        Args:
            username: Username for authentication
            pkey: Private key instance
            key_filename: Path to private key file(s)
            password: Optional password for encrypted private keys

        Raises:
            AuthenticationException: If authentication fails
        """
        if not self._transport:
            raise SSHException("No transport available")

        # Load key(s) from file if provided
        if key_filename:
            from ..crypto.pkey import PKey

            filenames = (
                [key_filename] if isinstance(key_filename, str) else key_filename
            )
            for filename in filenames:
                try:
                    # Run in thread as it does I/O
                    pkey = await asyncio.to_thread(
                        PKey.from_private_key_file, filename, password
                    )
                    if await self._transport.auth_publickey(username, pkey):
                        return
                except Exception as e:
                    self._logger.debug(
                        f"Failed to authenticate with key {filename}: {e}"
                    )

            if not pkey:
                raise AuthenticationException(
                    f"Failed to load keys from {key_filename}"
                )

        if pkey is None:
            raise AuthenticationException("No private key provided")

        if not await self._transport.auth_publickey(username, pkey):
            raise AuthenticationException("Public key authentication failed")

    async def auth_keyboard_interactive(
        self,
        username: str,
        handler: Callable[[str, str, list[tuple[str, bool]]], Any] | None = None,
    ) -> None:
        """
        Authenticate using the keyboard-interactive method.

        Args:
            username: Username for authentication
            handler: Callable passed to the transport to answer prompts;
                ``console_handler`` is used when none (or a falsy one) is given

        Raises:
            SSHException: If there is no transport
            AuthenticationException: If the transport reports failure
        """
        if not self._transport:
            raise SSHException("No transport available")

        # Use console_handler by default
        from ..auth.keyboard_interactive import console_handler

        prompt_handler = handler or console_handler
        succeeded = await self._transport.auth_keyboard_interactive(
            username, prompt_handler
        )
        if not succeeded:
            raise AuthenticationException("Keyboard-interactive authentication failed")

    async def auth_gssapi(
        self,
        username: str,
        gss_host: str | None = None,
        gss_deleg_creds: bool = False,
    ) -> None:
        """Authenticate using GSSAPI (Kerberos) asynchronously."""
        if not self._transport:
            raise SSHException("No transport available")

        if not await self._transport.auth_gssapi(username, gss_host, gss_deleg_creds):
            raise AuthenticationException("GSSAPI authentication failed")

    async def _authenticate(
        self,
        username: str,
        password: str | None = None,
        pkey: Any | None = None,
        key_filename: str | list[str] | None = None,
        key_password: str | None = None,
        gss_auth: bool = False,
        gss_host: str | None = None,
        gss_deleg_creds: bool = False,
    ) -> None:
        """Internal helper to guide authentication flow."""
        if not self._transport:
            raise SSHException("No transport available")

        authenticated = False

        # Try GSSAPI if requested
        if gss_auth and not authenticated:
            try:
                await self.auth_gssapi(username, gss_host, gss_deleg_creds)
                authenticated = True
            except Exception as e:
                self._logger.debug(f"GSSAPI authentication failed: {e}")

        # Try Public Key
        if (pkey or key_filename) and not authenticated:
            try:
                await self.auth_publickey(
                    username,
                    pkey=pkey,
                    key_filename=key_filename,
                    password=key_password if key_password is not None else password,
                )
                authenticated = True
            except Exception as e:
                self._logger.debug(f"Public key authentication failed: {e}")

        # Try Password
        if password and not authenticated:
            try:
                await self.auth_password(username, password)
                authenticated = True
            except Exception as e:
                self._logger.debug(f"Password authentication failed: {e}")

        if not authenticated:
            raise AuthenticationException(f"Authentication failed for user {username}")

    async def create_local_port_forward(
        self,
        local_port: int,
        remote_host: str,
        remote_port: int,
        local_host: str = "127.0.0.1",
    ) -> str:
        """
        Create local port forwarding tunnel asynchronously.

        Args:
            local_port: Local port to listen on
            remote_host: Remote host to connect to
            remote_port: Remote port to connect to
            local_host: Local interface to bind to

        Returns:
            Tunnel ID for management
        """
        if not self._connected or not self._transport:
            raise SSHException("Not connected")

        manager = self._transport.get_port_forwarding_manager()
        return await manager.create_local_tunnel(
            local_port, remote_host, remote_port, local_host
        )

    async def create_remote_port_forward(
        self,
        remote_port: int,
        local_host: str,
        local_port: int,
        remote_host: str = "",
    ) -> str:
        """
        Create remote port forwarding tunnel asynchronously.

        Args:
            remote_port: Remote port to listen on
            local_host: Local host to connect to
            local_port: Local port to connect to
            remote_host: Remote interface to bind to

        Returns:
            Tunnel ID for management
        """
        if not self._connected or not self._transport:
            raise SSHException("Not connected")

        manager = self._transport.get_port_forwarding_manager()
        return await manager.create_remote_tunnel(
            remote_port, local_host, local_port, remote_host
        )

    async def close_port_forward(self, tunnel_id: str) -> None:
        """
        Close port forwarding tunnel asynchronously.

        Args:
            tunnel_id: Tunnel identifier
        """
        if self._transport:
            manager = self._transport.get_port_forwarding_manager()
            await manager.close_tunnel(tunnel_id)

    def get_port_forwards(self) -> dict[str, Any]:
        """
        Get all active port forwarding tunnels.

        Returns:
            Dictionary mapping tunnel IDs to tunnel objects
        """
        if self._transport:
            manager = self._transport.get_port_forwarding_manager()
            return manager.get_all_tunnels()
        return {}

    def set_missing_host_key_policy(self, policy: MissingHostKeyPolicy) -> None:
        """
        Set policy for handling unknown host keys.

        Args:
            policy: Host key policy instance
        """
        self._host_key_policy = policy

    def set_host_key_storage(self, storage: HostKeyStorage) -> None:
        """
        Set host key storage instance.

        Args:
            storage: Host key storage to use
        """
        self._host_key_storage = storage

    async def load_host_keys(self, filename: str) -> None:
        """
        Load host keys from a file.

        Args:
            filename: Path to known_hosts file
        """
        if not self._host_key_storage:
            self._host_key_storage = HostKeyStorage(filename)
        else:
            await asyncio.to_thread(self._host_key_storage.load, filename)

    async def load_system_host_keys(self) -> None:
        """
        Load host keys from system default locations.
        """
        # Common locations
        import os

        paths = [
            os.path.expanduser("~/.ssh/known_hosts"),
            os.path.expanduser("~/.ssh/known_hosts2"),
            "/etc/ssh/ssh_known_hosts",
            "/etc/ssh/ssh_known_hosts2",
        ]
        for path in paths:
            if await asyncio.to_thread(os.path.exists, path):
                await self.load_host_keys(path)

    async def save_host_keys(self, filename: str) -> None:
        """
        Save host keys to a file.

        Args:
            filename: Path to save known_hosts
        """
        if not self._host_key_storage or self._host_key_storage._filename != filename:
            old_storage = self._host_key_storage
            self._host_key_storage = HostKeyStorage(filename)
            if old_storage:
                self._host_key_storage._keys = old_storage._keys

        # save() is sync, run in thread
        await asyncio.to_thread(self._host_key_storage.save)

    def get_host_key_storage(self) -> HostKeyStorage:
        """
        Get host key storage instance.

        Returns:
            Current host key storage
        """
        return self._host_key_storage

    def get_host_keys(self) -> HostKeyStorage:
        """
        Alias for get_host_key_storage().

        Returns:
            Current host key storage
        """
        return self.get_host_key_storage()

    async def close(self) -> None:
        """Close SSH connection and cleanup resources."""
        if self._transport:
            await self._transport.close()
            self._transport = None

        self._connected = False
        self._hostname = None
        self._port = 22
        self._username = None

    async def __aenter__(self) -> AsyncSSHClient:
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()

    @property
    def connected(self) -> bool:
        """Check if client is connected."""
        return self._connected and self._transport is not None

    @property
    def hostname(self) -> str | None:
        """Get connected hostname."""
        return self._hostname

    @property
    def port(self) -> int:
        """Get connected port."""
        return self._port

    @property
    def username(self) -> str | None:
        """Get authenticated username."""
        return self._username
Attributes
connected property

Check if client is connected.

hostname property

Get connected hostname.

port property

Get connected port.

username property

Get authenticated username.

Methods:
__aenter__() async

Async context manager entry.

Source code in spindlex/client/async_ssh_client.py
async def __aenter__(self) -> AsyncSSHClient:
    """Enter ``async with``: return the client itself, without connecting."""
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in spindlex/client/async_ssh_client.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave ``async with``: close the client; exceptions are not suppressed."""
    await self.close()
    return None
__init__()

Initialize async SSH client.

Source code in spindlex/client/async_ssh_client.py
def __init__(self) -> None:
    """
    Initialize async SSH client in the disconnected state.

    No network activity happens here: the transport is unset, the port
    defaults to 22, and unknown host keys are rejected until a different
    policy is installed.
    """
    self._logger = logging.getLogger(__name__)

    # Connection state
    self._connected = False
    self._transport: AsyncTransport | None = None
    self._hostname: str | None = None
    self._port: int = 22
    self._username: str | None = None

    # Host key handling
    self._host_key_storage = HostKeyStorage()
    self._host_key_policy: MissingHostKeyPolicy = RejectPolicy()
auth_gssapi(username, gss_host=None, gss_deleg_creds=False) async

Authenticate using GSSAPI (Kerberos) asynchronously.

Source code in spindlex/client/async_ssh_client.py
async def auth_gssapi(
    self,
    username: str,
    gss_host: str | None = None,
    gss_deleg_creds: bool = False,
) -> None:
    """
    Authenticate using GSSAPI (Kerberos) asynchronously.

    Args:
        username: Username for authentication
        gss_host: Passed through to the transport's GSSAPI authentication
        gss_deleg_creds: Passed through to the transport's GSSAPI authentication

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If the transport reports failure
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    accepted = await transport.auth_gssapi(username, gss_host, gss_deleg_creds)
    if accepted:
        return
    raise AuthenticationException("GSSAPI authentication failed")
auth_keyboard_interactive(username, handler=None) async

Authenticate using keyboard-interactive method asynchronously.

Source code in spindlex/client/async_ssh_client.py
async def auth_keyboard_interactive(
    self,
    username: str,
    handler: Callable[[str, str, list[tuple[str, bool]]], Any] | None = None,
) -> None:
    """
    Authenticate using keyboard-interactive method asynchronously.

    Args:
        username: Username for authentication
        handler: Callback handed to the transport; when omitted (or
            falsy) the package's ``console_handler`` is used instead

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If the transport reports failure
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    # Imported at call time, exactly as before, even when a handler is given.
    from ..auth.keyboard_interactive import console_handler

    if not handler:
        handler = console_handler

    accepted = await transport.auth_keyboard_interactive(username, handler)
    if accepted:
        return
    raise AuthenticationException("Keyboard-interactive authentication failed")
auth_password(username, password) async

Authenticate using password asynchronously.

Parameters:

Name Type Description Default
username str

Username for authentication

required
password str

Password for authentication

required

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/async_ssh_client.py
async def auth_password(self, username: str, password: str) -> None:
    """
    Authenticate with a username and password asynchronously.

    Args:
        username: Username for authentication
        password: Password for authentication

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If authentication fails
    """
    transport = self._transport
    if not transport:
        raise SSHException("No transport available")

    accepted = await transport.auth_password(username, password)
    if accepted:
        return
    raise AuthenticationException("Password authentication failed")
auth_publickey(username, pkey=None, key_filename=None, password=None) async

Authenticate using public key asynchronously.

Parameters:

Name Type Description Default
username str

Username for authentication

required
pkey Any | None

Private key instance

None
key_filename str | list[str] | None

Path to private key file(s)

None
password str | None

Optional password for encrypted private keys

None

Raises:

Type Description
AuthenticationException

If authentication fails

Source code in spindlex/client/async_ssh_client.py
async def auth_publickey(
    self,
    username: str,
    pkey: Any | None = None,
    key_filename: str | list[str] | None = None,
    password: str | None = None,
) -> None:
    """
    Authenticate using public key asynchronously.

    Keys named by ``key_filename`` are tried first, in order. An
    explicitly supplied ``pkey`` is tried afterwards; it is never
    replaced by a key loaded from file, and a file key that the server
    already rejected is not offered a second time.

    Args:
        username: Username for authentication
        pkey: Private key instance
        key_filename: Path to private key file(s)
        password: Optional password for encrypted private keys

    Raises:
        SSHException: If no transport is available
        AuthenticationException: If authentication fails
    """
    if not self._transport:
        raise SSHException("No transport available")

    # Load key(s) from file if provided
    if key_filename:
        from ..crypto.pkey import PKey

        filenames = (
            [key_filename] if isinstance(key_filename, str) else key_filename
        )
        loaded_any = False
        for filename in filenames:
            try:
                # Run in thread as it does I/O
                file_key = await asyncio.to_thread(
                    PKey.from_private_key_file, filename, password
                )
                loaded_any = True
                if await self._transport.auth_publickey(username, file_key):
                    return
            except Exception as e:
                # Best effort: a bad or rejected key file must not stop
                # the remaining candidates from being tried.
                self._logger.debug(
                    f"Failed to authenticate with key {filename}: {e}"
                )

        if pkey is None:
            # No explicit key to fall back on; report why we gave up.
            if not loaded_any:
                raise AuthenticationException(
                    f"Failed to load keys from {key_filename}"
                )
            raise AuthenticationException("Public key authentication failed")

    if pkey is None:
        raise AuthenticationException("No private key provided")

    if not await self._transport.auth_publickey(username, pkey):
        raise AuthenticationException("Public key authentication failed")
close() async

Close SSH connection and cleanup resources.

Source code in spindlex/client/async_ssh_client.py
async def close(self) -> None:
    """
    Close SSH connection and cleanup resources.

    The transport reference and the cached connection details are reset
    even when closing the transport raises, so the client is never left
    holding a half-closed transport. An exception raised by the
    transport still propagates to the caller.
    """
    try:
        if self._transport:
            await self._transport.close()
    finally:
        # Runs on success and on failure: return to the disconnected
        # defaults so a later connect() starts from a clean state.
        self._transport = None
        self._connected = False
        self._hostname = None
        self._port = 22
        self._username = None
close_port_forward(tunnel_id) async

Close port forwarding tunnel asynchronously.

Parameters:

Name Type Description Default
tunnel_id str

Tunnel identifier

required
Source code in spindlex/client/async_ssh_client.py
async def close_port_forward(self, tunnel_id: str) -> None:
    """
    Close a port forwarding tunnel asynchronously.

    Does nothing when the client has no transport.

    Args:
        tunnel_id: Tunnel identifier
    """
    transport = self._transport
    if not transport:
        return

    tunnel_manager = transport.get_port_forwarding_manager()
    await tunnel_manager.close_tunnel(tunnel_id)
connect(hostname, port=22, username=None, password=None, pkey=None, key_filename=None, key_password=None, timeout=None, compress=False, sock=None, gss_auth=False, gss_kex=False, gss_deleg_creds=True, gss_host=None, rekey_bytes_limit=None, rekey_time_limit=None) async

Connect to SSH server asynchronously with retry logic for transient errors.

Parameters:

Name Type Description Default
hostname str

Server hostname or IP address

required
port int

Server port (default: 22)

22
username str | None

Username for authentication

None
password str | None

Password for authentication

None
pkey Any | None

Private key for authentication

None
key_filename str | list[str] | None

Path to private key file(s)

None
key_password str | None

Password for encrypted private key file(s)

None
timeout float | None

Connection timeout in seconds

None
compress bool

Enable compression

False
sock Any | None

Optional existing socket or channel to use

None
rekey_bytes_limit int | None

Number of bytes before rekeying (default: 1GB)

None
rekey_time_limit int | None

Seconds before rekeying (default: 1 hour)

None
gss_auth bool

Use GSSAPI authentication

False
gss_kex bool

Use GSSAPI key exchange

False
gss_deleg_creds bool

Delegate GSSAPI credentials

True
gss_host str | None

GSSAPI hostname override

None

Raises:

Type Description
SSHException

If connection fails

AuthenticationException

If authentication fails

Source code in spindlex/client/async_ssh_client.py
async def connect(
    self,
    hostname: str,
    port: int = 22,
    username: str | None = None,
    password: str | None = None,
    pkey: Any | None = None,
    key_filename: str | list[str] | None = None,
    key_password: str | None = None,
    timeout: float | None = None,
    compress: bool = False,
    sock: Any | None = None,
    gss_auth: bool = False,
    gss_kex: bool = False,
    gss_deleg_creds: bool = True,
    gss_host: str | None = None,
    rekey_bytes_limit: int | None = None,
    rekey_time_limit: int | None = None,
) -> None:
    """
    Connect to SSH server asynchronously with retry logic for transient errors.

    Up to three attempts are made. Only connection-level failures are
    retried, with an exponential backoff plus a small random jitter;
    every other failure is raised immediately. Whenever the call fails,
    the transport is closed and the cached connection details are reset,
    so the client is back in the disconnected state and connect() can be
    called again.

    Args:
        hostname: Server hostname or IP address
        port: Server port (default: 22)
        username: Username for authentication; authentication is only
            attempted when a username is given
        password: Password for authentication
        pkey: Private key for authentication
        key_filename: Path to private key file(s)
        key_password: Password for encrypted private key file(s)
        timeout: Connection timeout in seconds
        compress: Enable compression (not consulted by this method)
        sock: Optional existing socket or channel to use
        rekey_bytes_limit: Number of bytes before rekeying (default: 1GB)
        rekey_time_limit: Seconds before rekeying (default: 1 hour)
        gss_auth: Use GSSAPI authentication
        gss_kex: Use GSSAPI key exchange (not consulted by this method)
        gss_deleg_creds: Delegate GSSAPI credentials
        gss_host: GSSAPI hostname override

    Raises:
        SSHException: If already connected, the port is invalid, or the
            connection fails
        AuthenticationException: If authentication fails
    """
    if self._connected:
        raise SSHException("Already connected")

    # Validate port
    if not (0 < port <= 65535):
        raise SSHException(f"Invalid port number: {port}")

    max_retries = 3

    try:
        for attempt in range(max_retries):
            try:
                if sock is None:
                    # Create socket connection
                    current_sock, reader, writer = await self._create_connection(
                        hostname, port, timeout
                    )
                else:
                    # If sock is provided, we need to wrap it if it's a raw socket
                    # In asyncio, we usually need reader/writer.
                    # If it's a SpindleX Channel, it might need special handling.
                    current_sock = sock
                    if hasattr(
                        current_sock, "makefile"
                    ):  # Likely a socket-like object
                        reader, writer = await asyncio.open_connection(
                            sock=current_sock
                        )
                    else:
                        # Assume it's already a pair or handled by transport
                        reader, writer = None, None

                # Create async transport
                self._transport = AsyncTransport(
                    current_sock,
                    rekey_bytes_limit=rekey_bytes_limit,
                    rekey_time_limit=rekey_time_limit,
                )

                # Use connect_existing helper to set reader/writer safely
                if reader and writer:
                    await self._transport.connect_existing(reader, writer)

                # Start client transport
                await self._transport.start_client(timeout)

                # Store connection info before host key verification so hostname is available
                self._hostname = hostname
                self._port = port
                self._username = username
                self._connected = True

                # Verify host key
                self._verify_host_key()

                # Perform authentication if credentials provided
                if username:
                    await self._authenticate(
                        username,
                        password=password,
                        pkey=pkey,
                        key_filename=key_filename,
                        key_password=key_password,
                        gss_auth=gss_auth,
                        gss_host=gss_host,
                        gss_deleg_creds=gss_deleg_creds,
                    )

                return  # Success

            except (
                SSHException,
                ConnectionResetError,
                ConnectionRefusedError,
                OSError,
            ) as e:
                # Undo the connection info stored above: the attempt
                # failed, so the client must not look connected while we
                # back off or after we give up.
                self._connected = False
                self._hostname = None
                self._port = 22
                self._username = None

                # Cleanup failed attempt
                if self._transport:
                    await self._transport.close()
                    self._transport = None

                # Only retry on transient connection-level errors.
                # Avoid retrying on auth failure or host key mismatch.
                is_transient = isinstance(
                    e, (ConnectionResetError, ConnectionRefusedError, socket.error)
                ) or (
                    isinstance(e, SSHException)
                    and any(
                        kw in str(e)
                        for kw in (
                            "Connection reset",
                            "Connection failed",
                            "Client start failed",
                            "Connection closed",
                        )
                    )
                )

                if not is_transient or attempt == max_retries - 1:
                    if isinstance(
                        e,
                        (
                            SSHException,
                            AuthenticationException,
                            BadHostKeyException,
                        ),
                    ):
                        raise
                    raise SSHException(f"Connection failed: {e}") from e

                # Exponential backoff with jitter: 0.2s, 0.4s, 0.8s...
                import random

                wait = (0.2 * (2**attempt)) + (random.random() * 0.1)  # noqa: S311 # nosec  # fmt: skip
                self._logger.debug(
                    f"Connection attempt {attempt + 1} failed: {e}. Retrying in {wait:.2f}s..."
                )
                await asyncio.sleep(wait)
    except Exception as e:
        # Final safety net, also reached by errors the retry handler does
        # not catch. Reset the state before closing so that a failure
        # inside close() cannot leave the client marked as connected.
        transport, self._transport = self._transport, None
        self._connected = False
        self._hostname = None
        self._port = 22
        self._username = None
        if transport:
            await transport.close()

        if isinstance(
            e, (SSHException, AuthenticationException, BadHostKeyException)
        ):
            raise
        raise SSHException(f"Connection failed: {e}") from e
create_local_port_forward(local_port, remote_host, remote_port, local_host='127.0.0.1') async

Create local port forwarding tunnel asynchronously.

Parameters:

Name Type Description Default
local_port int

Local port to listen on

required
remote_host str

Remote host to connect to

required
remote_port int

Remote port to connect to

required
local_host str

Local interface to bind to

'127.0.0.1'

Returns:

Type Description
str

Tunnel ID for management

Source code in spindlex/client/async_ssh_client.py
async def create_local_port_forward(
    self,
    local_port: int,
    remote_host: str,
    remote_port: int,
    local_host: str = "127.0.0.1",
) -> str:
    """
    Open a local port forwarding tunnel asynchronously.

    The request is delegated to the transport's port forwarding manager.

    Args:
        local_port: Local port to listen on
        remote_host: Remote host to connect to
        remote_port: Remote port to connect to
        local_host: Local interface to bind to

    Returns:
        Tunnel ID for management

    Raises:
        SSHException: If the client is not connected
    """
    if not (self._connected and self._transport):
        raise SSHException("Not connected")

    tunnel_manager = self._transport.get_port_forwarding_manager()
    tunnel_id = await tunnel_manager.create_local_tunnel(
        local_port, remote_host, remote_port, local_host
    )
    return tunnel_id
create_remote_port_forward(remote_port, local_host, local_port, remote_host='') async

Create remote port forwarding tunnel asynchronously.

Parameters:

Name Type Description Default
remote_port int

Remote port to listen on

required
local_host str

Local host to connect to

required
local_port int

Local port to connect to

required
remote_host str

Remote interface to bind to

''

Returns:

Type Description
str

Tunnel ID for management

Source code in spindlex/client/async_ssh_client.py
async def create_remote_port_forward(
    self,
    remote_port: int,
    local_host: str,
    local_port: int,
    remote_host: str = "",
) -> str:
    """
    Open a remote port forwarding tunnel asynchronously.

    The request is delegated to the transport's port forwarding manager.

    Args:
        remote_port: Remote port to listen on
        local_host: Local host to connect to
        local_port: Local port to connect to
        remote_host: Remote interface to bind to

    Returns:
        Tunnel ID for management

    Raises:
        SSHException: If the client is not connected
    """
    if not (self._connected and self._transport):
        raise SSHException("Not connected")

    tunnel_manager = self._transport.get_port_forwarding_manager()
    tunnel_id = await tunnel_manager.create_remote_tunnel(
        remote_port, local_host, local_port, remote_host
    )
    return tunnel_id
exec_command(command, bufsize=-1, timeout=None) async

Execute command on remote server asynchronously.

Parameters:

Name Type Description Default
command str

Command to execute

required
bufsize int

Buffer size for streams

-1
timeout float | None

Command timeout in seconds

None

Returns:

Type Description
tuple[Any, Any, Any]

Tuple of (stdin, stdout, stderr) streams

Raises:

Type Description
SSHException

If command execution fails

Source code in spindlex/client/async_ssh_client.py
async def exec_command(
    self, command: str, bufsize: int = -1, timeout: float | None = None
) -> tuple[Any, Any, Any]:
    """
    Execute command on remote server asynchronously.

    Args:
        command: Command to execute
        bufsize: Buffer size for streams
        timeout: Maximum number of seconds to wait for the session
            channel to open and the exec request to complete; None
            (the default) waits indefinitely. It does not limit how
            long the command itself runs once it has been started.

    Returns:
        Tuple of (stdin, stdout, stderr) streams

    Raises:
        SSHException: If not connected, if command execution fails, or
            if the timeout expires
    """
    if not self._connected or not self._transport:
        raise SSHException("Not connected")

    transport = self._transport

    async def _open_and_exec() -> Any:
        # Open channel
        channel = await transport.open_channel("session")

        # Execute command
        await channel.exec_command(command)
        return channel

    try:
        # One deadline covers both steps. With timeout=None this simply
        # awaits, matching the previous behavior.
        # NOTE(review): a channel opened before the deadline expires is
        # not closed here - confirm whether the transport reclaims it.
        channel = await asyncio.wait_for(_open_and_exec(), timeout)

        # Return channel file objects
        return (
            channel.makefile("wb", bufsize),
            channel.makefile("rb", bufsize),
            channel.makefile_stderr("rb", bufsize),
        )

    except Exception as e:
        if isinstance(e, SSHException):
            raise
        if timeout is not None and isinstance(e, asyncio.TimeoutError):
            raise SSHException(
                f"Command execution timed out after {timeout} seconds"
            ) from e
        raise SSHException(f"Command execution failed: {e}") from e
get_host_key_storage()

Get host key storage instance.

Returns:

Type Description
HostKeyStorage

Current host key storage

Source code in spindlex/client/async_ssh_client.py
def get_host_key_storage(self) -> HostKeyStorage:
    """
    Return the host key storage this client is using.

    Returns:
        Current host key storage
    """
    storage = self._host_key_storage
    return storage
get_host_keys()

Alias for get_host_key_storage().

Returns:

Type Description
HostKeyStorage

Current host key storage

Source code in spindlex/client/async_ssh_client.py
def get_host_keys(self) -> HostKeyStorage:
    """
    Return the host key storage; alias for get_host_key_storage().

    Returns:
        Current host key storage
    """
    storage = self.get_host_key_storage()
    return storage
get_port_forwards()

Get all active port forwarding tunnels.

Returns:

Type Description
dict[str, Any]

Dictionary mapping tunnel IDs to tunnel objects

Source code in spindlex/client/async_ssh_client.py
def get_port_forwards(self) -> dict[str, Any]:
    """
    Get all active port forwarding tunnels.

    Returns:
        Dictionary mapping tunnel IDs to tunnel objects; an empty
        dictionary when the client has no transport
    """
    transport = self._transport
    if not transport:
        return {}

    tunnel_manager = transport.get_port_forwarding_manager()
    return tunnel_manager.get_all_tunnels()
invoke_shell() async

Start interactive shell asynchronously.

Returns:

Type Description
Any

Channel for shell interaction

Raises:

Type Description
SSHException

If shell invocation fails

Source code in spindlex/client/async_ssh_client.py
async def invoke_shell(self) -> Any:
    """
    Start an interactive shell asynchronously.

    A new "session" channel is opened and a shell is requested on it.

    Returns:
        Channel for shell interaction

    Raises:
        SSHException: If not connected or if shell invocation fails;
            other errors are wrapped in SSHException
    """
    if not (self._connected and self._transport):
        raise SSHException("Not connected")

    try:
        shell_channel = await self._transport.open_channel("session")
        await shell_channel.invoke_shell()
    except SSHException:
        # Already the right exception type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"Shell invocation failed: {e}") from e

    return shell_channel
load_host_keys(filename) async

Load host keys from a file.

Parameters:

Name Type Description Default
filename str

Path to known_hosts file

required
Source code in spindlex/client/async_ssh_client.py
async def load_host_keys(self, filename: str) -> None:
    """
    Load host keys from a file.

    When the client already has a (truthy) storage object, the file is
    loaded into it on a worker thread; otherwise a new storage is
    created for the file.

    Args:
        filename: Path to known_hosts file
    """
    storage = self._host_key_storage
    if storage:
        await asyncio.to_thread(storage.load, filename)
    else:
        self._host_key_storage = HostKeyStorage(filename)
load_system_host_keys() async

Load host keys from system default locations.

Source code in spindlex/client/async_ssh_client.py
async def load_system_host_keys(self) -> None:
    """
    Load host keys from system default locations.

    The per-user and system-wide known_hosts files are checked in a
    fixed order; every file that exists is passed to load_host_keys().
    """
    import os

    candidates = (
        os.path.expanduser("~/.ssh/known_hosts"),
        os.path.expanduser("~/.ssh/known_hosts2"),
        "/etc/ssh/ssh_known_hosts",
        "/etc/ssh/ssh_known_hosts2",
    )
    for candidate in candidates:
        # The existence check touches the filesystem, so it runs in a thread.
        present = await asyncio.to_thread(os.path.exists, candidate)
        if not present:
            continue
        await self.load_host_keys(candidate)
open_sftp() async

Open SFTP client asynchronously.

Returns:

Type Description
AsyncSFTPClient

Async SFTP client instance

Raises:

Type Description
SSHException

If SFTP open fails

Source code in spindlex/client/async_ssh_client.py
async def open_sftp(self) -> AsyncSFTPClient:
    """
    Open SFTP client asynchronously.

    A new "session" channel is opened, the "sftp" subsystem is requested
    on it, and an async SFTP client is created and initialized over that
    channel.

    Returns:
        Async SFTP client instance

    Raises:
        SSHException: If not connected or if SFTP open fails; other
            errors are wrapped in SSHException
    """
    if not (self._connected and self._transport):
        raise SSHException("Not connected")

    try:
        sftp_channel = await self._transport.open_channel("session")
        await sftp_channel.invoke_subsystem("sftp")

        client = AsyncSFTPClient(sftp_channel)
        await client._initialize()
    except SSHException:
        # Already the right exception type; pass it through untouched.
        raise
    except Exception as e:
        raise SSHException(f"SFTP open failed: {e}") from e

    return client
save_host_keys(filename) async

Save host keys to a file.

Parameters:

Name Type Description Default
filename str

Path to save known_hosts

required
Source code in spindlex/client/async_ssh_client.py
async def save_host_keys(self, filename: str) -> None:
    """
    Save host keys to a file.

    If the current storage is missing or bound to a different file, a
    new storage for ``filename`` replaces it and takes over the keys of
    the previous storage. The save itself runs on a worker thread.

    Args:
        filename: Path to save known_hosts
    """
    previous = self._host_key_storage
    if not previous or previous._filename != filename:
        replacement = HostKeyStorage(filename)
        self._host_key_storage = replacement
        if previous:
            # Same key collection object, not a copy.
            replacement._keys = previous._keys

    # save() is sync, run in thread
    await asyncio.to_thread(self._host_key_storage.save)
set_host_key_storage(storage)

Set host key storage instance.

Parameters:

Name Type Description Default
storage HostKeyStorage

Host key storage to use

required
Source code in spindlex/client/async_ssh_client.py
def set_host_key_storage(self, storage: HostKeyStorage) -> None:
    """
    Replace the host key storage used by this client.

    Args:
        storage: Host key storage to use
    """
    new_storage = storage
    self._host_key_storage = new_storage
set_missing_host_key_policy(policy)

Set policy for handling unknown host keys.

Parameters:

Name Type Description Default
policy MissingHostKeyPolicy

Host key policy instance

required
Source code in spindlex/client/async_ssh_client.py
def set_missing_host_key_policy(self, policy: MissingHostKeyPolicy) -> None:
    """
    Choose how unknown host keys are handled.

    Args:
        policy: Host key policy instance
    """
    new_policy = policy
    self._host_key_policy = new_policy

SFTP Client

spindlex.client.sftp_client

SFTP Client Implementation

Provides SFTP (SSH File Transfer Protocol) client functionality for secure file operations over SSH connections.

Classes

SFTPClient

SFTP client for secure file operations.

Implements SFTP protocol for file transfer, directory operations, and file attribute management over SSH connections.

Source code in spindlex/client/sftp_client.py
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
class SFTPClient:
    """
    SFTP client for secure file operations.

    Implements SFTP protocol for file transfer, directory operations,
    and file attribute management over SSH connections.
    """

    def __init__(self, transport: Transport) -> None:
        """
        Create an SFTP client on top of an SSH transport.

        All bookkeeping state is set up first, then the SFTP session is
        started immediately via ``_initialize_sftp``.

        Args:
            transport: SSH transport instance

        Raises:
            SFTPError: If SFTP initialization fails
        """
        self._logger = logging.getLogger(__name__)
        self._transport = transport

        # Channel carrying the SFTP subsystem; assigned during initialization.
        self._channel: Optional[Channel] = None

        # Request-ID counter and the lock that guards it.
        self._request_lock = threading.Lock()
        self._request_id = 0

        # Filled in from the server's version reply.
        self._server_version: Optional[int] = None
        self._server_extensions: dict[str, str] = {}

        # Replies received while waiting for a different request ID.
        self._pending_responses: dict[int, SFTPMessage] = {}

        # Upload chunk size; _query_limits may overwrite it.
        self._max_write_len: int = _DEFAULT_MAX_WRITE

        # Start the SFTP session.
        self._initialize_sftp()

    def _initialize_sftp(self) -> None:
        """
        Open the SFTP subsystem channel and negotiate the protocol version.

        Opens a "session" channel, requests the SFTP subsystem, sends the init
        message and stores the version and extensions from the server's reply.
        When the server advertises ``limits@openssh.com`` version "1", the
        server limits are queried as well. On any failure the channel is
        closed and cleared before the error is re-raised.

        Raises:
            SFTPError: If SFTP initialization fails
        """
        try:
            channel = self._transport.open_channel("session")
            self._channel = channel
            if not channel:
                raise SFTPError("Failed to open channel for SFTP")

            # Bound every recv() so a silent server cannot stall us forever.
            channel.settimeout(30.0)
            channel.invoke_subsystem(SFTP_SUBSYSTEM)

            # Version handshake: init out, version reply in.
            self._send_message(SFTPInitMessage(SFTP_VERSION))
            reply = self._receive_message()
            if not isinstance(reply, SFTPVersionMessage):
                raise SFTPError("Expected SFTP version message")

            self._server_version = reply.version
            self._server_extensions = reply.extensions

            if self._server_version < SFTP_VERSION:
                self._logger.warning(
                    f"Server SFTP version {self._server_version} < {SFTP_VERSION}"
                )

            # The limits extension lets uploads use larger write chunks than
            # the default, which reduces round trips.
            limits_version = self._server_extensions.get("limits@openssh.com")
            if limits_version == "1":
                self._query_limits()

            self._logger.debug(
                f"SFTP initialized, server version: {self._server_version}, "
                f"max_write_len: {self._max_write_len}"
            )

        except Exception as e:
            if self._channel:
                self._channel.close()
                self._channel = None
            if not isinstance(e, SFTPError):
                raise SFTPError(f"SFTP initialization failed: {e}") from e
            raise

    def _query_limits(self) -> None:
        """Query server limits via limits@openssh.com and update _max_write_len."""
        try:
            request_id = self._get_next_request_id()
            self._send_message(SFTPExtendedMessage(request_id, "limits@openssh.com"))
            response = self._receive_message_for_id(request_id, timeout=10.0)
            if isinstance(response, SFTPExtendedReplyMessage):
                data = response.extended_data
                if len(data) >= 32:
                    _max_pkt, _max_read, max_write, _max_handles = struct.unpack(
                        ">QQQQ", data[:32]
                    )
                    if max_write > 0:
                        self._max_write_len = int(max_write)
        except Exception:  # nosec B110
            pass  # non-fatal: fall back to _DEFAULT_MAX_WRITE

    def _get_next_request_id(self) -> int:
        """Get next request ID for SFTP messages."""
        with self._request_lock:
            self._request_id += 1
            return self._request_id

    def _send_message(self, message: SFTPMessage) -> None:
        """
        Send SFTP message over channel.

        Args:
            message: SFTP message to send

        Raises:
            SFTPError: If message sending fails
        """
        if not self._channel:
            raise SFTPError("SFTP channel not available")

        try:
            data = message.pack()
            self._channel.sendall(data)
        except Exception as e:
            raise SFTPError(f"Failed to send SFTP message: {e}") from e

    def _receive_message(self) -> SFTPMessage:
        """
        Read one complete SFTP message from the channel and decode it.

        Returns:
            Received SFTP message

        Raises:
            SFTPError: If no channel is available or receiving/decoding fails
        """
        channel = self._channel
        if not channel:
            raise SFTPError("SFTP channel not available")

        try:
            # Each packet starts with a 4-byte big-endian length that counts
            # only the bytes following it.
            header = channel.recv_exactly(4)
            body_length = int.from_bytes(header, "big")
            body = channel.recv_exactly(body_length)

            # unpack() is handed the length prefix together with the payload.
            return SFTPMessage.unpack(header + body)
        except Exception as e:
            raise SFTPError(f"Failed to receive SFTP message: {e}") from e

    def _receive_message_for_id(
        self, target_id: int, timeout: float = 60.0
    ) -> SFTPMessage:
        """
        Receive the SFTP response matching target_id, buffering any others.

        Enables pipelining by allowing out-of-order response collection.
        """
        if target_id in self._pending_responses:
            return self._pending_responses.pop(target_id)
        deadline = time.monotonic() + timeout
        while True:
            if time.monotonic() > deadline:
                raise SFTPError(f"Timeout waiting for response to request {target_id}")
            msg = self._receive_message()
            if msg.request_id == target_id:
                return msg
            if msg.request_id is not None:
                self._pending_responses[msg.request_id] = msg

    def _send_request_and_wait_response(self, request: SFTPMessage) -> SFTPMessage:
        """
        Send SFTP request and wait for response.

        Args:
            request: SFTP request message

        Returns:
            SFTP response message

        Raises:
            SFTPError: If request fails or response indicates error
        """
        self._send_message(request)
        if request.request_id is None:
            raise SFTPError("Request has no ID assigned")
        return self._receive_message_for_id(request.request_id)

    def get(self, remotepath: str, localpath: str) -> None:
        """
        Download file from remote server.

        Reads are pipelined: up to 32 read requests of 32768 bytes are kept
        outstanding and their replies are collected in request order. Every
        reply is written at the offset it was requested for, and a reply that
        is shorter than requested triggers a follow-up read for the missing
        tail, so short reads cannot shift or drop data.

        The remote handle is always closed. Replies to reads that were never
        collected (because the transfer failed) are discarded afterwards so
        they do not accumulate in ``_pending_responses``. A partially written
        local file is left in place on failure.

        Args:
            remotepath: Path to remote file
            localpath: Path for local file

        Raises:
            SFTPError: If file download fails
        """
        try:
            # Open remote file for reading
            request_id = self._get_next_request_id()
            attrs = SFTPAttributes()
            open_msg = SFTPOpenMessage(request_id, remotepath, SSH_FXF_READ, attrs)

            response = self._send_request_and_wait_response(open_msg)
            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(response.status_code, response.message)
                raise SFTPError("Unexpected status response to open request")
            if not isinstance(response, SFTPHandleMessage):
                raise SFTPError("Expected handle message for file open")

            handle = response.handle

            # Outstanding reads, oldest first: (request_id, offset, length).
            in_flight: list[tuple[int, int, int]] = []

            def _issue_read(read_offset: int, length: int) -> None:
                """Send one read request and remember what it asked for."""
                rid = self._get_next_request_id()
                self._send_message(SFTPReadMessage(rid, handle, read_offset, length))
                in_flight.append((rid, read_offset, length))

            try:
                _CHUNK = 32768
                _DEPTH = 32
                with open(localpath, "wb") as local_file:
                    offset = 0
                    eof = False

                    while not eof or in_flight:
                        # Issue read requests to fill the pipeline
                        while not eof and len(in_flight) < _DEPTH:
                            _issue_read(offset, _CHUNK)
                            offset += _CHUNK

                        if not in_flight:
                            break

                        # Collect the oldest reply; the entry is removed only
                        # after it arrived so a failed wait is still tracked.
                        rid, req_offset, req_len = in_flight[0]
                        response = self._receive_message_for_id(rid)
                        in_flight.pop(0)

                        if isinstance(response, SFTPDataMessage):
                            data = response.data
                            local_file.seek(req_offset)
                            local_file.write(data)
                            if 0 < len(data) < req_len:
                                # Short read: fetch the part that is missing.
                                _issue_read(
                                    req_offset + len(data), req_len - len(data)
                                )
                        elif isinstance(response, SFTPStatusMessage):
                            if response.status_code == SSH_FX_EOF:
                                eof = True
                            else:
                                raise SFTPError.from_status(
                                    response.status_code, response.message
                                )
                        else:
                            raise SFTPError("Unexpected response to read request")

            finally:
                try:
                    # Close remote file
                    request_id = self._get_next_request_id()
                    close_msg = SFTPCloseMessage(request_id, handle)
                    self._send_request_and_wait_response(close_msg)
                finally:
                    # Waiting for the close reply buffers any replies to reads
                    # we abandoned; drop them so they are not kept forever.
                    for rid, _req_offset, _req_len in in_flight:
                        self._pending_responses.pop(rid, None)

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File download failed: {e}", filename=remotepath) from e

    def put(self, localpath: str, remotepath: str) -> None:
        """
        Upload file to remote server.

        Writes are pipelined: up to 32 write requests of ``_max_write_len``
        bytes are kept outstanding and acknowledged in request order.

        The remote handle is always closed. When the data transfer itself
        succeeded, a non-OK status on the close request is raised as an
        error, because a failed close means the upload cannot be trusted.
        When the transfer already failed, the original error is kept.
        Acknowledgements that were never collected are discarded so they do
        not accumulate in ``_pending_responses``.

        Args:
            localpath: Path to local file
            remotepath: Path for remote file

        Raises:
            SFTPError: If file upload fails
        """
        try:
            # Get local file size
            file_size = os.path.getsize(localpath)

            # Create attributes with file size
            attrs = SFTPAttributes()
            attrs.flags = SSH_FILEXFER_ATTR_SIZE
            attrs.size = file_size

            # Open remote file for writing
            request_id = self._get_next_request_id()
            pflags = SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_TRUNC
            open_msg = SFTPOpenMessage(request_id, remotepath, pflags, attrs)

            response = self._send_request_and_wait_response(open_msg)
            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(response.status_code, response.message)
                raise SFTPError("Unexpected status response to open request")
            if not isinstance(response, SFTPHandleMessage):
                raise SFTPError("Expected handle message for file open")

            handle = response.handle
            in_flight: list[int] = []  # request IDs awaiting an ACK, oldest first
            uploaded = False

            try:
                _CHUNK = self._max_write_len
                _DEPTH = 32
                with open(localpath, "rb") as local_file:
                    offset = 0

                    while True:
                        # Fill the pipeline with write requests
                        while len(in_flight) < _DEPTH:
                            chunk = local_file.read(_CHUNK)
                            if not chunk:
                                break
                            request_id = self._get_next_request_id()
                            write_msg = SFTPWriteMessage(
                                request_id, handle, offset, chunk
                            )
                            self._send_message(write_msg)
                            in_flight.append(request_id)
                            offset += len(chunk)

                        if not in_flight:
                            break

                        # Collect one ACK before refilling; the ID is removed
                        # only after its reply arrived so a failed wait is
                        # still tracked for cleanup.
                        response = self._receive_message_for_id(in_flight[0])
                        in_flight.pop(0)
                        if not isinstance(response, SFTPStatusMessage):
                            raise SFTPError("Unexpected response to write request")
                        if response.status_code != SSH_FX_OK:
                            raise SFTPError.from_status(
                                response.status_code, response.message
                            )

                uploaded = True

            finally:
                try:
                    # Close remote file
                    request_id = self._get_next_request_id()
                    close_msg = SFTPCloseMessage(request_id, handle)
                    close_reply = self._send_request_and_wait_response(close_msg)
                finally:
                    # Waiting for the close reply buffers ACKs we abandoned;
                    # drop them so they are not kept forever.
                    for rid in in_flight:
                        self._pending_responses.pop(rid, None)

                # Only report a close failure when it would not hide an
                # earlier, more specific transfer error.
                if (
                    uploaded
                    and isinstance(close_reply, SFTPStatusMessage)
                    and close_reply.status_code != SSH_FX_OK
                ):
                    raise SFTPError.from_status(
                        close_reply.status_code, close_reply.message
                    )

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File upload failed: {e}", filename=localpath) from e

    def get_recursive(self, remotepath: str, localpath: str) -> None:
        """
        Download directory recursively.

        Args:
            remotepath: Remote directory path
            localpath: Local destination path
        """
        import stat

        attrs = self.stat(remotepath)
        if not stat.S_ISDIR(attrs.st_mode or 0):
            self.get(remotepath, localpath)
            return

        if not os.path.exists(localpath):
            os.makedirs(localpath)

        for item in self.listdir(remotepath):
            # SFTP paths always use forward slash
            remote_item = (
                f"{remotepath}/{item}"
                if not remotepath.endswith("/")
                else f"{remotepath}{item}"
            )
            local_item = os.path.join(localpath, item)
            self.get_recursive(remote_item, local_item)

    def put_recursive(self, localpath: str, remotepath: str) -> None:
        """
        Upload directory recursively.

        Args:
            localpath: Local directory path
            remotepath: Remote destination path
        """
        if not os.path.isdir(localpath):
            self.put(localpath, remotepath)
            return

        try:
            self.mkdir(remotepath)
        except SFTPError as e:
            if e.sftp_code != SFTPError.SSH_FX_FAILURE:
                raise

        for item in os.listdir(localpath):
            local_item = os.path.join(localpath, item)
            remote_item = (
                f"{remotepath}/{item}"
                if not remotepath.endswith("/")
                else f"{remotepath}{item}"
            )
            self.put_recursive(local_item, remote_item)

    def listdir(self, path: str = ".") -> list[str]:
        """
        List directory contents.

        Opens the directory, issues readdir requests until the server
        answers with SSH_FX_EOF, and always closes the directory handle.
        The "." and ".." entries are omitted from the result.

        A non-OK status on the close request is raised only when the listing
        itself succeeded, so it cannot hide an earlier listing error.

        Args:
            path: Directory path to list

        Returns:
            List of filenames in directory

        Raises:
            SFTPError: If directory listing fails
        """
        try:
            # Imported once up front rather than on every loop iteration.
            from ..protocol.sftp_messages import (
                SFTPNameMessage,
                SFTPOpenDirMessage,
                SFTPReadDirMessage,
            )

            # Open directory
            request_id = self._get_next_request_id()
            opendir_msg = SFTPOpenDirMessage(request_id, path)

            response = self._send_request_and_wait_response(opendir_msg)
            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(response.status_code, response.message)
                raise SFTPError("Unexpected status response to opendir request")
            if not isinstance(response, SFTPHandleMessage):
                raise SFTPError("Expected handle message for directory open")

            handle = response.handle
            filenames: list[str] = []
            listed = False

            try:
                while True:
                    # Read directory entries
                    request_id = self._get_next_request_id()
                    readdir_msg = SFTPReadDirMessage(request_id, handle)
                    response = self._send_request_and_wait_response(readdir_msg)

                    if isinstance(response, SFTPStatusMessage):
                        if response.status_code == SSH_FX_EOF:
                            break  # End of directory reached
                        raise SFTPError.from_status(
                            response.status_code, response.message
                        )
                    if not isinstance(response, SFTPNameMessage):
                        raise SFTPError("Unexpected response to readdir request")

                    # Skip . and .. entries
                    filenames.extend(
                        filename
                        for filename, _longname, _attrs in response.names
                        if filename not in (".", "..")
                    )

                listed = True

            finally:
                # Close directory handle
                request_id = self._get_next_request_id()
                close_msg = SFTPCloseMessage(request_id, handle)
                close_reply = self._send_request_and_wait_response(close_msg)
                if (
                    listed
                    and isinstance(close_reply, SFTPStatusMessage)
                    and close_reply.status_code != SSH_FX_OK
                ):
                    raise SFTPError.from_status(
                        close_reply.status_code, close_reply.message
                    )

            return filenames

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Directory listing failed: {e}", filename=path) from e

    def stat(self, path: str) -> SFTPAttributes:
        """
        Get file/directory attributes.

        Sends an ``SFTPStatMessage`` for the path and returns the attributes
        carried by the server's attrs reply.

        Args:
            path: Path to file or directory

        Returns:
            SFTPAttributes object with file information

        Raises:
            SFTPError: If stat operation fails
        """
        try:
            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPAttrsMessage, SFTPStatMessage

            response = self._send_request_and_wait_response(
                SFTPStatMessage(request_id, path)
            )

            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(response.status_code, response.message)
                # An OK status carries no attributes, so it is not a valid answer.
                raise SFTPError("Unexpected status response to stat request")
            if not isinstance(response, SFTPAttrsMessage):
                raise SFTPError("Unexpected response to stat request")
            return response.attrs

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"Stat operation failed: {e}", filename=path) from e

    def lstat(self, path: str) -> SFTPAttributes:
        """
        Get file/directory attributes (don't follow symlinks).

        Sends an ``SFTPLStatMessage`` for the path and returns the attributes
        carried by the server's attrs reply.

        Args:
            path: Path to file or directory

        Returns:
            SFTPAttributes object with file information

        Raises:
            SFTPError: If lstat operation fails
        """
        try:
            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPAttrsMessage, SFTPLStatMessage

            response = self._send_request_and_wait_response(
                SFTPLStatMessage(request_id, path)
            )

            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(response.status_code, response.message)
                # An OK status carries no attributes, so it is not a valid answer.
                raise SFTPError("Unexpected status response to lstat request")
            if not isinstance(response, SFTPAttrsMessage):
                raise SFTPError("Unexpected response to lstat request")
            return response.attrs

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"Lstat operation failed: {e}", filename=path) from e

    def chmod(self, path: str, mode: int) -> None:
        """
        Change file permissions.

        Sends an ``SFTPSetStatMessage`` whose attributes carry only the
        permissions field.

        Args:
            path: Path to file
            mode: New permission mode

        Raises:
            SFTPError: If chmod operation fails
        """
        try:
            # Create attributes with new permissions
            attrs = SFTPAttributes()
            attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
            attrs.permissions = mode

            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPSetStatMessage

            response = self._send_request_and_wait_response(
                SFTPSetStatMessage(request_id, path, attrs)
            )

            if not isinstance(response, SFTPStatusMessage):
                raise SFTPError("Unexpected response to setstat request")
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"Chmod operation failed: {e}", filename=path) from e

    def mkdir(self, path: str, mode: int = 0o777) -> None:
        """
        Create directory.

        Sends an ``SFTPMkdirMessage`` whose attributes carry only the
        requested permissions.

        Args:
            path: Directory path to create
            mode: Directory permissions

        Raises:
            SFTPError: If directory creation fails
        """
        try:
            # Create attributes with permissions
            attrs = SFTPAttributes()
            attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
            attrs.permissions = mode

            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPMkdirMessage

            response = self._send_request_and_wait_response(
                SFTPMkdirMessage(request_id, path, attrs)
            )

            if not isinstance(response, SFTPStatusMessage):
                raise SFTPError("Unexpected response to mkdir request")
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"Directory creation failed: {e}", filename=path) from e

    def rmdir(self, path: str) -> None:
        """
        Remove directory.

        Sends an ``SFTPRmdirMessage`` and expects an OK status reply.

        Args:
            path: Directory path to remove

        Raises:
            SFTPError: If directory removal fails
        """
        try:
            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRmdirMessage

            response = self._send_request_and_wait_response(
                SFTPRmdirMessage(request_id, path)
            )

            if not isinstance(response, SFTPStatusMessage):
                raise SFTPError("Unexpected response to rmdir request")
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"Directory removal failed: {e}", filename=path) from e

    def remove(self, path: str) -> None:
        """
        Remove file.

        Sends an ``SFTPRemoveMessage`` and expects an OK status reply.

        Args:
            path: File path to remove

        Raises:
            SFTPError: If file removal fails
        """
        try:
            request_id = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRemoveMessage

            response = self._send_request_and_wait_response(
                SFTPRemoveMessage(request_id, path)
            )

            if not isinstance(response, SFTPStatusMessage):
                raise SFTPError("Unexpected response to remove request")
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"File removal failed: {e}", filename=path) from e

    def rename(self, oldpath: str, newpath: str) -> None:
        """
        Rename a remote file or directory.

        Sends an ``SFTPRenameMessage`` and expects an OK status reply.

        Args:
            oldpath: Current path
            newpath: New path

        Raises:
            SFTPError: If rename operation fails
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRenameMessage

            reply = self._send_request_and_wait_response(
                SFTPRenameMessage(rid, oldpath, newpath)
            )

            # Only a status reply is valid here, and it must report success.
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to rename request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Rename operation failed: {e}") from e

    def getcwd(self) -> str:
        """
        Return the server's current working directory.

        Resolves "." with an ``SFTPRealPathMessage`` and returns the first
        name from the reply.

        Returns:
            Current working directory path

        Raises:
            SFTPError: If operation fails
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRealPathMessage

            reply = self._send_request_and_wait_response(SFTPRealPathMessage(rid, "."))

            if isinstance(reply, SFTPStatusMessage):
                if reply.status_code == SSH_FX_OK:
                    raise SFTPError("Unexpected status response to realpath request")
                raise SFTPError.from_status(reply.status_code, reply.message)

            from ..protocol.sftp_messages import SFTPNameMessage

            if not isinstance(reply, SFTPNameMessage):
                raise SFTPError("Unexpected response to realpath request")
            if not reply.names:
                raise SFTPError("Empty response to realpath request")

            # Each entry is a tuple whose first element is the filename.
            resolved = reply.names[0][0]
            return resolved

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Get current directory failed: {e}") from e

    def normalize(self, path: str) -> str:
        """
        Ask the server to normalize a path (resolve . and .. components).

        Sends an ``SFTPRealPathMessage`` for the path and returns the first
        name from the reply.

        Args:
            path: Path to normalize

        Returns:
            Normalized path

        Raises:
            SFTPError: If operation fails
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRealPathMessage

            reply = self._send_request_and_wait_response(SFTPRealPathMessage(rid, path))

            if isinstance(reply, SFTPStatusMessage):
                if reply.status_code == SSH_FX_OK:
                    raise SFTPError("Unexpected status response to realpath request")
                raise SFTPError.from_status(reply.status_code, reply.message)

            from ..protocol.sftp_messages import SFTPNameMessage

            if not isinstance(reply, SFTPNameMessage):
                raise SFTPError("Unexpected response to realpath request")
            if not reply.names:
                raise SFTPError("Empty response to realpath request")

            # Each entry is a tuple whose first element is the filename.
            resolved = reply.names[0][0]
            return resolved

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Path normalization failed: {e}", filename=path) from e

    def symlink(self, targetpath: str, linkpath: str) -> None:
        """
        Create a symbolic link on the server.

        Sends an ``SFTPSymlinkMessage`` and expects an OK status reply.

        Args:
            targetpath: Target path for the link
            linkpath: Path where link should be created

        Raises:
            SFTPError: If symlink creation fails
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPSymlinkMessage

            reply = self._send_request_and_wait_response(
                SFTPSymlinkMessage(rid, targetpath, linkpath)
            )

            # Only a status reply is valid here, and it must report success.
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to symlink request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Symlink creation failed: {e}") from e

    def readlink(self, path: str) -> str:
        """
        Return the target of a symbolic link.

        Sends an ``SFTPReadLinkMessage`` and returns the first name from the
        reply.

        Args:
            path: Path to symbolic link

        Returns:
            Target path of the link

        Raises:
            SFTPError: If operation fails
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPReadLinkMessage

            reply = self._send_request_and_wait_response(SFTPReadLinkMessage(rid, path))

            if isinstance(reply, SFTPStatusMessage):
                if reply.status_code == SSH_FX_OK:
                    raise SFTPError("Unexpected status response to readlink request")
                raise SFTPError.from_status(reply.status_code, reply.message)

            from ..protocol.sftp_messages import SFTPNameMessage

            if not isinstance(reply, SFTPNameMessage):
                raise SFTPError("Unexpected response to readlink request")
            if not reply.names:
                raise SFTPError("Empty response to readlink request")

            # Each entry is a tuple whose first element is the target path.
            link_target = reply.names[0][0]
            return link_target

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Readlink failed: {e}", filename=path) from e

    def open(self, filename: str, mode: str = "r") -> "SFTPFile":
        """
        Open remote file.

        The mode string is translated to SFTP open flags by
        ``_mode_to_flags``; the file is opened with default attributes and
        the returned handle is wrapped in an ``SFTPFile``.

        Args:
            filename: Remote file path
            mode: File open mode (r, w, a, rb, wb, ab)

        Returns:
            SFTPFile object

        Raises:
            SFTPError: If file open fails
        """
        try:
            flags = self._mode_to_flags(mode)
            attrs = SFTPAttributes()
            request_id = self._get_next_request_id()

            open_msg = SFTPOpenMessage(request_id, filename, flags, attrs)
            response = self._send_request_and_wait_response(open_msg)

            if isinstance(response, SFTPHandleMessage):
                return SFTPFile(self, response.handle, mode)
            if isinstance(response, SFTPStatusMessage):
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected response to open request")

        except SFTPError:
            raise
        except Exception as e:
            # Chain explicitly so the root cause is reported as the direct cause.
            raise SFTPError(f"File open failed: {e}", filename=filename) from e

    def _mode_to_flags(self, mode: str) -> int:
        """
        Convert file mode string to SFTP flags.

        Mapping:
            "r": read
            "w": write, create, truncate
            "a": write, create, append
            "+": combined with one of the above, opens for both reading and
                 writing (as in Python's built-in ``open``)

        Other characters, such as "b", do not affect the flags. A mode with
        none of "r", "w" or "a" yields 0.

        Args:
            mode: File open mode string

        Returns:
            Bitwise OR of the matching SSH_FXF_* flags
        """
        flags = 0
        if "r" in mode:
            flags |= SSH_FXF_READ
        if "w" in mode:
            flags |= SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_TRUNC
        if "a" in mode:
            flags |= SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_APPEND
        if flags and "+" in mode:
            # Update mode: previously "r+" produced a read-only handle and
            # "w+"/"a+" a write-only one.
            flags |= SSH_FXF_READ | SSH_FXF_WRITE
        return flags

    def close(self) -> None:
        """Close SFTP session and cleanup resources."""
        if self._channel:
            try:
                self._channel.close()
            except Exception as e:
                self._logger.warning(f"Error closing SFTP channel: {e}")
            finally:
                self._channel = None

    def __enter__(self) -> "SFTPClient":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit."""
        self.close()
Methods:
__enter__()

Context manager entry.

Source code in spindlex/client/sftp_client.py
def __enter__(self) -> "SFTPClient":
    """Context manager entry."""
    return self
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in spindlex/client/sftp_client.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Context manager exit."""
    self.close()
__init__(transport)

Initialize SFTP client with SSH transport.

Parameters:

Name Type Description Default
transport Transport

SSH transport instance

required

Raises:

Type Description
SFTPError

If SFTP initialization fails

Source code in spindlex/client/sftp_client.py
def __init__(self, transport: Transport) -> None:
    """
    Set up client state and start the SFTP session on the given transport.

    Args:
        transport: SSH transport instance

    Raises:
        SFTPError: If SFTP initialization fails
    """
    # Connection objects
    self._transport = transport
    self._channel: Optional[Channel] = None

    # Request bookkeeping
    self._request_id = 0
    self._request_lock = threading.Lock()

    self._logger = logging.getLogger(__name__)

    # Server details; these start out empty
    self._server_version: Optional[int] = None
    self._server_extensions: dict[str, str] = {}

    # Received messages keyed by an integer id
    self._pending_responses: dict[int, SFTPMessage] = {}
    self._max_write_len: int = _DEFAULT_MAX_WRITE

    # Everything above must exist before the session is initialized
    self._initialize_sftp()
chmod(path, mode)

Change file permissions.

Parameters:

Name Type Description Default
path str

Path to file

required
mode int

New permission mode

required

Raises:

Type Description
SFTPError

If chmod operation fails

Source code in spindlex/client/sftp_client.py
def chmod(self, path: str, mode: int) -> None:
    """
    Change file permissions.

    Sends an ``SFTPSetStatMessage`` whose attributes flag only the
    permissions field and expects a status reply.

    Args:
        path: Path to file
        mode: New permission mode

    Raises:
        SFTPError: If the server answers with a non-OK status or an
            unexpected message type, or if any other error occurs while
            performing the request (the original error is chained as the
            cause)
    """
    try:
        # Only the permissions field is marked as present
        attrs = SFTPAttributes()
        attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
        attrs.permissions = mode

        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPSetStatMessage

        setstat_msg = SFTPSetStatMessage(request_id, path, attrs)
        response = self._send_request_and_wait_response(setstat_msg)

        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to setstat request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError.from_status(response.status_code, response.message)

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Chmod operation failed: {e}", filename=path) from e
close()

Close SFTP session and cleanup resources.

Source code in spindlex/client/sftp_client.py
def close(self) -> None:
    """
    Shut down the SFTP channel, if one is open.

    Errors raised while closing the channel are logged as warnings and
    not propagated. The channel reference is always cleared afterwards.
    """
    channel = self._channel
    if not channel:
        # Nothing to release
        return

    try:
        channel.close()
    except Exception as e:
        self._logger.warning(f"Error closing SFTP channel: {e}")
    finally:
        self._channel = None
get(remotepath, localpath)

Download file from remote server.

Parameters:

Name Type Description Default
remotepath str

Path to remote file

required
localpath str

Path for local file

required

Raises:

Type Description
SFTPError

If file download fails

Source code in spindlex/client/sftp_client.py
def get(self, remotepath: str, localpath: str) -> None:
    """
    Download file from remote server.

    The remote file is opened read-only and fetched with a pipeline of up
    to ``_DEPTH`` outstanding read requests of ``_CHUNK`` bytes each. Every
    request remembers the offset and length it asked for, so a reply that
    carries fewer bytes than requested is written at the correct position
    and the missing remainder is requested again.

    Args:
        remotepath: Path to remote file
        localpath: Path for local file

    Raises:
        SFTPError: If file download fails
    """
    from collections import deque

    try:
        # Open remote file for reading
        request_id = self._get_next_request_id()
        attrs = SFTPAttributes()
        open_msg = SFTPOpenMessage(request_id, remotepath, SSH_FXF_READ, attrs)

        response = self._send_request_and_wait_response(open_msg)
        if isinstance(response, SFTPStatusMessage):
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected status response to open request")
        if not isinstance(response, SFTPHandleMessage):
            raise SFTPError("Expected handle message for file open")

        handle = response.handle

        try:
            _CHUNK = 32768
            _DEPTH = 32
            with open(localpath, "wb") as local_file:
                next_offset = 0  # next never-requested remote offset
                write_pos = 0  # current position in the local file
                # Outstanding requests as (request_id, offset, length)
                in_flight: deque[tuple[int, int, int]] = deque()
                eof = False

                def request_read(offset: int, length: int) -> None:
                    """Send one read request and record what it asked for."""
                    rid = self._get_next_request_id()
                    self._send_message(SFTPReadMessage(rid, handle, offset, length))
                    in_flight.append((rid, offset, length))

                while not eof or in_flight:
                    # Issue read requests to fill the pipeline
                    while not eof and len(in_flight) < _DEPTH:
                        request_read(next_offset, _CHUNK)
                        next_offset += _CHUNK

                    if not in_flight:
                        break

                    # Collect the oldest outstanding response
                    rid, req_offset, req_len = in_flight.popleft()
                    response = self._receive_message_for_id(rid)

                    if isinstance(response, SFTPDataMessage):
                        data = response.data
                        if data:
                            # Replies are only contiguous while every read
                            # is full-sized; seek when that is not the case.
                            if req_offset != write_pos:
                                local_file.seek(req_offset)
                            local_file.write(data)
                            write_pos = req_offset + len(data)
                            if len(data) < req_len:
                                # Short read: fetch the bytes still missing
                                # from this chunk (answered with EOF when
                                # the file really ends here).
                                request_read(
                                    req_offset + len(data), req_len - len(data)
                                )
                    elif isinstance(response, SFTPStatusMessage):
                        if response.status_code == SSH_FX_EOF:
                            # Stop issuing new chunks; remaining replies
                            # are still drained by the outer loop.
                            eof = True
                        else:
                            raise SFTPError.from_status(
                                response.status_code, response.message
                            )
                    else:
                        raise SFTPError("Unexpected response to read request")

        finally:
            # Close remote file
            request_id = self._get_next_request_id()
            close_msg = SFTPCloseMessage(request_id, handle)
            self._send_request_and_wait_response(close_msg)

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"File download failed: {e}", filename=remotepath) from e
get_recursive(remotepath, localpath)

Download directory recursively.

Parameters:

Name Type Description Default
remotepath str

Remote directory path

required
localpath str

Local destination path

required
Source code in spindlex/client/sftp_client.py
def get_recursive(self, remotepath: str, localpath: str) -> None:
    """
    Download directory recursively.

    If ``remotepath`` is not a directory according to ``self.stat``, it is
    downloaded as a single file with ``self.get``. Otherwise the local
    directory is created (if needed) and every entry returned by
    ``self.listdir`` is processed recursively.

    Args:
        remotepath: Remote directory path
        localpath: Local destination path
    """
    import stat

    attrs = self.stat(remotepath)
    if not stat.S_ISDIR(attrs.st_mode or 0):
        self.get(remotepath, localpath)
        return

    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(localpath, exist_ok=True)

    # SFTP paths always use forward slash
    prefix = remotepath if remotepath.endswith("/") else f"{remotepath}/"
    for item in self.listdir(remotepath):
        self.get_recursive(f"{prefix}{item}", os.path.join(localpath, item))
getcwd()

Get current working directory.

Returns:

Type Description
str

Current working directory path

Raises:

Type Description
SFTPError

If operation fails

Source code in spindlex/client/sftp_client.py
def getcwd(self) -> str:
    """
    Return the server's resolution of ``"."``.

    Returns:
        First name from the server's reply to the realpath request

    Raises:
        SFTPError: If operation fails
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPNameMessage, SFTPRealPathMessage

        reply = self._send_request_and_wait_response(
            SFTPRealPathMessage(request_id, ".")
        )

        # A status reply is never a success for realpath
        if isinstance(reply, SFTPStatusMessage):
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)
            raise SFTPError("Unexpected status response to realpath request")

        if not isinstance(reply, SFTPNameMessage):
            raise SFTPError("Unexpected response to realpath request")
        if not reply.names:
            raise SFTPError("Empty response to realpath request")

        # Each entry is a tuple whose first element is the filename
        return reply.names[0][0]

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"Get current directory failed: {e}") from e
listdir(path='.')

List directory contents.

Parameters:

Name Type Description Default
path str

Directory path to list

'.'

Returns:

Type Description
list[str]

List of filenames in directory

Raises:

Type Description
SFTPError

If directory listing fails

Source code in spindlex/client/sftp_client.py
def listdir(self, path: str = ".") -> list[str]:
    """
    List directory contents.

    Opens the directory, issues readdir requests until the server answers
    with an EOF status, and closes the directory handle afterwards. The
    ``.`` and ``..`` entries are omitted from the result.

    Args:
        path: Directory path to list

    Returns:
        List of filenames in directory

    Raises:
        SFTPError: If directory listing fails, including a non-OK status
            when closing the directory handle
    """
    try:
        # Imported once up front instead of on every readdir iteration
        from ..protocol.sftp_messages import (
            SFTPNameMessage,
            SFTPOpenDirMessage,
            SFTPReadDirMessage,
        )

        # Open directory
        request_id = self._get_next_request_id()
        opendir_msg = SFTPOpenDirMessage(request_id, path)

        response = self._send_request_and_wait_response(opendir_msg)
        if isinstance(response, SFTPStatusMessage):
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected status response to opendir request")
        if not isinstance(response, SFTPHandleMessage):
            raise SFTPError("Expected handle message for directory open")

        handle = response.handle
        filenames: list[str] = []

        try:
            while True:
                # Read the next batch of directory entries
                request_id = self._get_next_request_id()
                readdir_msg = SFTPReadDirMessage(request_id, handle)
                response = self._send_request_and_wait_response(readdir_msg)

                if isinstance(response, SFTPStatusMessage):
                    if response.status_code == SSH_FX_EOF:
                        break  # End of directory reached
                    raise SFTPError.from_status(
                        response.status_code, response.message
                    )
                if not isinstance(response, SFTPNameMessage):
                    raise SFTPError("Unexpected response to readdir request")

                # Skip . and .. entries
                filenames.extend(
                    filename
                    for filename, _longname, _attrs in response.names
                    if filename not in (".", "..")
                )

        finally:
            # Close directory handle
            request_id = self._get_next_request_id()
            close_msg = SFTPCloseMessage(request_id, handle)
            response = self._send_request_and_wait_response(close_msg)
            if isinstance(response, SFTPStatusMessage):
                if response.status_code != SSH_FX_OK:
                    raise SFTPError.from_status(
                        response.status_code, response.message
                    )

        return filenames

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Directory listing failed: {e}", filename=path) from e
lstat(path)

Get file/directory attributes (don't follow symlinks).

Parameters:

Name Type Description Default
path str

Path to file or directory

required

Returns:

Type Description
SFTPAttributes

SFTPAttributes object with file information

Raises:

Type Description
SFTPError

If lstat operation fails

Source code in spindlex/client/sftp_client.py
def lstat(self, path: str) -> SFTPAttributes:
    """
    Get file/directory attributes (don't follow symlinks).

    Sends an ``SFTPLStatMessage`` and expects an ``SFTPAttrsMessage`` in
    return; any status reply is treated as a failure.

    Args:
        path: Path to file or directory

    Returns:
        SFTPAttributes object with file information

    Raises:
        SFTPError: If lstat operation fails (an underlying non-SFTP error
            is chained as the cause)
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPAttrsMessage, SFTPLStatMessage

        lstat_msg = SFTPLStatMessage(request_id, path)
        response = self._send_request_and_wait_response(lstat_msg)

        if isinstance(response, SFTPStatusMessage):
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected status response to lstat request")
        if not isinstance(response, SFTPAttrsMessage):
            raise SFTPError("Unexpected response to lstat request")
        return response.attrs

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Lstat operation failed: {e}", filename=path) from e
mkdir(path, mode=511)

Create directory.

Parameters:

Name Type Description Default
path str

Directory path to create

required
mode int

Directory permissions

511 (octal 0o777)

Raises:

Type Description
SFTPError

If directory creation fails

Source code in spindlex/client/sftp_client.py
def mkdir(self, path: str, mode: int = 0o777) -> None:
    """
    Create directory.

    Sends an ``SFTPMkdirMessage`` whose attributes flag only the
    permissions field and expects a status reply.

    Args:
        path: Directory path to create
        mode: Directory permissions

    Raises:
        SFTPError: If directory creation fails (an underlying non-SFTP
            error is chained as the cause)
    """
    try:
        # Only the permissions field is marked as present
        attrs = SFTPAttributes()
        attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
        attrs.permissions = mode

        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPMkdirMessage

        mkdir_msg = SFTPMkdirMessage(request_id, path, attrs)
        response = self._send_request_and_wait_response(mkdir_msg)

        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to mkdir request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError.from_status(response.status_code, response.message)

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Directory creation failed: {e}", filename=path) from e
normalize(path)

Normalize path (resolve . and .. components).

Parameters:

Name Type Description Default
path str

Path to normalize

required

Returns:

Type Description
str

Normalized path

Raises:

Type Description
SFTPError

If operation fails

Source code in spindlex/client/sftp_client.py
def normalize(self, path: str) -> str:
    """
    Ask the server to resolve ``path`` via a realpath request.

    Args:
        path: Path to normalize

    Returns:
        First name from the server's reply to the realpath request

    Raises:
        SFTPError: If operation fails
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPNameMessage, SFTPRealPathMessage

        reply = self._send_request_and_wait_response(
            SFTPRealPathMessage(request_id, path)
        )

        # A status reply is never a success for realpath
        if isinstance(reply, SFTPStatusMessage):
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)
            raise SFTPError("Unexpected status response to realpath request")

        if not isinstance(reply, SFTPNameMessage):
            raise SFTPError("Unexpected response to realpath request")
        if not reply.names:
            raise SFTPError("Empty response to realpath request")

        # Each entry is a tuple whose first element is the filename
        return reply.names[0][0]

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"Path normalization failed: {e}", filename=path) from e
open(filename, mode='r')

Open remote file.

Parameters:

Name Type Description Default
filename str

Remote file path

required
mode str

File open mode (r, w, a, rb, wb, ab)

'r'

Returns:

Type Description
SFTPFile

SFTPFile object

Raises:

Type Description
SFTPError

If file open fails

Source code in spindlex/client/sftp_client.py
def open(self, filename: str, mode: str = "r") -> "SFTPFile":
    """
    Open remote file.

    The mode string is translated to SFTP flags by ``_mode_to_flags`` and
    sent in an ``SFTPOpenMessage`` with default attributes.

    Args:
        filename: Remote file path
        mode: File open mode (r, w, a, rb, wb, ab)

    Returns:
        SFTPFile object wrapping the handle returned by the server

    Raises:
        SFTPError: If file open fails (an underlying non-SFTP error is
            chained as the cause)
    """
    try:
        flags = self._mode_to_flags(mode)
        attrs = SFTPAttributes()
        request_id = self._get_next_request_id()

        open_msg = SFTPOpenMessage(request_id, filename, flags, attrs)
        response = self._send_request_and_wait_response(open_msg)

        if isinstance(response, SFTPHandleMessage):
            return SFTPFile(self, response.handle, mode)
        if isinstance(response, SFTPStatusMessage):
            # Any status reply to an open request is treated as a failure
            raise SFTPError.from_status(response.status_code, response.message)
        raise SFTPError("Unexpected response to open request")

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"File open failed: {e}", filename=filename) from e
put(localpath, remotepath)

Upload file to remote server.

Parameters:

Name Type Description Default
localpath str

Path to local file

required
remotepath str

Path for remote file

required

Raises:

Type Description
SFTPError

If file upload fails

Source code in spindlex/client/sftp_client.py
def put(self, localpath: str, remotepath: str) -> None:
    """
    Upload file to remote server.

    The remote file is opened with write/create/truncate flags and the
    local file size as an attribute. Data is then sent as a pipeline of up
    to ``_DEPTH`` outstanding write requests of at most
    ``self._max_write_len`` bytes each, and every request's status reply is
    checked. The remote handle is closed afterwards.

    Args:
        localpath: Path to local file
        remotepath: Path for remote file

    Raises:
        SFTPError: If file upload fails (an underlying non-SFTP error is
            chained as the cause)
    """
    from collections import deque

    try:
        # Get local file size
        file_size = os.path.getsize(localpath)

        # Create attributes with file size
        attrs = SFTPAttributes()
        attrs.flags = SSH_FILEXFER_ATTR_SIZE
        attrs.size = file_size

        # Open remote file for writing
        request_id = self._get_next_request_id()
        pflags = SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_TRUNC
        open_msg = SFTPOpenMessage(request_id, remotepath, pflags, attrs)

        response = self._send_request_and_wait_response(open_msg)
        if isinstance(response, SFTPStatusMessage):
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected status response to open request")
        if not isinstance(response, SFTPHandleMessage):
            raise SFTPError("Expected handle message for file open")

        handle = response.handle

        try:
            _CHUNK = self._max_write_len
            _DEPTH = 32
            with open(localpath, "rb") as local_file:
                offset = 0
                in_flight: deque[int] = deque()
                local_eof = False  # set once the local file is exhausted

                while True:
                    # Fill the pipeline with write requests
                    while not local_eof and len(in_flight) < _DEPTH:
                        chunk = local_file.read(_CHUNK)
                        if not chunk:
                            local_eof = True
                            break
                        request_id = self._get_next_request_id()
                        write_msg = SFTPWriteMessage(
                            request_id, handle, offset, chunk
                        )
                        self._send_message(write_msg)
                        in_flight.append(request_id)
                        offset += len(chunk)

                    if not in_flight:
                        break

                    # Collect one ACK before refilling
                    rid = in_flight.popleft()
                    response = self._receive_message_for_id(rid)
                    if not isinstance(response, SFTPStatusMessage):
                        raise SFTPError("Unexpected response to write request")
                    if response.status_code != SSH_FX_OK:
                        raise SFTPError.from_status(
                            response.status_code, response.message
                        )

        finally:
            # Close remote file
            request_id = self._get_next_request_id()
            close_msg = SFTPCloseMessage(request_id, handle)
            self._send_request_and_wait_response(close_msg)

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"File upload failed: {e}", filename=localpath) from e
put_recursive(localpath, remotepath)

Upload directory recursively.

Parameters:

Name Type Description Default
localpath str

Local directory path

required
remotepath str

Remote destination path

required
Source code in spindlex/client/sftp_client.py
def put_recursive(self, localpath: str, remotepath: str) -> None:
    """
    Upload a local directory tree, or a single file, to the server.

    A ``localpath`` that is not a directory is uploaded with ``self.put``.
    For a directory, the remote directory is created first; an
    ``SFTPError`` from ``mkdir`` whose code is ``SSH_FX_FAILURE`` is
    ignored, any other is re-raised. Each local entry is then uploaded
    recursively.

    Args:
        localpath: Local directory path
        remotepath: Remote destination path
    """
    if os.path.isdir(localpath):
        try:
            self.mkdir(remotepath)
        except SFTPError as e:
            if e.sftp_code != SFTPError.SSH_FX_FAILURE:
                raise

        # Remote paths are joined with "/" without doubling a trailing one
        remote_base = remotepath if remotepath.endswith("/") else f"{remotepath}/"
        for entry in os.listdir(localpath):
            self.put_recursive(
                os.path.join(localpath, entry), f"{remote_base}{entry}"
            )
    else:
        self.put(localpath, remotepath)

readlink(path)

Read symbolic link.

Parameters:

Name Type Description Default
path str

Path to symbolic link

required

Returns:

Type Description
str

Target path of the link

Raises:

Type Description
SFTPError

If operation fails

Source code in spindlex/client/sftp_client.py
def readlink(self, path: str) -> str:
    """
    Return the target of a symbolic link.

    Args:
        path: Path to symbolic link

    Returns:
        First name from the server's reply to the readlink request

    Raises:
        SFTPError: If operation fails
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPNameMessage, SFTPReadLinkMessage

        reply = self._send_request_and_wait_response(
            SFTPReadLinkMessage(request_id, path)
        )

        # A status reply is never a success for readlink
        if isinstance(reply, SFTPStatusMessage):
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)
            raise SFTPError("Unexpected status response to readlink request")

        if not isinstance(reply, SFTPNameMessage):
            raise SFTPError("Unexpected response to readlink request")
        if not reply.names:
            raise SFTPError("Empty response to readlink request")

        # Each entry is a tuple whose first element is the target path
        return reply.names[0][0]

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"Readlink failed: {e}", filename=path) from e
remove(path)

Remove file.

Parameters:

Name Type Description Default
path str

File path to remove

required

Raises:

Type Description
SFTPError

If file removal fails

Source code in spindlex/client/sftp_client.py
def remove(self, path: str) -> None:
    """
    Remove file.

    Sends an ``SFTPRemoveMessage`` and expects a status reply.

    Args:
        path: File path to remove

    Raises:
        SFTPError: If file removal fails (an underlying non-SFTP error is
            chained as the cause)
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPRemoveMessage

        remove_msg = SFTPRemoveMessage(request_id, path)
        response = self._send_request_and_wait_response(remove_msg)

        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to remove request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError.from_status(response.status_code, response.message)

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"File removal failed: {e}", filename=path) from e
rename(oldpath, newpath)

Rename file or directory.

Parameters:

Name Type Description Default
oldpath str

Current path

required
newpath str

New path

required

Raises:

Type Description
SFTPError

If rename operation fails

Source code in spindlex/client/sftp_client.py
def rename(self, oldpath: str, newpath: str) -> None:
    """
    Move a file or directory from ``oldpath`` to ``newpath``.

    Args:
        oldpath: Current path
        newpath: New path

    Raises:
        SFTPError: If rename operation fails
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPRenameMessage

        reply = self._send_request_and_wait_response(
            SFTPRenameMessage(request_id, oldpath, newpath)
        )

        # Only an OK status reply counts as success
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to rename request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError.from_status(reply.status_code, reply.message)

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"Rename operation failed: {e}") from e
rmdir(path)

Remove directory.

Parameters:

Name Type Description Default
path str

Directory path to remove

required

Raises:

Type Description
SFTPError

If directory removal fails

Source code in spindlex/client/sftp_client.py
def rmdir(self, path: str) -> None:
    """
    Remove directory.

    Sends an ``SFTPRmdirMessage`` and expects a status reply.

    Args:
        path: Directory path to remove

    Raises:
        SFTPError: If directory removal fails (an underlying non-SFTP
            error is chained as the cause)
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPRmdirMessage

        rmdir_msg = SFTPRmdirMessage(request_id, path)
        response = self._send_request_and_wait_response(rmdir_msg)

        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to rmdir request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError.from_status(response.status_code, response.message)

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Directory removal failed: {e}", filename=path) from e
stat(path)

Get file/directory attributes.

Parameters:

Name Type Description Default
path str

Path to file or directory

required

Returns:

Type Description
SFTPAttributes

SFTPAttributes object with file information

Raises:

Type Description
SFTPError

If stat operation fails

Source code in spindlex/client/sftp_client.py
def stat(self, path: str) -> SFTPAttributes:
    """
    Get file/directory attributes.

    Sends an ``SFTPStatMessage`` and expects an ``SFTPAttrsMessage`` in
    return; any status reply is treated as a failure.

    Args:
        path: Path to file or directory

    Returns:
        SFTPAttributes object with file information

    Raises:
        SFTPError: If stat operation fails (an underlying non-SFTP error
            is chained as the cause)
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPAttrsMessage, SFTPStatMessage

        stat_msg = SFTPStatMessage(request_id, path)
        response = self._send_request_and_wait_response(stat_msg)

        if isinstance(response, SFTPStatusMessage):
            if response.status_code != SSH_FX_OK:
                raise SFTPError.from_status(response.status_code, response.message)
            raise SFTPError("Unexpected status response to stat request")
        if not isinstance(response, SFTPAttrsMessage):
            raise SFTPError("Unexpected response to stat request")
        return response.attrs

    except SFTPError:
        raise
    except Exception as e:
        # Chain explicitly so the root cause stays attached to the error
        raise SFTPError(f"Stat operation failed: {e}", filename=path) from e

symlink(targetpath, linkpath)

Create symbolic link.

Parameters:

Name Type Description Default
targetpath str

Target path for the link

required
linkpath str

Path where link should be created

required

Raises:

Type Description
SFTPError

If symlink creation fails

Source code in spindlex/client/sftp_client.py
def symlink(self, targetpath: str, linkpath: str) -> None:
    """
    Create a symbolic link at ``linkpath`` pointing to ``targetpath``.

    Args:
        targetpath: Target path for the link
        linkpath: Path where link should be created

    Raises:
        SFTPError: If symlink creation fails
    """
    try:
        request_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPSymlinkMessage

        reply = self._send_request_and_wait_response(
            SFTPSymlinkMessage(request_id, targetpath, linkpath)
        )

        # Only an OK status reply counts as success
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to symlink request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError.from_status(reply.status_code, reply.message)

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"Symlink creation failed: {e}") from e

SFTPFile

SFTP file object for remote file operations.

Source code in spindlex/client/sftp_client.py
class SFTPFile:
    """
    SFTP file object for remote file operations.

    Writes are pipelined: ``write`` sends its requests immediately and only
    collects a reply once ``_PIPELINE_DEPTH`` requests are outstanding; the
    remaining replies are collected by ``_flush_write_queue`` (called from
    ``close``). Two positions are tracked: ``_send_offset`` is where the next
    write request lands, while ``_offset`` advances on completed reads and
    on acknowledged writes.
    """

    # Maximum number of read or write requests kept in flight at once.
    _PIPELINE_DEPTH = 32

    def __init__(self, client: "SFTPClient", handle: bytes, mode: str) -> None:
        """
        Initialize SFTP file.

        Args:
            client: SFTP client instance
            handle: File handle from server
            mode: File open mode
        """
        self._client = client
        self._handle = handle
        self._mode = mode
        self._offset = 0
        self._send_offset = 0
        self._closed = False
        self._write_queue: list[tuple[int, int]] = []  # (request_id, data_length)

    def read(self, size: int = -1) -> bytes:
        """
        Read data from remote file.

        Args:
            size: Number of bytes to request. A negative value reads until
                EOF; ``0`` returns ``b""`` without contacting the server.

        Returns:
            Read data (``b""`` at end of file). A positive ``size`` issues a
            single request and returns whatever data that one reply carries.

        Raises:
            SFTPError: If the file is closed, the server reports an error
                status, or an unexpected message type is received
        """
        if self._closed:
            raise SFTPError("File is closed")

        if size == 0:
            # File-object convention: a zero-length read yields empty bytes.
            # No round trip is needed to produce that.
            return b""

        # NOTE(review): reads start at ``_offset``, which only advances once
        # write ACKs are collected; reading while writes are still queued
        # therefore starts behind ``_send_offset`` -- confirm whether mixed
        # read/write use of one handle is intended.
        if size < 0:
            # Read until EOF using a pipelined window of concurrent requests.
            # NOTE(review): request offsets are precomputed in steps of
            # _CHUNK, which assumes every non-EOF reply carries a full chunk;
            # a shorter reply would leave a gap -- TODO confirm the server
            # never short-reads at this chunk size.
            _CHUNK = SFTP_MAX_READ_SIZE
            result = bytearray()
            in_flight: list[int] = []
            eof = False
            offset = self._offset

            while not eof or in_flight:
                # Top the window up; stop issuing requests once EOF was seen.
                while not eof and len(in_flight) < self._PIPELINE_DEPTH:
                    rid = self._client._get_next_request_id()
                    self._client._send_message(
                        SFTPReadMessage(rid, self._handle, offset, _CHUNK)
                    )
                    in_flight.append(rid)
                    offset += _CHUNK

                if not in_flight:
                    break

                # Collect replies oldest-first so data is appended in order.
                rid = in_flight.pop(0)
                response = self._client._receive_message_for_id(rid)

                if isinstance(response, SFTPDataMessage):
                    result.extend(response.data)
                elif isinstance(response, SFTPStatusMessage):
                    if response.status_code == SSH_FX_EOF:
                        eof = True
                    else:
                        raise SFTPError.from_status(
                            response.status_code, response.message
                        )
                else:
                    raise SFTPError("Unexpected response to read request")

            self._offset += len(result)
            return bytes(result)

        request_id = self._client._get_next_request_id()
        read_msg = SFTPReadMessage(request_id, self._handle, self._offset, size)

        response = self._client._send_request_and_wait_response(read_msg)

        if isinstance(response, SFTPDataMessage):
            self._offset += len(response.data)
            return response.data
        elif isinstance(response, SFTPStatusMessage):
            if response.status_code == SSH_FX_EOF:
                return b""
            raise SFTPError.from_status(response.status_code, response.message)
        else:
            raise SFTPError("Unexpected response to read request")

    def write(self, data: bytes) -> int:
        """
        Write data to remote file.

        The data is split into chunks of at most the client's
        ``_max_write_len`` bytes. Each chunk is sent without waiting for its
        reply; replies are collected here only when the pipeline is full, and
        otherwise by ``_flush_write_queue``.

        Args:
            data: Data to write

        Returns:
            Number of bytes written (always ``len(data)``; some of it may
            still be awaiting acknowledgement)

        Raises:
            SFTPError: If the file is closed or a collected reply reports a
                failure or has an unexpected type
        """
        if self._closed:
            raise SFTPError("File is closed")

        _MAX_CHUNK = self._client._max_write_len
        offset = 0
        while offset < len(data):
            chunk = data[offset : offset + _MAX_CHUNK]
            request_id = self._client._get_next_request_id()
            write_msg = SFTPWriteMessage(
                request_id, self._handle, self._send_offset, chunk
            )
            self._client._send_message(write_msg)
            chunk_len = len(chunk)
            self._send_offset += chunk_len
            self._write_queue.append((request_id, chunk_len))

            # Collect the oldest pending ACK when the pipeline is full so we
            # surface write errors promptly and keep memory usage bounded.
            if len(self._write_queue) >= self._PIPELINE_DEPTH:
                rid, nbytes = self._write_queue.pop(0)
                self._collect_write_ack(rid, nbytes)

            offset += chunk_len

        return len(data)

    def _collect_write_ack(self, request_id: int, nbytes: int) -> None:
        """
        Wait for the reply to one write request and record its outcome.

        Args:
            request_id: ID of the write request whose reply is awaited
            nbytes: Number of bytes that request carried

        Raises:
            SFTPError: If the reply is not a status message or its status is
                not ``SSH_FX_OK``
        """
        response = self._client._receive_message_for_id(request_id)
        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to write request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError.from_status(response.status_code, response.message)
        self._offset += nbytes

    def _flush_write_queue(self) -> None:
        """
        Drain all outstanding pipelined write ACKs.

        Entries are removed from the queue as their replies are collected, so
        if one of them raises, the queue still holds exactly the requests
        whose replies have not been read yet.

        Raises:
            SFTPError: If a collected reply reports a failure or has an
                unexpected type
        """
        while self._write_queue:
            rid, nbytes = self._write_queue.pop(0)
            self._collect_write_ack(rid, nbytes)

    def close(self) -> None:
        """
        Close remote file.

        Does nothing if the file is already closed. Otherwise pending write
        ACKs are drained first; the close request is sent even if draining
        raises.
        """
        if not self._closed:
            # Flag first so a repeated close() call is a no-op.
            self._closed = True
            try:
                self._flush_write_queue()
            finally:
                request_id = self._client._get_next_request_id()
                close_msg = SFTPCloseMessage(request_id, self._handle)
                self._client._send_request_and_wait_response(close_msg)

    def __enter__(self) -> "SFTPFile":
        """Context manager entry."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Context manager exit: closes the file."""
        self.close()
Methods:
__enter__()

Context manager entry.

Source code in spindlex/client/sftp_client.py
def __enter__(self) -> "SFTPFile":
    """Context manager entry."""
    return self
__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

Source code in spindlex/client/sftp_client.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Context manager exit."""
    self.close()
__init__(client, handle, mode)

Initialize SFTP file.

Parameters:

Name Type Description Default
client SFTPClient

SFTP client instance

required
handle bytes

File handle from server

required
mode str

File open mode

required
Source code in spindlex/client/sftp_client.py
def __init__(self, client: "SFTPClient", handle: bytes, mode: str) -> None:
    """
    Initialize SFTP file.

    Args:
        client: SFTP client instance
        handle: File handle from server
        mode: File open mode
    """
    self._client = client
    self._handle = handle
    self._mode = mode
    self._offset = 0
    self._send_offset = 0
    self._closed = False
    self._write_queue: list[tuple[int, int]] = []  # (request_id, data_length)
close()

Close remote file.

Source code in spindlex/client/sftp_client.py
def close(self) -> None:
    """
    Close the remote file; calling it again afterwards does nothing.

    Pending write ACKs are drained first, and the close request is sent
    even if draining raises.
    """
    if self._closed:
        return

    # Flag first so a repeated call returns early.
    self._closed = True
    try:
        self._flush_write_queue()
    finally:
        close_msg = SFTPCloseMessage(
            self._client._get_next_request_id(), self._handle
        )
        self._client._send_request_and_wait_response(close_msg)
read(size=-1)

Read data from remote file.

Parameters:

Name Type Description Default
size int

Number of bytes to read (-1 for all)

-1

Returns:

Type Description
bytes

Read data

Source code in spindlex/client/sftp_client.py
def read(self, size: int = -1) -> bytes:
    """
    Read data from the remote file at the current offset.

    Args:
        size: Number of bytes to request; a negative value reads until EOF

    Returns:
        The bytes received; ``b""`` when the server signals EOF on a
        sized read

    Raises:
        SFTPError: If the file is closed, the server reports an error
            status, or the reply has an unexpected type
    """
    if self._closed:
        raise SFTPError("File is closed")

    client = self._client

    if size >= 0:
        # Sized read: one request, one reply.
        rid = client._get_next_request_id()
        reply = client._send_request_and_wait_response(
            SFTPReadMessage(rid, self._handle, self._offset, size)
        )
        if isinstance(reply, SFTPDataMessage):
            self._offset += len(reply.data)
            return reply.data
        if isinstance(reply, SFTPStatusMessage):
            if reply.status_code == SSH_FX_EOF:
                return b""
            raise SFTPError.from_status(reply.status_code, reply.message)
        raise SFTPError("Unexpected response to read request")

    # Unbounded read: keep a window of requests in flight until EOF.
    chunk_size = SFTP_MAX_READ_SIZE
    collected = bytearray()
    pending: list[int] = []
    next_offset = self._offset
    hit_eof = False

    while pending or not hit_eof:
        # Refill the window unless EOF has already been reported.
        while not hit_eof and len(pending) < self._PIPELINE_DEPTH:
            rid = client._get_next_request_id()
            client._send_message(
                SFTPReadMessage(rid, self._handle, next_offset, chunk_size)
            )
            pending.append(rid)
            next_offset += chunk_size

        if not pending:
            break

        # Replies are consumed oldest-first so the data stays in order.
        reply = client._receive_message_for_id(pending.pop(0))

        if isinstance(reply, SFTPDataMessage):
            collected.extend(reply.data)
            continue
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to read request")
        if reply.status_code != SSH_FX_EOF:
            raise SFTPError.from_status(reply.status_code, reply.message)
        hit_eof = True

    self._offset += len(collected)
    return bytes(collected)
write(data)

Write data to remote file.

Parameters:

Name Type Description Default
data bytes

Data to write

required

Returns:

Type Description
int

Number of bytes written

Source code in spindlex/client/sftp_client.py
def write(self, data: bytes) -> int:
    """
    Send ``data`` to the remote file as a series of pipelined writes.

    Args:
        data: Data to write

    Returns:
        Number of bytes written (the length of ``data``)

    Raises:
        SFTPError: If the file is closed, or a reply collected while the
            pipeline is full reports a failure or has an unexpected type
    """
    if self._closed:
        raise SFTPError("File is closed")

    client = self._client
    max_chunk = client._max_write_len
    total = len(data)
    pos = 0

    while pos < total:
        piece = data[pos : pos + max_chunk]
        rid = client._get_next_request_id()
        client._send_message(
            SFTPWriteMessage(rid, self._handle, self._send_offset, piece)
        )
        sent = len(piece)
        self._send_offset += sent
        self._write_queue.append((rid, sent))

        # With the pipeline full, wait for the oldest reply: errors surface
        # promptly and the number of outstanding requests stays bounded.
        if len(self._write_queue) >= self._PIPELINE_DEPTH:
            acked_id, acked_len = self._write_queue.pop(0)
            reply = client._receive_message_for_id(acked_id)
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to write request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError.from_status(reply.status_code, reply.message)
            self._offset += acked_len

        pos += sent

    return total

spindlex.client.async_sftp_client

Async SFTP Client Implementation

Provides asynchronous SFTP client functionality for file operations.

Classes

AsyncSFTPClient

Async SFTP client for file transfer operations.

Provides asynchronous versions of all SFTP operations for use in async/await applications and high-concurrency scenarios.

Source code in spindlex/client/async_sftp_client.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
class AsyncSFTPClient:
    """
    Async SFTP client for file transfer operations.

    Provides asynchronous versions of all SFTP operations for use
    in async/await applications and high-concurrency scenarios.
    """

    def __init__(self, channel: Any) -> None:
        """
        Initialize async SFTP client.

        Args:
            channel: SSH channel for SFTP subsystem
        """
        self._channel = channel
        self._request_id = 0
        self._pending_requests: dict[int, asyncio.Future] = {}
        self._buffered_responses: dict[int, Any] = {}
        self._initialized = False
        self._dispatch_task: Optional[asyncio.Task] = None
        self._lock = asyncio.Lock()
        self._logger = logging.getLogger(__name__)

    async def _initialize(self) -> None:
        """Initialize SFTP subsystem."""
        if self._initialized:
            return

        # Start dispatcher task
        self._dispatch_task = asyncio.create_task(self._dispatch_loop())

        try:
            # Send SFTP init message
            init_msg = SFTPInitMessage(version=SFTP_VERSION)
            await self._send_message(init_msg)

            # Wait for version response (special case in dispatcher uses ID -2)
            fut = asyncio.get_running_loop().create_future()
            self._pending_requests[_SFTP_INIT_SENTINEL] = fut

            response = await fut
            if not isinstance(response, SFTPVersionMessage):
                raise SFTPError("Expected SFTP version message")
            self._initialized = True
        except Exception as e:
            await self.close()
            if isinstance(e, SFTPError):
                raise
            raise SFTPError(f"SFTP initialization failed: {e}") from e

    async def _dispatch_loop(self) -> None:
        """Background loop to receive and dispatch SFTP messages."""
        try:
            while self._channel and not self._channel.closed:
                try:
                    response = await self._recv_message()

                    # SSH_FXP_VERSION doesn't have request_id in protocol but our class might handle it
                    # In SFTP protocol, VERSION is the only one without ID
                    request_id: Optional[int]
                    if isinstance(response, SFTPVersionMessage):
                        request_id = _SFTP_INIT_SENTINEL
                    else:
                        request_id = getattr(response, "request_id", None)

                    if request_id is not None:
                        if request_id in self._pending_requests:
                            fut = self._pending_requests.pop(request_id)
                            if not fut.done():
                                fut.set_result(response)
                        else:
                            # Response arrived before _wait_for_response registered a
                            # future (race between send and future registration).
                            # Buffer it so _wait_for_response can collect it synchronously.
                            self._buffered_responses[request_id] = response
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    # Cancel all pending requests on error
                    for fut in list(self._pending_requests.values()):
                        if not fut.done():
                            fut.set_exception(e)
                    self._pending_requests.clear()
                    break
        finally:
            self._initialized = False

    async def remove(self, path: str) -> None:
        """
        Delete a file on the remote server.

        Args:
            path: Remote file path to remove

        Raises:
            SFTPError: If the server answers with a non-OK status, replies
                with an unexpected message type, or the request fails for
                any other reason
        """
        try:
            rid = self._get_next_request_id()
            from ..protocol.sftp_messages import SFTPRemoveMessage

            await self._send_message(
                SFTPRemoveMessage(request_id=rid, filename=path)
            )
            reply = await self._wait_for_response(rid)

            # Only a status message is a valid answer here.
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to remove request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(
                    f"File removal failed: {reply.message}", reply.status_code
                )

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File removal failed: {e}") from e

    async def get(self, remotepath: str, localpath: str) -> None:
        """
        Download a remote file to a local path using pipelined reads.

        Up to 64 read requests of 32768 bytes each are kept in flight;
        replies are awaited oldest-first and written to the local file in
        that order. The remote file is closed in all cases.

        Args:
            remotepath: Remote file path
            localpath: Local file path

        Raises:
            SFTPError: If download fails
        """
        chunk_size = 32768
        window = 64
        try:
            remote_file = await self.open(remotepath, "rb")
            try:
                loop = asyncio.get_running_loop()
                with open(localpath, "wb") as sink:
                    next_offset = 0
                    pending: list[asyncio.Future] = []
                    saw_eof = False

                    while pending or not saw_eof:
                        # Keep the window full until EOF has been reported.
                        while not saw_eof and len(pending) < window:
                            rid = self._get_next_request_id()
                            waiter: asyncio.Future = loop.create_future()
                            self._pending_requests[rid] = waiter
                            await self._send_message(
                                SFTPReadMessage(
                                    request_id=rid,
                                    handle=remote_file._handle,
                                    offset=next_offset,
                                    length=chunk_size,
                                )
                            )
                            pending.append(waiter)
                            next_offset += chunk_size

                        if not pending:
                            break

                        # Oldest request first, so chunks land in file order.
                        reply = await pending.pop(0)
                        if isinstance(reply, SFTPDataMessage):
                            sink.write(reply.data)
                            continue
                        if not isinstance(reply, SFTPStatusMessage):
                            raise SFTPError("Unexpected response to read request")
                        if reply.status_code != SSH_FX_EOF:
                            raise SFTPError(
                                f"Read failed: {reply.message}",
                                reply.status_code,
                            )
                        saw_eof = True
            finally:
                await remote_file.close()

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File download failed: {e}") from e

    async def put(self, localpath: str, remotepath: str) -> None:
        """
        Upload a local file to a remote path using pipelined writes.

        The local file is read in 32768-byte chunks. Write requests are sent
        without waiting for their replies until 64 are outstanding; the
        oldest reply is then awaited before continuing. All remaining
        replies are awaited once the file has been sent. The remote file is
        closed in all cases.

        Args:
            localpath: Local file path
            remotepath: Remote file path

        Raises:
            SFTPError: If upload fails
        """
        chunk_size = 32768
        window = 64

        def check_ack(reply: Any) -> None:
            # A write must be answered by a status message carrying SSH_FX_OK.
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to write request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(
                    f"Write failed: {reply.message}",
                    reply.status_code,
                )

        try:
            remote_file = await self.open(remotepath, "wb")
            try:
                loop = asyncio.get_running_loop()
                with open(localpath, "rb") as source:
                    next_offset = 0
                    pending: list[asyncio.Future] = []

                    while chunk := source.read(chunk_size):
                        rid = self._get_next_request_id()
                        waiter: asyncio.Future = loop.create_future()
                        self._pending_requests[rid] = waiter
                        await self._send_message(
                            SFTPWriteMessage(
                                request_id=rid,
                                handle=remote_file._handle,
                                offset=next_offset,
                                data=chunk,
                            )
                        )
                        pending.append(waiter)
                        next_offset += len(chunk)

                        # Window full: wait for the oldest ACK(s) first.
                        while len(pending) >= window:
                            check_ack(await pending.pop(0))

                    # Everything is sent; collect the ACKs still outstanding.
                    for waiter in pending:
                        check_ack(await waiter)
            finally:
                await remote_file.close()

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File upload failed: {e}") from e

    async def get_recursive(self, remotepath: str, localpath: str) -> None:
        """
        Download directory recursively and asynchronously.

        If ``remotepath`` is not a directory it is downloaded as a single
        file. Otherwise ``localpath`` is created when missing and every
        entry of the remote directory is downloaded concurrently into it.

        Entry names come from the server and are validated before use: a
        name that is empty, is ``.`` or ``..``, or contains a path separator
        or drive prefix is rejected, because joining it onto ``localpath``
        could place the download outside the destination directory.

        Args:
            remotepath: Remote directory path
            localpath: Local destination path

        Raises:
            SFTPError: If a remote entry name is unsafe, or a stat, listing
                or download fails
        """
        import stat

        attrs = await self.stat(remotepath)
        if not stat.S_ISDIR(attrs.st_mode):
            await self.get(remotepath, localpath)
            return

        if not os.path.exists(localpath):
            # exist_ok tolerates the directory appearing between the check
            # above and this call.
            os.makedirs(localpath, exist_ok=True)

        items = await self.listdir(remotepath)

        # Validate every name before starting any transfer, so nothing is
        # written once an unsafe entry is found.
        for item in items:
            if (
                not item
                or item in (".", "..")
                or "/" in item
                or os.path.basename(item) != item
            ):
                raise SFTPError(f"Refusing unsafe remote file name: {item!r}")

        remote_prefix = remotepath if remotepath.endswith("/") else f"{remotepath}/"
        tasks = [
            self.get_recursive(f"{remote_prefix}{item}", os.path.join(localpath, item))
            for item in items
        ]

        if tasks:
            await asyncio.gather(*tasks)

    async def put_recursive(self, localpath: str, remotepath: str) -> None:
        """
        Upload a local directory tree, running the entries concurrently.

        If ``localpath`` is not a directory it is uploaded as a single file.
        Otherwise the remote directory is created (a failure reported with
        the generic FAILURE code is ignored) and each local entry is
        uploaded into it recursively.

        Args:
            localpath: Local directory path
            remotepath: Remote destination path
        """
        if not os.path.isdir(localpath):
            await self.put(localpath, remotepath)
            return

        try:
            await self.mkdir(remotepath)
        except SFTPError as e:
            # NOTE(review): listdir reads ``status_code`` from SFTPError
            # while this reads ``sftp_code`` -- confirm both attributes exist.
            if e.sftp_code != SFTPError.SSH_FX_FAILURE:
                raise

        # Join with exactly one "/" between directory and entry name.
        remote_prefix = remotepath if remotepath.endswith("/") else f"{remotepath}/"
        uploads = [
            self.put_recursive(os.path.join(localpath, name), f"{remote_prefix}{name}")
            for name in os.listdir(localpath)
        ]

        if uploads:
            await asyncio.gather(*uploads)

    async def listdir(self, path: str = ".") -> list[str]:
        """
        Return the names of the entries in a remote directory.

        The directory is read batch by batch until an empty batch or an EOF
        status is received; ``.`` and ``..`` are left out. The directory
        handle is closed in all cases.

        Args:
            path: Directory path to list

        Returns:
            List of filenames in directory

        Raises:
            SFTPError: If listing fails
        """
        try:
            handle = await self._opendir(path)
            names = []

            try:
                while True:
                    try:
                        batch = await self._readdir(handle)
                        if not batch:
                            break

                        # The first element of each entry is its file name.
                        names.extend(
                            entry[0]
                            for entry in batch
                            if entry[0] not in (".", "..")
                        )
                    except SFTPError as e:
                        # EOF ends the listing; anything else is an error.
                        if e.status_code == SSH_FX_EOF:
                            break
                        raise
            finally:
                await self._close(handle)

            return names

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Directory listing failed: {e}") from e

    async def stat(self, path: str) -> Any:
        """
        Fetch the attributes of a remote file or directory.

        Args:
            path: File or directory path

        Returns:
            The ``attrs`` carried by the server's attributes reply

        Raises:
            SFTPError: If the server answers with a status message or an
                unexpected message type, or the request fails otherwise
        """
        try:
            rid = self._get_next_request_id()
            await self._send_message(SFTPStatMessage(request_id=rid, path=path))
            reply = await self._wait_for_response(rid)

            if isinstance(reply, SFTPAttrsMessage):
                return reply.attrs
            if isinstance(reply, SFTPStatusMessage):
                # Any status reply to a stat request is treated as failure.
                raise SFTPError(f"Stat failed: {reply.message}", reply.status_code)
            raise SFTPError("Unexpected response to stat request")

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Stat operation failed: {e}") from e

    async def lstat(self, path: str) -> Any:
        """
        Fetch the attributes of a remote path without following symlinks.

        Args:
            path: File or directory path

        Returns:
            The ``attrs`` carried by the server's attributes reply

        Raises:
            SFTPError: If the server answers with a status message or an
                unexpected message type, or the request fails otherwise
        """
        try:
            rid = self._get_next_request_id()
            await self._send_message(SFTPLStatMessage(request_id=rid, path=path))
            reply = await self._wait_for_response(rid)

            if isinstance(reply, SFTPAttrsMessage):
                return reply.attrs
            if isinstance(reply, SFTPStatusMessage):
                # Any status reply to an lstat request is treated as failure.
                raise SFTPError(f"Lstat failed: {reply.message}", reply.status_code)
            raise SFTPError("Unexpected response to lstat request")

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Lstat operation failed: {e}") from e

    async def mkdir(self, path: str, mode: int = 0o755) -> None:
        """
        Create a directory on the remote server.

        Args:
            path: Directory path to create
            mode: Directory permissions, sent as ``st_mode`` of the
                request's attributes

        Raises:
            SFTPError: If the server answers with a non-OK status, replies
                with an unexpected message type, or the request fails
                otherwise
        """
        try:
            rid = self._get_next_request_id()

            # The requested permissions travel in the attributes structure.
            attrs = SFTPAttributes()
            attrs.st_mode = mode

            await self._send_message(
                SFTPMkdirMessage(request_id=rid, path=path, attrs=attrs)
            )
            reply = await self._wait_for_response(rid)

            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to mkdir request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(f"Mkdir failed: {reply.message}", reply.status_code)

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"Directory creation failed: {e}") from e

    async def rmdir(self, path: str) -> None:
        """
        Remove a remote directory.

        Args:
            path: Remote path of the directory to remove

        Raises:
            SFTPError: If the server reports a non-OK status, replies with an
                unexpected message type, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            await self._send_message(SFTPRmdirMessage(request_id=req_id, path=path))
            reply = await self._wait_for_response(req_id)

            # Only a status reply is valid here; anything else is a protocol error.
            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to rmdir request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(f"Rmdir failed: {reply.message}", reply.status_code)

        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Directory removal failed: {exc}") from exc

    async def open(self, filename: str, mode: str = "r") -> "AsyncSFTPFile":
        """
        Open a remote file and wrap the returned handle.

        Args:
            filename: Remote file path
            mode: File open mode string, translated by ``_mode_to_flags``

        Returns:
            AsyncSFTPFile bound to this client, the server handle and ``mode``

        Raises:
            SFTPError: If the server refuses the open, replies with an
                unexpected message type, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            pflags = self._mode_to_flags(mode)

            request = SFTPOpenMessage(
                request_id=req_id,
                filename=filename,
                pflags=pflags,
                attrs=SFTPAttributes(),
            )
            await self._send_message(request)
            reply = await self._wait_for_response(req_id)

            if isinstance(reply, SFTPHandleMessage):
                return AsyncSFTPFile(self, reply.handle, mode)
            if isinstance(reply, SFTPStatusMessage):
                raise SFTPError(
                    f"File open failed: {reply.message}", reply.status_code
                )
            raise SFTPError("Unexpected response to open request")

        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"File open failed: {exc}") from exc

    async def rename(self, oldpath: str, newpath: str) -> None:
        """
        Rename a remote file or directory.

        Args:
            oldpath: Existing remote path
            newpath: Remote path to rename it to

        Raises:
            SFTPError: If the server reports a non-OK status, replies with an
                unexpected message type, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            await self._send_message(
                SFTPRenameMessage(request_id=req_id, oldpath=oldpath, newpath=newpath)
            )
            reply = await self._wait_for_response(req_id)

            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to rename request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(f"Rename failed: {reply.message}", reply.status_code)
        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Rename failed: {exc}") from exc

    async def symlink(self, targetpath: str, linkpath: str) -> None:
        """
        Create a symbolic link on the remote side.

        Args:
            targetpath: Path the link should point at
            linkpath: Path at which the link is created

        Raises:
            SFTPError: If the server reports a non-OK status, replies with an
                unexpected message type, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            await self._send_message(
                SFTPSymlinkMessage(
                    request_id=req_id, targetpath=targetpath, linkpath=linkpath
                )
            )
            reply = await self._wait_for_response(req_id)

            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to symlink request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(f"Symlink failed: {reply.message}", reply.status_code)
        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Symlink failed: {exc}") from exc

    async def readlink(self, path: str) -> str:
        """
        Resolve the target of a remote symbolic link.

        Args:
            path: Remote path of the symbolic link

        Returns:
            First field of the first entry in the server's name reply

        Raises:
            SFTPError: If the server reports an error, sends an empty or
                unexpected reply, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            await self._send_message(SFTPReadLinkMessage(request_id=req_id, path=path))
            reply = await self._wait_for_response(req_id)

            if isinstance(reply, SFTPNameMessage):
                if not reply.names:
                    raise SFTPError("Empty response to readlink request")
                return reply.names[0][0]
            if isinstance(reply, SFTPStatusMessage):
                raise SFTPError(
                    f"Readlink failed: {reply.message}", reply.status_code
                )
            raise SFTPError("Unexpected response to readlink request")
        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Readlink failed: {exc}") from exc

    async def chmod(self, path: str, mode: int) -> None:
        """
        Change the permissions of a remote file.

        Sends a setstat request whose attribute block has only the
        permissions flag set.

        Args:
            path: Remote path to modify
            mode: New permission bits

        Raises:
            SFTPError: If the server reports a non-OK status, replies with an
                unexpected message type, or the exchange itself fails
        """
        try:
            perm_attrs = SFTPAttributes()
            perm_attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
            perm_attrs.permissions = mode

            req_id = self._get_next_request_id()
            await self._send_message(
                SFTPSetStatMessage(request_id=req_id, path=path, attrs=perm_attrs)
            )
            reply = await self._wait_for_response(req_id)

            if not isinstance(reply, SFTPStatusMessage):
                raise SFTPError("Unexpected response to setstat request")
            if reply.status_code != SSH_FX_OK:
                raise SFTPError(f"Chmod failed: {reply.message}", reply.status_code)
        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Chmod failed: {exc}") from exc

    async def normalize(self, path: str) -> str:
        """
        Ask the server to canonicalize a remote path via a realpath request.

        Args:
            path: Remote path to resolve

        Returns:
            First field of the first entry in the server's name reply

        Raises:
            SFTPError: If the server reports an error, sends an empty or
                unexpected reply, or the exchange itself fails
        """
        try:
            req_id = self._get_next_request_id()
            await self._send_message(SFTPRealPathMessage(request_id=req_id, path=path))
            reply = await self._wait_for_response(req_id)

            if isinstance(reply, SFTPNameMessage):
                if not reply.names:
                    raise SFTPError("Empty response to realpath request")
                return reply.names[0][0]
            if isinstance(reply, SFTPStatusMessage):
                raise SFTPError(
                    f"Normalize failed: {reply.message}", reply.status_code
                )
            raise SFTPError("Unexpected response to realpath request")
        except SFTPError:
            raise
        except Exception as exc:
            raise SFTPError(f"Normalize failed: {exc}") from exc

    async def close(self) -> None:
        """
        Close SFTP client and cleanup resources.

        Cancels the dispatcher task, closes the channel and resets the
        request bookkeeping. Cleanup is guaranteed: if the dispatcher task
        had already failed with an exception, that exception still
        propagates to the caller, but only after the channel has been closed
        and the internal state cleared (previously the channel was left open
        in that case).
        """
        # Detach the task first so a failure below cannot leave a stale
        # reference behind.
        dispatch_task, self._dispatch_task = self._dispatch_task, None
        try:
            if dispatch_task:
                dispatch_task.cancel()
                try:
                    await dispatch_task
                except asyncio.CancelledError:
                    # Expected outcome of the cancel() above.
                    pass
        finally:
            channel, self._channel = self._channel, None
            try:
                if channel:
                    await channel.close()
            finally:
                # Reset bookkeeping even when closing the channel fails.
                self._initialized = False
                self._pending_requests.clear()
                self._buffered_responses.clear()

    async def __aenter__(self) -> "AsyncSFTPClient":
        """Enter an ``async with`` block; the client itself is the bound value."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Leave an ``async with`` block by closing the client.

        The exception details are ignored and nothing is returned, so an
        exception raised inside the block is not suppressed.
        """
        await self.close()

    def _get_next_request_id(self) -> int:
        """Advance the request counter and return it, wrapping at 32 bits."""
        next_id = (self._request_id + 1) % 0x100000000
        self._request_id = next_id
        return next_id

    def _mode_to_flags(self, mode: str) -> int:
        """
        Convert file mode string to SFTP flags.

        Each mode character contributes a set of ``SSH_FXF_*`` bits:

        * ``r`` -- read
        * ``w`` -- write, create, truncate
        * ``a`` -- write, create, append
        * ``x`` -- write, create, exclusive
        * ``+`` -- read and write ("update" modes such as ``r+`` or ``w+``)

        Characters not listed above (for example ``b``) contribute nothing.

        Args:
            mode: File open mode string

        Returns:
            Bitwise OR of the flags selected by the characters in ``mode``
        """
        flags = 0

        if "r" in mode:
            flags |= SSH_FXF_READ
        if "w" in mode:
            flags |= SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_TRUNC
        if "a" in mode:
            flags |= SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_APPEND
        if "x" in mode:
            flags |= SSH_FXF_WRITE | SSH_FXF_CREAT | SSH_FXF_EXCL
        if "+" in mode:
            # Update modes need both directions; without this "r+" was opened
            # read-only and "w+"/"a+" write-only.
            flags |= SSH_FXF_READ | SSH_FXF_WRITE

        return flags

    async def _send_message(self, message: Any) -> None:
        """Serialize ``message`` and write it to the channel under the send lock.

        The message is packed before the lock is taken; only the channel
        write itself is serialized.
        """
        payload = message.pack()
        await self._lock.acquire()
        try:
            await self._channel.send(payload)
        finally:
            self._lock.release()

    async def _recv_message(self) -> Any:
        """Read one length-prefixed SFTP packet from the channel and parse it.

        The 4-byte big-endian length header is read first, then exactly that
        many body bytes; header and body together are handed to
        ``SFTPMessage.unpack``.
        """
        header = await self._channel.recv_exactly(4)
        (body_len,) = struct.unpack(">I", header)
        body = await self._channel.recv_exactly(body_len)
        return SFTPMessage.unpack(header + body)

    async def _wait_for_response(self, request_id: int, timeout: float = 60.0) -> Any:
        """
        Wait for response to specific request using dispatcher.

        Args:
            request_id: ID of the request whose reply is awaited
            timeout: Maximum number of seconds to wait

        Returns:
            The response stored for ``request_id``

        Raises:
            SFTPError: If no response arrives within ``timeout`` seconds
        """
        # Check if the response already arrived before this future was registered.
        # No yield point between the check and future registration, so this is race-free
        # in asyncio's single-threaded cooperative model.
        if request_id in self._buffered_responses:
            return self._buffered_responses.pop(request_id)

        fut = asyncio.get_running_loop().create_future()
        self._pending_requests[request_id] = fut
        try:
            return await asyncio.wait_for(fut, timeout=timeout)
        except asyncio.TimeoutError as e:
            raise SFTPError(
                f"Timeout waiting for response to request {request_id}"
            ) from e
        finally:
            # Deregister on every exit path (success, timeout, cancellation)
            # so an abandoned waiter cannot leave its future behind. Only
            # remove the entry if it is still ours.
            if self._pending_requests.get(request_id) is fut:
                del self._pending_requests[request_id]
            if not fut.done():
                fut.cancel()

    async def _opendir(self, path: str) -> bytes:
        """
        Request a directory handle from the server.

        Args:
            path: Remote directory path to open

        Returns:
            Handle taken from the server's handle reply

        Raises:
            SFTPError: If the server answers with a status message or an
                unexpected message type
        """
        req_id = self._get_next_request_id()
        await self._send_message(SFTPOpenDirMessage(request_id=req_id, path=path))
        reply = await self._wait_for_response(req_id)

        if isinstance(reply, SFTPHandleMessage):
            return reply.handle
        if isinstance(reply, SFTPStatusMessage):
            raise SFTPError(f"Opendir failed: {reply.message}", reply.status_code)
        raise SFTPError("Unexpected response to opendir request")

    async def _readdir(self, handle: bytes) -> list[Any]:
        """
        Fetch the next batch of entries for an open directory handle.

        Args:
            handle: Directory handle

        Returns:
            Entries from the server's name reply, or an empty list once the
            server signals end-of-directory with an EOF status

        Raises:
            SFTPError: If the server reports a non-EOF status or replies with
                an unexpected message type
        """
        req_id = self._get_next_request_id()
        await self._send_message(SFTPReadDirMessage(request_id=req_id, handle=handle))
        reply = await self._wait_for_response(req_id)

        if isinstance(reply, SFTPNameMessage):
            return reply.names
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to readdir request")
        if reply.status_code == SSH_FX_EOF:
            # EOF is how the server marks the end of the listing.
            return []
        raise SFTPError(f"Readdir failed: {reply.message}", reply.status_code)

    async def _close(self, handle: bytes) -> None:
        """
        Release a file or directory handle on the server.

        Args:
            handle: Handle to close

        Raises:
            SFTPError: If the server reports a non-OK status or replies with
                an unexpected message type
        """
        req_id = self._get_next_request_id()
        await self._send_message(SFTPCloseMessage(request_id=req_id, handle=handle))
        reply = await self._wait_for_response(req_id)

        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to close request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError(f"Close failed: {reply.message}", reply.status_code)
Methods:
__aenter__() async

Async context manager entry.

Source code in spindlex/client/async_sftp_client.py
async def __aenter__(self) -> "AsyncSFTPClient":
    """Enter an ``async with`` block; the client itself is the bound value."""
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in spindlex/client/async_sftp_client.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave an ``async with`` block by closing the client.

    The exception details are ignored and nothing is returned, so an
    exception raised inside the block is not suppressed.
    """
    await self.close()
__init__(channel)

Initialize async SFTP client.

Parameters:

Name Type Description Default
channel Any

SSH channel for SFTP subsystem

required
Source code in spindlex/client/async_sftp_client.py
def __init__(self, channel: Any) -> None:
    """
    Set up client state around an SFTP subsystem channel.

    Args:
        channel: SSH channel for SFTP subsystem
    """
    self._logger = logging.getLogger(__name__)
    # Taken by _send_message around each channel write.
    self._lock = asyncio.Lock()

    self._channel = channel
    self._initialized = False
    self._dispatch_task: Optional[asyncio.Task] = None

    # Request bookkeeping: the last issued ID, futures of waiters keyed by
    # request ID, and responses keyed by request ID that were stored before
    # a waiter registered.
    self._request_id = 0
    self._pending_requests: dict[int, asyncio.Future] = {}
    self._buffered_responses: dict[int, Any] = {}
chmod(path, mode) async

Change remote file permissions.

Source code in spindlex/client/async_sftp_client.py
async def chmod(self, path: str, mode: int) -> None:
    """
    Change the permissions of a remote file.

    Sends a setstat request whose attribute block has only the permissions
    flag set.

    Args:
        path: Remote path to modify
        mode: New permission bits

    Raises:
        SFTPError: If the server reports a non-OK status, replies with an
            unexpected message type, or the exchange itself fails
    """
    try:
        new_attrs = SFTPAttributes()
        new_attrs.flags = SSH_FILEXFER_ATTR_PERMISSIONS
        new_attrs.permissions = mode

        rid = self._get_next_request_id()
        await self._send_message(
            SFTPSetStatMessage(request_id=rid, path=path, attrs=new_attrs)
        )
        answer = await self._wait_for_response(rid)

        if not isinstance(answer, SFTPStatusMessage):
            raise SFTPError("Unexpected response to setstat request")
        if answer.status_code != SSH_FX_OK:
            raise SFTPError(f"Chmod failed: {answer.message}", answer.status_code)
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Chmod failed: {exc}") from exc
close() async

Close SFTP client and cleanup resources.

Source code in spindlex/client/async_sftp_client.py
async def close(self) -> None:
    """
    Close SFTP client and cleanup resources.

    Cancels the dispatcher task, closes the channel and resets the request
    bookkeeping. Cleanup is guaranteed: if the dispatcher task had already
    failed with an exception, that exception still propagates to the
    caller, but only after the channel has been closed and the internal
    state cleared (previously the channel was left open in that case).
    """
    # Detach the task first so a failure below cannot leave a stale
    # reference behind.
    dispatch_task, self._dispatch_task = self._dispatch_task, None
    try:
        if dispatch_task:
            dispatch_task.cancel()
            try:
                await dispatch_task
            except asyncio.CancelledError:
                # Expected outcome of the cancel() above.
                pass
    finally:
        channel, self._channel = self._channel, None
        try:
            if channel:
                await channel.close()
        finally:
            # Reset bookkeeping even when closing the channel fails.
            self._initialized = False
            self._pending_requests.clear()
            self._buffered_responses.clear()
get(remotepath, localpath) async

Download file from remote server asynchronously.

Parameters:

Name Type Description Default
remotepath str

Remote file path

required
localpath str

Local file path

required

Raises:

Type Description
SFTPError

If download fails

Source code in spindlex/client/async_sftp_client.py
async def get(self, remotepath: str, localpath: str) -> None:
    """
    Download file from remote server asynchronously.

    Reads are pipelined: up to ``_WINDOW`` read requests of ``_CHUNK``
    bytes are kept in flight and their replies are consumed in request
    order. Each data reply is written at the offset it was requested for,
    and a reply shorter than requested triggers a follow-up read for the
    missing remainder. Previously replies were appended back to back while
    the request offset always advanced by a full chunk, so a server that
    returned short reads produced a silently corrupted local file.

    Args:
        remotepath: Remote file path
        localpath: Local file path

    Raises:
        SFTPError: If download fails
    """
    from collections import deque

    _CHUNK = 32768
    _WINDOW = 64
    try:
        remote_file = await self.open(remotepath, "rb")
        try:
            loop = asyncio.get_running_loop()

            async def _request_read(read_offset: int, read_length: int) -> "asyncio.Future":
                # Register the future before sending so the reply always
                # finds a waiter.
                req_id = self._get_next_request_id()
                reply_fut = loop.create_future()
                self._pending_requests[req_id] = reply_fut
                await self._send_message(
                    SFTPReadMessage(
                        request_id=req_id,
                        handle=remote_file._handle,
                        offset=read_offset,
                        length=read_length,
                    )
                )
                return reply_fut

            with open(localpath, "wb") as local_file:
                next_offset = 0
                # Outstanding reads as (future, offset, length), oldest first.
                inflight = deque()
                done = False

                while not done or inflight:
                    # Fill pipeline up to window size
                    while not done and len(inflight) < _WINDOW:
                        fut = await _request_read(next_offset, _CHUNK)
                        inflight.append((fut, next_offset, _CHUNK))
                        next_offset += _CHUNK

                    # Drain oldest in-order
                    fut, req_offset, req_length = inflight.popleft()
                    response = await fut
                    if isinstance(response, SFTPDataMessage):
                        data = response.data
                        # Place the data where it belongs in the file rather
                        # than wherever the previous write happened to end.
                        local_file.seek(req_offset)
                        local_file.write(data)
                        if 0 < len(data) < req_length:
                            # Short read: fetch the rest of this range next,
                            # ahead of the later ranges already in flight.
                            gap_offset = req_offset + len(data)
                            gap_length = req_length - len(data)
                            gap_fut = await _request_read(gap_offset, gap_length)
                            inflight.appendleft((gap_fut, gap_offset, gap_length))
                    elif isinstance(response, SFTPStatusMessage):
                        if response.status_code == SSH_FX_EOF:
                            # Stop issuing new reads; replies still in
                            # flight are drained by the outer loop.
                            done = True
                        else:
                            raise SFTPError(
                                f"Read failed: {response.message}",
                                response.status_code,
                            )
                    else:
                        raise SFTPError("Unexpected response to read request")
        finally:
            await remote_file.close()

    except Exception as e:
        if isinstance(e, SFTPError):
            raise
        raise SFTPError(f"File download failed: {e}") from e
get_recursive(remotepath, localpath) async

Download directory recursively and asynchronously.

Parameters:

Name Type Description Default
remotepath str

Remote directory path

required
localpath str

Local destination path

required
Source code in spindlex/client/async_sftp_client.py
async def get_recursive(self, remotepath: str, localpath: str) -> None:
    """
    Download directory recursively and asynchronously.

    If ``remotepath`` is not a directory it is downloaded as a single file.
    Otherwise ``localpath`` is created if needed and every entry returned
    by ``listdir`` is downloaded concurrently via a recursive call.

    Entry names come from the server and are treated as untrusted: a name
    that is empty, ``.``/``..``, or contains a path component (separator,
    absolute path, drive prefix) is rejected before anything is downloaded,
    so a listing cannot cause writes outside ``localpath``.

    Args:
        remotepath: Remote directory path
        localpath: Local destination path

    Raises:
        SFTPError: If the listing contains an unsafe entry name
    """
    import stat

    attrs = await self.stat(remotepath)
    if not stat.S_ISDIR(attrs.st_mode):
        await self.get(remotepath, localpath)
        return

    # exist_ok avoids the check-then-create race when sibling downloads run
    # concurrently.
    os.makedirs(localpath, exist_ok=True)

    items = await self.listdir(remotepath)

    # Validate the whole listing first so no download starts (and no
    # coroutine is left un-awaited) when one name is unsafe.
    for item in items:
        if item in ("", ".", "..") or os.path.basename(item) != item:
            raise SFTPError(f"Unsafe remote filename: {item!r}")

    remote_prefix = remotepath if remotepath.endswith("/") else f"{remotepath}/"
    tasks = [
        self.get_recursive(f"{remote_prefix}{item}", os.path.join(localpath, item))
        for item in items
    ]

    if tasks:
        await asyncio.gather(*tasks)
listdir(path='.') async

List directory contents asynchronously.

Parameters:

Name Type Description Default
path str

Directory path to list

'.'

Returns:

Type Description
list[str]

List of filenames in directory

Raises:

Type Description
SFTPError

If listing fails

Source code in spindlex/client/async_sftp_client.py
async def listdir(self, path: str = ".") -> list[str]:
    """
    Return the names of the entries in a remote directory.

    Opens a directory handle, reads batches until an empty batch (or an
    EOF-coded SFTPError) ends the listing, and always closes the handle
    afterwards. The ``.`` and ``..`` entries are left out.

    Args:
        path: Directory path to list

    Returns:
        List of filenames in directory

    Raises:
        SFTPError: If listing fails
    """
    try:
        dir_handle = await self._opendir(path)
        names: list[str] = []

        try:
            while True:
                try:
                    batch = await self._readdir(dir_handle)
                except SFTPError as err:
                    if err.status_code == SSH_FX_EOF:
                        break
                    raise

                if not batch:
                    break

                # The first field of each entry is its filename.
                names.extend(
                    entry[0] for entry in batch if entry[0] not in (".", "..")
                )
        finally:
            await self._close(dir_handle)

        return names

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Directory listing failed: {exc}") from exc
lstat(path) async

Get file/directory attributes (without following symlinks) asynchronously.

Parameters:

Name Type Description Default
path str

File or directory path

required

Returns:

Type Description
Any

File attributes

Raises:

Type Description
SFTPError

If lstat fails

Source code in spindlex/client/async_sftp_client.py
async def lstat(self, path: str) -> Any:
    """
    Fetch the attributes of a remote path with an lstat request.

    Args:
        path: File or directory path

    Returns:
        The ``attrs`` carried by the server's attribute reply

    Raises:
        SFTPError: If the server reports an error, replies with an
            unexpected message type, or the exchange itself fails
    """
    try:
        rid = self._get_next_request_id()
        await self._send_message(SFTPLStatMessage(request_id=rid, path=path))
        answer = await self._wait_for_response(rid)

        if isinstance(answer, SFTPAttrsMessage):
            return answer.attrs
        if isinstance(answer, SFTPStatusMessage):
            raise SFTPError(f"Lstat failed: {answer.message}", answer.status_code)
        raise SFTPError("Unexpected response to lstat request")

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Lstat operation failed: {exc}") from exc
mkdir(path, mode=493) async

Create directory asynchronously.

Parameters:

Name Type Description Default
path str

Directory path to create

required
mode int

Directory permissions

493 (decimal form of 0o755)

Raises:

Type Description
SFTPError

If mkdir fails

Source code in spindlex/client/async_sftp_client.py
async def mkdir(self, path: str, mode: int = 0o755) -> None:
    """
    Create a remote directory.

    Sends an SFTP mkdir request carrying ``mode`` in the attribute block and
    waits for the server's status reply.

    Args:
        path: Remote path of the directory to create
        mode: Permission bits requested for the new directory

    Raises:
        SFTPError: If the server reports a non-OK status, replies with an
            unexpected message type, or the exchange itself fails
    """
    try:
        rid = self._get_next_request_id()

        # NOTE(review): chmod() populates attrs.flags/attrs.permissions
        # rather than st_mode -- confirm SFTPAttributes serializes st_mode.
        new_dir_attrs = SFTPAttributes()
        new_dir_attrs.st_mode = mode

        await self._send_message(
            SFTPMkdirMessage(request_id=rid, path=path, attrs=new_dir_attrs)
        )
        answer = await self._wait_for_response(rid)

        if not isinstance(answer, SFTPStatusMessage):
            raise SFTPError("Unexpected response to mkdir request")
        if answer.status_code != SSH_FX_OK:
            raise SFTPError(f"Mkdir failed: {answer.message}", answer.status_code)

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Directory creation failed: {exc}") from exc
normalize(path) async

Resolve remote path to its absolute canonical form.

Source code in spindlex/client/async_sftp_client.py
async def normalize(self, path: str) -> str:
    """
    Ask the server to canonicalize a remote path via a realpath request.

    Args:
        path: Remote path to resolve

    Returns:
        First field of the first entry in the server's name reply

    Raises:
        SFTPError: If the server reports an error, sends an empty or
            unexpected reply, or the exchange itself fails
    """
    try:
        rid = self._get_next_request_id()
        await self._send_message(SFTPRealPathMessage(request_id=rid, path=path))
        answer = await self._wait_for_response(rid)

        if isinstance(answer, SFTPNameMessage):
            if not answer.names:
                raise SFTPError("Empty response to realpath request")
            return answer.names[0][0]
        if isinstance(answer, SFTPStatusMessage):
            raise SFTPError(
                f"Normalize failed: {answer.message}", answer.status_code
            )
        raise SFTPError("Unexpected response to realpath request")
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Normalize failed: {exc}") from exc
open(filename, mode='r') async

Open remote file asynchronously.

Parameters:

Name Type Description Default
filename str

Remote file path

required
mode str

File open mode

'r'

Returns:

Type Description
AsyncSFTPFile

Async SFTP file object

Raises:

Type Description
SFTPError

If file open fails

Source code in spindlex/client/async_sftp_client.py
async def open(self, filename: str, mode: str = "r") -> "AsyncSFTPFile":
    """
    Open a remote file and wrap the returned handle.

    Args:
        filename: Remote file path
        mode: File open mode string, translated by ``_mode_to_flags``

    Returns:
        AsyncSFTPFile bound to this client, the server handle and ``mode``

    Raises:
        SFTPError: If the server refuses the open, replies with an
            unexpected message type, or the exchange itself fails
    """
    try:
        rid = self._get_next_request_id()
        open_flags = self._mode_to_flags(mode)

        request = SFTPOpenMessage(
            request_id=rid,
            filename=filename,
            pflags=open_flags,
            attrs=SFTPAttributes(),
        )
        await self._send_message(request)
        answer = await self._wait_for_response(rid)

        if isinstance(answer, SFTPHandleMessage):
            return AsyncSFTPFile(self, answer.handle, mode)
        if isinstance(answer, SFTPStatusMessage):
            raise SFTPError(
                f"File open failed: {answer.message}", answer.status_code
            )
        raise SFTPError("Unexpected response to open request")

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"File open failed: {exc}") from exc
put(localpath, remotepath) async

Upload file to remote server asynchronously.

Parameters:

Name Type Description Default
localpath str

Local file path

required
remotepath str

Remote file path

required

Raises:

Type Description
SFTPError

If upload fails

Source code in spindlex/client/async_sftp_client.py
async def put(self, localpath: str, remotepath: str) -> None:
    """
    Upload a local file to the remote server.

    The local file is read in ``_CHUNK``-byte pieces and each piece is sent
    as a write request at its own offset. Acknowledgements are collected
    oldest-first whenever ``_WINDOW`` writes are outstanding, and the rest
    are collected once the whole file has been sent. The remote file is
    closed on every exit path.

    Args:
        localpath: Local file path
        remotepath: Remote file path

    Raises:
        SFTPError: If upload fails
    """
    _CHUNK = 32768
    _WINDOW = 64

    def _check_ack(ack: Any) -> None:
        # A write must be answered by an OK status message.
        if not isinstance(ack, SFTPStatusMessage):
            raise SFTPError("Unexpected response to write request")
        if ack.status_code != SSH_FX_OK:
            raise SFTPError(
                f"Write failed: {ack.message}",
                ack.status_code,
            )

    try:
        remote_file = await self.open(remotepath, "wb")
        try:
            loop = asyncio.get_running_loop()
            with open(localpath, "rb") as local_file:
                write_offset = 0
                outstanding: list[asyncio.Future] = []

                while chunk := local_file.read(_CHUNK):
                    # Register the future before sending so the reply always
                    # finds a waiter.
                    req_id = self._get_next_request_id()
                    ack_future: asyncio.Future = loop.create_future()
                    self._pending_requests[req_id] = ack_future
                    await self._send_message(
                        SFTPWriteMessage(
                            request_id=req_id,
                            handle=remote_file._handle,
                            offset=write_offset,
                            data=chunk,
                        )
                    )
                    outstanding.append(ack_future)
                    write_offset += len(chunk)

                    # Window full: wait for the oldest acknowledgement.
                    while len(outstanding) >= _WINDOW:
                        _check_ack(await outstanding.pop(0))

                # Everything is sent; collect what is still unacknowledged.
                for ack_future in outstanding:
                    _check_ack(await ack_future)
        finally:
            await remote_file.close()

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"File upload failed: {exc}") from exc
put_recursive(localpath, remotepath) async

Upload directory recursively and asynchronously.

Parameters:

Name Type Description Default
localpath str

Local directory path

required
remotepath str

Remote destination path

required
Source code in spindlex/client/async_sftp_client.py
async def put_recursive(self, localpath: str, remotepath: str) -> None:
    """
    Upload a local directory tree to the remote server.

    A ``localpath`` that is not a directory is uploaded as a single file.
    Otherwise the remote directory is created (an SFTPError whose code is
    the generic failure code is ignored) and every local entry is uploaded
    concurrently via a recursive call.

    Args:
        localpath: Local directory path
        remotepath: Remote destination path
    """
    if not os.path.isdir(localpath):
        await self.put(localpath, remotepath)
        return

    try:
        await self.mkdir(remotepath)
    except SFTPError as mkdir_err:
        # NOTE(review): listdir() reads `status_code` from SFTPError while
        # this reads `sftp_code` -- confirm both attributes exist.
        if mkdir_err.sftp_code != SFTPError.SSH_FX_FAILURE:
            raise

    # Join with exactly one slash regardless of how remotepath ends.
    remote_prefix = remotepath if remotepath.endswith("/") else f"{remotepath}/"
    uploads = [
        self.put_recursive(os.path.join(localpath, name), f"{remote_prefix}{name}")
        for name in os.listdir(localpath)
    ]

    if uploads:
        await asyncio.gather(*uploads)

readlink(path) async

Read symbolic link asynchronously.

Parameters:

Name Type Description Default
path str

Path to symbolic link

required

Returns:

Type Description
str

Target path of the link

Raises:

Type Description
SFTPError

If readlink fails

Source code in spindlex/client/async_sftp_client.py
async def readlink(self, path: str) -> str:
    """
    Resolve the target of a remote symbolic link.

    Args:
        path: Path to symbolic link

    Returns:
        Target path of the link, taken from the first entry of the
        server's name reply

    Raises:
        SFTPError: If the server reports a failure, replies with no
            entries or an unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        await self._send_message(SFTPReadLinkMessage(request_id=req_id, path=path))
        reply = await self._wait_for_response(req_id)

        if isinstance(reply, SFTPNameMessage):
            if not reply.names:
                raise SFTPError("Empty response to readlink request")
            return reply.names[0][0]
        if isinstance(reply, SFTPStatusMessage):
            raise SFTPError(f"Readlink failed: {reply.message}", reply.status_code)
        raise SFTPError("Unexpected response to readlink request")
    except SFTPError:
        # Already the library's error type: let it through untouched.
        raise
    except Exception as exc:
        raise SFTPError(f"Readlink failed: {exc}") from exc
remove(path) async

Remove remote file asynchronously.

Parameters:

Name Type Description Default
path str

Remote file path to remove

required

Raises:

Type Description
SFTPError

If removal fails

Source code in spindlex/client/async_sftp_client.py
async def remove(self, path: str) -> None:
    """
    Delete a file on the remote host.

    Args:
        path: Remote file path to remove

    Raises:
        SFTPError: If the server rejects the request, answers with an
            unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        from ..protocol.sftp_messages import SFTPRemoveMessage

        await self._send_message(SFTPRemoveMessage(request_id=req_id, filename=path))
        reply = await self._wait_for_response(req_id)

        # Only a status message with SSH_FX_OK counts as success.
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to remove request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError(
                f"File removal failed: {reply.message}", reply.status_code
            )
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"File removal failed: {exc}") from exc
rename(oldpath, newpath) async

Rename remote file or directory.

Source code in spindlex/client/async_sftp_client.py
async def rename(self, oldpath: str, newpath: str) -> None:
    """
    Rename remote file or directory.

    Args:
        oldpath: Existing remote path
        newpath: New remote path

    Raises:
        SFTPError: If the server rejects the request, answers with an
            unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        await self._send_message(
            SFTPRenameMessage(request_id=req_id, oldpath=oldpath, newpath=newpath)
        )
        reply = await self._wait_for_response(req_id)

        # Only a status message with SSH_FX_OK counts as success.
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to rename request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError(f"Rename failed: {reply.message}", reply.status_code)
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Rename failed: {exc}") from exc
rmdir(path) async

Remove directory asynchronously.

Parameters:

Name Type Description Default
path str

Directory path to remove

required

Raises:

Type Description
SFTPError

If rmdir fails

Source code in spindlex/client/async_sftp_client.py
async def rmdir(self, path: str) -> None:
    """
    Delete a directory on the remote host.

    Args:
        path: Directory path to remove

    Raises:
        SFTPError: If the server rejects the request, answers with an
            unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        await self._send_message(SFTPRmdirMessage(request_id=req_id, path=path))
        reply = await self._wait_for_response(req_id)

        # Only a status message with SSH_FX_OK counts as success.
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to rmdir request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError(f"Rmdir failed: {reply.message}", reply.status_code)
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Directory removal failed: {exc}") from exc
stat(path) async

Get file/directory attributes asynchronously.

Parameters:

Name Type Description Default
path str

File or directory path

required

Returns:

Type Description
Any

File attributes

Raises:

Type Description
SFTPError

If stat fails

Source code in spindlex/client/async_sftp_client.py
async def stat(self, path: str) -> Any:
    """
    Fetch the attributes of a remote file or directory.

    Args:
        path: File or directory path

    Returns:
        The ``attrs`` value carried by the server's attributes reply

    Raises:
        SFTPError: If the server reports a failure, answers with an
            unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        await self._send_message(SFTPStatMessage(request_id=req_id, path=path))
        reply = await self._wait_for_response(req_id)

        if isinstance(reply, SFTPAttrsMessage):
            return reply.attrs
        if isinstance(reply, SFTPStatusMessage):
            # A status reply to a stat request is always treated as failure.
            raise SFTPError(f"Stat failed: {reply.message}", reply.status_code)
        raise SFTPError("Unexpected response to stat request")
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Stat operation failed: {exc}") from exc

symlink(targetpath, linkpath) async

Create symbolic link asynchronously.

Parameters:

Name Type Description Default
targetpath str

Target path for the link

required
linkpath str

Path where link should be created

required

Raises:

Type Description
SFTPError

If symlink fails

Source code in spindlex/client/async_sftp_client.py
async def symlink(self, targetpath: str, linkpath: str) -> None:
    """
    Create a symbolic link on the remote host.

    Args:
        targetpath: Target path for the link
        linkpath: Path where link should be created

    Raises:
        SFTPError: If the server rejects the request, answers with an
            unexpected message, or any other error occurs
    """
    try:
        req_id = self._get_next_request_id()
        await self._send_message(
            SFTPSymlinkMessage(
                request_id=req_id, targetpath=targetpath, linkpath=linkpath
            )
        )
        reply = await self._wait_for_response(req_id)

        # Only a status message with SSH_FX_OK counts as success.
        if not isinstance(reply, SFTPStatusMessage):
            raise SFTPError("Unexpected response to symlink request")
        if reply.status_code != SSH_FX_OK:
            raise SFTPError(f"Symlink failed: {reply.message}", reply.status_code)
    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"Symlink failed: {exc}") from exc

AsyncSFTPFile

Async SFTP file object for remote file operations.

Source code in spindlex/client/async_sftp_client.py
class AsyncSFTPFile:
    """
    Async SFTP file object for remote file operations.

    The object keeps its own byte offset and sends it with every READ and
    WRITE request. Writes are pipelined: up to ``_PIPELINE_DEPTH`` WRITE
    requests may be awaiting acknowledgement at once, and whatever is still
    outstanding is collected when the file is closed.
    """

    # Upper bound on requests kept in flight by pipelined reads and writes.
    _PIPELINE_DEPTH = 64

    def __init__(self, client: AsyncSFTPClient, handle: bytes, mode: str) -> None:
        """
        Initialize async SFTP file.

        Args:
            client: SFTP client instance
            handle: File handle from server
            mode: File open mode
        """
        self._client = client
        self._handle = handle
        self._mode = mode
        # Advanced by the number of bytes read, and by the size of each
        # acknowledged write.
        self._offset = 0
        self._closed = False
        # (request_id, byte_count) for WRITE requests that have been sent but
        # not yet acknowledged, oldest first.
        self._write_queue: list[tuple[int, int]] = []

    async def read(self, size: int = -1) -> bytes:
        """
        Read data from file asynchronously.

        Args:
            size: Number of bytes to read. A negative value reads until EOF
                with pipelined requests; ``0`` returns ``b""`` without
                contacting the server; a positive value issues one request
                and returns whatever the server sends for it.

        Returns:
            Read data (``b""`` when the server reports EOF)

        Raises:
            SFTPError: If the file is closed or the read fails
        """
        if self._closed:
            raise SFTPError("File is closed")

        if size == 0:
            # Nothing was asked for: skip the round trip instead of sending
            # a zero-length READ request.
            return b""

        try:
            if size < 0:
                # Pipelined read until EOF
                _CHUNK = 32768
                result = bytearray()
                inflight: list[int] = []
                done = False
                offset = self._offset

                while not done or inflight:
                    # Fill pipeline
                    while not done and len(inflight) < self._PIPELINE_DEPTH:
                        req_id = self._client._get_next_request_id()
                        msg = SFTPReadMessage(
                            request_id=req_id,
                            handle=self._handle,
                            offset=offset,
                            length=_CHUNK,
                        )
                        await self._client._send_message(msg)
                        inflight.append(req_id)
                        # NOTE(review): offsets advance by a full _CHUNK per
                        # request, so a DATA reply shorter than _CHUNK that is
                        # not at end of file would leave a gap in the result.
                        # Confirm servers never short-read here.
                        offset += _CHUNK

                    if not inflight:
                        break

                    # Collect next in-order response. After EOF is seen the
                    # loop keeps running until every outstanding request has
                    # been answered.
                    rid = inflight.pop(0)
                    response = await self._client._wait_for_response(rid)
                    if isinstance(response, SFTPDataMessage):
                        result.extend(response.data)
                    elif isinstance(response, SFTPStatusMessage):
                        if response.status_code == SSH_FX_EOF:
                            done = True
                        else:
                            raise SFTPError(
                                f"Read failed: {response.message}",
                                response.status_code,
                            )
                    else:
                        raise SFTPError("Unexpected response to read request")

                self._offset += len(result)
                return bytes(result)

            # Single read
            request_id = self._client._get_next_request_id()
            read_msg = SFTPReadMessage(
                request_id=request_id,
                handle=self._handle,
                offset=self._offset,
                length=size,
            )
            await self._client._send_message(read_msg)

            # Wait for response
            response = await self._client._wait_for_response(request_id)

            if isinstance(response, SFTPDataMessage):
                self._offset += len(response.data)
                return response.data
            elif isinstance(response, SFTPStatusMessage):
                if response.status_code == SSH_FX_EOF:
                    return b""
                raise SFTPError(
                    f"Read failed: {response.message}", response.status_code
                )
            else:
                raise SFTPError("Unexpected response to read request")

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File read failed: {e}") from e

    async def write(self, data: bytes) -> None:
        """
        Write data to file asynchronously.

        The data is split into chunks and each chunk is sent as its own WRITE
        request. Acknowledgements are only awaited once ``_PIPELINE_DEPTH``
        requests are outstanding, so returning from this method does not mean
        every chunk has been acknowledged; the rest is collected by
        ``close()``.

        Args:
            data: Data to write

        Raises:
            SFTPError: If the file is closed or a write fails
        """
        if self._closed:
            raise SFTPError("File is closed")

        try:
            from ..protocol.sftp_constants import SFTP_MAX_PACKET_SIZE

            # Chunk large writes to fit within server limits: stay 1024 bytes
            # below the maximum packet size (presumably headroom for the
            # WRITE message framing - confirm).
            _MAX_CHUNK = SFTP_MAX_PACKET_SIZE - 1024
            offset = 0
            while offset < len(data):
                chunk = data[offset : offset + _MAX_CHUNK]
                request_id = self._client._get_next_request_id()
                # Bytes already sent but not yet acknowledged sit ahead of
                # this chunk in the file.
                send_offset = self._offset + sum(n for _, n in self._write_queue)

                # Send write request
                write_msg = SFTPWriteMessage(
                    request_id=request_id,
                    handle=self._handle,
                    offset=send_offset,
                    data=chunk,
                )
                await self._client._send_message(write_msg)

                # Add to pipeline
                self._write_queue.append((request_id, len(chunk)))

                # Drain oldest if pipeline is full
                if len(self._write_queue) >= self._PIPELINE_DEPTH:
                    rid, nbytes = self._write_queue.pop(0)
                    await self._await_write_ack(rid, nbytes)

                offset += len(chunk)

        except SFTPError:
            raise
        except Exception as e:
            raise SFTPError(f"File write failed: {e}") from e

    async def _await_write_ack(self, request_id: int, nbytes: int) -> None:
        """Wait for one WRITE acknowledgement and advance the offset on success."""
        response = await self._client._wait_for_response(request_id)
        if not isinstance(response, SFTPStatusMessage):
            raise SFTPError("Unexpected response to write request")
        if response.status_code != SSH_FX_OK:
            raise SFTPError(f"Write failed: {response.message}", response.status_code)
        self._offset += nbytes

    async def _flush_write_queue(self) -> None:
        """
        Drain all outstanding pipelined write ACKs.

        Each entry is removed from the queue before its acknowledgement is
        awaited, so a failure part-way through never leaves an already
        consumed request id behind to be waited on again.

        Raises:
            SFTPError: If a write is rejected or answered unexpectedly
        """
        while self._write_queue:
            rid, nbytes = self._write_queue.pop(0)
            await self._await_write_ack(rid, nbytes)

    async def close(self) -> None:
        """
        Close file handle.

        Pending pipelined writes are flushed first. This method never raises:
        a failed flush is logged as a warning and the CLOSE request is still
        sent so the server-side handle is released; errors from the close
        exchange itself are logged at debug level.
        """
        if self._closed:
            return

        try:
            # Flush pending writes first. A failure here must not prevent the
            # handle from being closed.
            try:
                await self._flush_write_queue()
            except Exception as e:
                self._client._logger.warning(
                    f"Pending SFTP writes failed during close: {e}"
                )

            request_id = self._client._get_next_request_id()

            # Send close request
            close_msg = SFTPCloseMessage(request_id=request_id, handle=self._handle)
            await self._client._send_message(close_msg)

            # Wait for response
            await self._client._wait_for_response(request_id)

        except Exception as e:
            self._client._logger.debug(
                f"Ignore error during SFTP client close: {e}"
            )
        finally:
            self._closed = True

    async def __aenter__(self) -> "AsyncSFTPFile":
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit."""
        await self.close()
Methods:
__aenter__() async

Async context manager entry.

Source code in spindlex/client/async_sftp_client.py
async def __aenter__(self) -> "AsyncSFTPFile":
    """Enter the ``async with`` block, yielding this file object itself."""
    return self
__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

Source code in spindlex/client/async_sftp_client.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave the ``async with`` block by closing the file; the exception details are not inspected."""
    await self.close()
__init__(client, handle, mode)

Initialize async SFTP file.

Parameters:

Name Type Description Default
client AsyncSFTPClient

SFTP client instance

required
handle bytes

File handle from server

required
mode str

File open mode

required
Source code in spindlex/client/async_sftp_client.py
def __init__(self, client: AsyncSFTPClient, handle: bytes, mode: str) -> None:
    """
    Set up a file object bound to an open server-side handle.

    Args:
        client: SFTP client instance
        handle: File handle from server
        mode: File open mode
    """
    self._client, self._handle, self._mode = client, handle, mode
    # The file starts open, positioned at byte 0.
    self._offset = 0
    self._closed = False
    # Starts empty; holds (int, int) pairs per the annotation.
    self._write_queue: list[tuple[int, int]] = []
close() async

Close file handle.

Source code in spindlex/client/async_sftp_client.py
async def close(self) -> None:
    """
    Close file handle.

    Pending pipelined writes are flushed first. This method never raises:
    a failed flush is logged as a warning and the CLOSE request is still
    sent so the server-side handle is released; errors from the close
    exchange itself are logged at debug level.
    """
    if self._closed:
        return

    try:
        # Flush pending writes first. A failure here must not prevent the
        # handle from being closed.
        try:
            await self._flush_write_queue()
        except Exception as e:
            self._client._logger.warning(
                f"Pending SFTP writes failed during close: {e}"
            )

        request_id = self._client._get_next_request_id()

        # Send close request
        close_msg = SFTPCloseMessage(request_id=request_id, handle=self._handle)
        await self._client._send_message(close_msg)

        # Wait for response
        await self._client._wait_for_response(request_id)

    except Exception as e:
        self._client._logger.debug(
            f"Ignore error during SFTP client close: {e}"
        )
    finally:
        self._closed = True
read(size=-1) async

Read data from file asynchronously.

Parameters:

Name Type Description Default
size int

Number of bytes to read (-1 for all)

-1

Returns:

Type Description
bytes

Read data

Raises:

Type Description
SFTPError

If read fails

Source code in spindlex/client/async_sftp_client.py
async def read(self, size: int = -1) -> bytes:
    """
    Read data from file asynchronously.

    Args:
        size: Number of bytes to read. A negative value reads until EOF
            with pipelined requests; ``0`` returns ``b""`` without
            contacting the server; a positive value issues one request
            and returns whatever the server sends for it.

    Returns:
        Read data (``b""`` when the server reports EOF)

    Raises:
        SFTPError: If the file is closed or the read fails
    """
    if self._closed:
        raise SFTPError("File is closed")

    if size == 0:
        # Nothing was asked for: skip the round trip instead of sending a
        # zero-length READ request.
        return b""

    try:
        if size < 0:
            # Pipelined read until EOF
            _CHUNK = 32768
            result = bytearray()
            inflight: list[int] = []
            done = False
            offset = self._offset

            while not done or inflight:
                # Fill pipeline
                while not done and len(inflight) < self._PIPELINE_DEPTH:
                    req_id = self._client._get_next_request_id()
                    msg = SFTPReadMessage(
                        request_id=req_id,
                        handle=self._handle,
                        offset=offset,
                        length=_CHUNK,
                    )
                    await self._client._send_message(msg)
                    inflight.append(req_id)
                    # NOTE(review): offsets advance by a full _CHUNK per
                    # request, so a DATA reply shorter than _CHUNK that is
                    # not at end of file would leave a gap in the result.
                    # Confirm servers never short-read here.
                    offset += _CHUNK

                if not inflight:
                    break

                # Collect next in-order response. After EOF is seen the loop
                # keeps running until every outstanding request is answered.
                rid = inflight.pop(0)
                response = await self._client._wait_for_response(rid)
                if isinstance(response, SFTPDataMessage):
                    result.extend(response.data)
                elif isinstance(response, SFTPStatusMessage):
                    if response.status_code == SSH_FX_EOF:
                        done = True
                    else:
                        raise SFTPError(
                            f"Read failed: {response.message}",
                            response.status_code,
                        )
                else:
                    raise SFTPError("Unexpected response to read request")

            self._offset += len(result)
            return bytes(result)

        # Single read
        request_id = self._client._get_next_request_id()
        read_msg = SFTPReadMessage(
            request_id=request_id,
            handle=self._handle,
            offset=self._offset,
            length=size,
        )
        await self._client._send_message(read_msg)

        # Wait for response
        response = await self._client._wait_for_response(request_id)

        if isinstance(response, SFTPDataMessage):
            self._offset += len(response.data)
            return response.data
        elif isinstance(response, SFTPStatusMessage):
            if response.status_code == SSH_FX_EOF:
                return b""
            raise SFTPError(
                f"Read failed: {response.message}", response.status_code
            )
        else:
            raise SFTPError("Unexpected response to read request")

    except SFTPError:
        raise
    except Exception as e:
        raise SFTPError(f"File read failed: {e}") from e
write(data) async

Write data to file asynchronously.

Parameters:

Name Type Description Default
data bytes

Data to write

required

Raises:

Type Description
SFTPError

If write fails

Source code in spindlex/client/async_sftp_client.py
async def write(self, data: bytes) -> None:
    """
    Send ``data`` to the remote file as a series of pipelined WRITE requests.

    Acknowledgements are only awaited once ``_PIPELINE_DEPTH`` requests are
    outstanding, so some chunks may still be unacknowledged when this
    method returns.

    Args:
        data: Data to write

    Raises:
        SFTPError: If the file is closed or a write fails
    """
    if self._closed:
        raise SFTPError("File is closed")

    try:
        from ..protocol.sftp_constants import SFTP_MAX_PACKET_SIZE

        # Each chunk stays 1024 bytes below the maximum packet size.
        max_chunk = SFTP_MAX_PACKET_SIZE - 1024
        cursor = 0
        while cursor < len(data):
            piece = data[cursor : cursor + max_chunk]
            cursor += len(piece)

            req_id = self._client._get_next_request_id()
            # Unacknowledged bytes already in the pipeline sit ahead of
            # this chunk in the file.
            unacked = sum(size for _, size in self._write_queue)
            await self._client._send_message(
                SFTPWriteMessage(
                    request_id=req_id,
                    handle=self._handle,
                    offset=self._offset + unacked,
                    data=piece,
                )
            )
            self._write_queue.append((req_id, len(piece)))

            if len(self._write_queue) < self._PIPELINE_DEPTH:
                continue

            # Pipeline is full: collect the oldest acknowledgement.
            oldest_id, oldest_size = self._write_queue.pop(0)
            ack = await self._client._wait_for_response(oldest_id)
            if not isinstance(ack, SFTPStatusMessage):
                raise SFTPError("Unexpected response to write request")
            if ack.status_code != SSH_FX_OK:
                raise SFTPError(f"Write failed: {ack.message}", ack.status_code)
            self._offset += oldest_size

    except SFTPError:
        raise
    except Exception as exc:
        raise SFTPError(f"File write failed: {exc}") from exc