【问题标题】:Paramiko hangs while executing a large wget commandParamiko 在执行大型 wget 命令时挂起
【发布时间】:2025-06-17 07:30:02
【问题描述】:

您好,我在执行通过 Ubuntu 10 服务器执行 100mb 文件的 wget 的命令时遇到问题。除此以外,较短的命令可以正常工作。下面的课程包含我如何使用 paramiko 以及我克服这个问题的不同尝试(参见不同的 run 或 exec 方法)。在 exec_cmd 的情况下,执行挂在这一行:

        out = self.in_buffer.read(nbytes, self.timeout)

来自 paramiko 的 channel.py 模块的 recv 方法。

相同的 wget 命令在使用来自 Mac 的普通 ssh 实用程序的 shell 中完美运行。

"""
Management of SSH connections
"""

import logging
import os
import paramiko
import socket
import time
import StringIO


class SSHClient():
    def __init__(self):
        self._ssh_client = paramiko.SSHClient()
        self._ssh_client.load_system_host_keys()
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.time_out = 300
        self.wait = 5

    def connect(self, hostname, user, pkey):
        retry = self.time_out
        self.hostname = hostname
        logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey))
        while retry > 0:
            try:
                self._ssh_client.connect(hostname,
                                         username=user,
                                         key_filename=os.path.expanduser(pkey),
                                         timeout=self.time_out)
                return
            except socket.error, (value,message):
                if value == 61 or value == 111:
                    logging.warning('SSH Connection refused, will retry in 5 seconds')
                    time.sleep(self.wait)
                    retry -= self.wait
                else:
                    raise
            except paramiko.BadHostKeyException:
                logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname)
                logging.warning('Edit that file to remove the entry and then try again')
                retry = 0
            except EOFError:
                logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds')
                time.sleep(self.wait)
                retry -= self.wait
        logging.error('Could not establish SSH connection')

    def exists(self, path):
        status = self.run('[ -a %s ] || echo "FALSE"' % path)
        if status[1].startswith('FALSE'):
            return 0
        return 1

    def shell(self):
        """
        Start an interactive shell session on the remote host.
        """
        channel = self._ssh_client.invoke_shell()
        interactive_shell(channel)

    def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a string containing all output from the command.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        log_fp = StringIO.StringIO()
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            logging.error("Error executing command: " + command)
            status = 1
        log_fp.write(t[1].read())
        log_fp.write(t[2].read())
        t[0].close()
        t[1].close()
        t[2].close()
        logging.info('output: %s' % log_fp.getvalue())
        return (status, log_fp.getvalue())

    def run_pty(self, command):
        """
        Execute a command on the remote host with a pseudo-terminal.
        Returns a string containing the output of the command.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        channel = self._ssh_client.get_transport().open_session()
        channel.get_pty()
        status = 0
        try:
            channel.exec_command(command)
        except:
            logging.error("Error executing command: " + command)
            status = 1
        return status, channel.recv(1024)

    def close(self):
        transport = self._ssh_client.get_transport()
        transport.close()

    def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False):
        logging.info('running:%s on %s' % (cmd, self.hostname))
        ssh = self._ssh_client
        chan = ssh.get_transport().open_session()
        stdin = chan.makefile('wb')
        stdout = chan.makefile('rb')
        stderr = chan.makefile_stderr('rb')
        processed_cmd = cmd
        if use_sudo:
            processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
        chan.exec_command(processed_cmd)
        result = {
            'stdout': [],
            'stderr': [],
        }
        exit_status = chan.recv_exit_status()
        result['exit_status'] = exit_status

        def print_output():
            for line in stdout:
                result['stdout'].append(line)
                logging.info(line)
            for line in stderr:
                result['stderr'].append(line)
                logging.info(line)
        if verbose:
            print processed_cmd
            print_output()
        return exit_status,result 

    def exec_cmd(self, cmd):
        import select
        ssh = self._ssh_client
        channel = ssh.get_transport().open_session()
        END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL"
        cmd += ";echo " + END
        logging.info('running:%s on %s' % (cmd, self.hostname))
        channel.exec_command(cmd)
        out = ""
        buf = ""
        while END not in buf:
          rl, wl, xl = select.select([channel],[],[],0.0)
          if len(rl) > 0:
              # Must be stdout
              buf = channel.recv(1024)
              logging.info(buf)
              out += buf
        return 0, out

【问题讨论】:

    标签: python linux ssh paramiko


    【解决方案1】:

    我遇到了同样的问题,当我在远程 ssh 客户端上运行的 shell 脚本对 400Mb 文件执行 wget 命令时,我的 python 脚本挂起。

    我发现向 wget 命令添加超时可以解决问题。 最初我有:

    wgethttp://blah:8888/file.zip

    现在有了这个:

    wget -q -T90 http://blah:8888/file.zip

    它就像一个魅力!

    希望对你有帮助。

    【讨论】:

      【解决方案2】:
      1. 在这种情况下,我会先添加列表,然后再进行连接。为什么?好吧,字符串在 Python 中是不可变的。这意味着每次您使用+= 时,您基本上都是在创建两个新字符串并读取第三个字符串。另一方面,如果您创建一个列表并附加它,则创建的字符串数量会减半。
      2. 您真的需要多次调用 select 吗?我的理解是,您并不真正关心进程是否是线程阻塞的。由于 select 或多或少是同名 C 方法的包装器:

        select() 和 pselect() 允许程序监视多个文件描述符,等待一个或多个文件描述符“准备好”用于某种 I/O 操作(例如,可能的输入)。文件描述符是 con- 如果可以无阻塞地执行相应的 I/O 操作(例如 read(2)),则准备就绪。

      3. 您没有在代码中侦听 socket.timeout 异常。
      4. 写入标准输出/文件系统可能很昂贵,但您正在记录recv 返回的每一行。你能移动日志行吗?
      5. 您是否考虑过手动处理读取频道?从技术上讲,您唯一需要的代码是:
      try:
          out = self.in_buffer.read(nbytes, self.timeout)
      except PipeTimeout, e:
          # do something with error
      

      不能保证,但会减少额外的处理。

      【讨论】: