【问题标题】:multiprocess module with paramiko带有 paramiko 的多进程模块
【发布时间】:2011-06-22 21:06:10
【问题描述】:

我正在尝试使用 paramiko python 模块 (1.7.7.1) 将命令和/或 xfer 文件并行执行到一组远程服务器。一项任务如下所示:

jobs = []   
for obj in appObjs:
    if obj.stop_app:
        p = multiprocessing.Process(target=exec_cmd, args=(obj, obj.stop_cmd))
        jobs.append(p)
        print "Starting job %s" % (p)
        p.start()

"obj" 包含 paramiko SSHClient、传输和 SFTPClient 等内容。 appObjs 列表包含大约 25 个这样的对象,因此有 25 个连接到 25 个不同的服务器。

我在回溯中看到 paramiko 的 transport.py 出现以下错误

raise AssertionError("PID check failed. RNG must be re-initialized after fork(). 
Hint:   Try Random.atfork()")

我根据https://github.com/newsapps/beeswithmachineguns/issues/17 的帖子修补了 /usr/lib/python2.6/site-packages/paramiko/transport.py,但似乎没有帮助。我已经验证了上面提到的路径中的 transport.py 是正在使用的。 paramiko 邮件列表似乎消失了。

这看起来像是 paramiko 中的问题还是我误解/误用了多处理模块?有人愿意建议一个实用的解决方法吗?非常感谢,

【问题讨论】:

    标签: python ssh paramiko


    【解决方案1】:

    更新:正如@ento 所说,分叉的 ssh 包是 merged back into paramiko,所以下面的答案现在无关紧要,您现在应该再次使用 Paramiko。

    这是 Paramiko 中的一个已知问题,已在 Paramiko 的一个分支(停滞在 1.7.7.1 版)中修复,现在称为ssh package on pypi(在撰写本文时,它会将事情带到 1.7.11 版)。

    显然,在主线 Paramiko 中获取一些重要补丁时存在问题,并且维护者没有响应,因此 Fabric 的维护者 @bitprophet 将 Paramiko 分叉为新的包名称 ssh package on pypi。你可以看到你提到的具体问题是discussed here,这也是他决定分叉的原因之一;如果你真的想,你可以阅读gory details

    【讨论】:

    • +1 主要是因为我不知道 Paramiko 已经被@bitprophet 的 ssh 包取代了
    • pypi 上的 ssh 包(使用版本 1.7.11、1.7.13 测试)给出了问题中提到的相同错误,至少对于 Python 2.7.1。
    • 从 paramiko v1.8.0 开始的 pypi has been merged back upstream 上的 ssh 包
    • 感谢@ento!我已经更新了我的答案以反映它被合并回 Paramiko。
    【解决方案2】:

    正如Paramiko issue 中的一条评论所提到的,RNG 错误可以通过为每个进程打开一个单独的 ssh 句柄来避免,然后 paramiko 将不再抱怨。 这个示例脚本演示了这一点(我使用的是池而不是进程):

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import ssh
    from multiprocessing import Pool
    import getpass
    
    hostnames = [HOST1, HOST2]
    user = USERNAME
    pw = getpass.getpass("Enter ssh password:")
    
    def processFunc(hostname):
        handle = ssh.SSHClient()
        handle.set_missing_host_key_policy(ssh.AutoAddPolicy())
        handle.connect(hostname, username=user, password=pw)
        print("child")
        stdin, stdout, stderr = handle.exec_command("ls -l /var/log; sleep 5")
        cmdOutput = ""
        while True:
            try:
                cmdOutput += stdout.next()
            except StopIteration:
                break
        print("Got output from host %s:%s" % (hostname, cmdOutput))
        handle.close()
    
    pool = Pool(len(hostnames))
    pool.map(processFunc, hostnames, 1)
    pool.close()
    pool.join()
    
    ## If you want to compare speed:
    # for hostname in hostnames:
    #     processFunc(hostname)
    

    【讨论】: