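【Question title】: Python Multiprocessing.Pool workers hang when using pool.map
【Posted】: 2015-07-25 04:13:38
【Question】:

I have a script that connects to about 700 devices, runs a series of commands on each, and then exits. I started using Multiprocessing.Pool and Pool.map to cut the script's run time and to let me log into several devices at once.

Now I'm running into a strange issue where the workers in the pool hang indefinitely, and I can't figure out why. I'm also fairly new to Python, so any feedback on my approach is appreciated. Here is my code: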

import logging
import multiprocessing
import re
import socket
import time
from functools import partial

import paramiko


def expect(channel, pattern, command, wait, debug):
    # Read from the channel until `pattern` appears, then send `command`.
    timeout_counter = 0
    full_buffer = ''
    while timeout_counter < wait:
        temp_buffer = channel.recv(8192)
        full_buffer += temp_buffer
        if pattern in temp_buffer:
            channel.send('%s\r' % command)
            # Hand back the captured output only when debugging is requested.
            return full_buffer if debug else ''
        time.sleep(0.01)
        timeout_counter += 0.01
    # The pattern never appeared within `wait` seconds.
    raise Exception('Timed out waiting for %r' % pattern)


def backup_device(device, user, user_pass, failures):
    attempt_counter = 0
    if re.search(r'^\d+\.\d+\.\d+\.\d+$', device):
        # gethostbyaddr returns (hostname, aliases, addresses); keep the name.
        host_name = socket.gethostbyaddr(device)[0]
    else:
        host_name = device
    logging.info("Connecting to: %s" % host_name)
    command = 'copy config to backup'
    while attempt_counter < 5:
        ssh_handle = paramiko.SSHClient()
        try:
            output_buffer = ''
            ssh_handle.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh_handle.connect(hostname=host_name, username=user, password=user_pass, timeout=20, look_for_keys=False)
            ssh_channel = ssh_handle.invoke_shell()
            # expect() appends the trailing '\r' to the command itself.
            output_buffer += expect(ssh_channel, pattern='#', command=command, wait=10, debug=True)
            output_buffer += expect(ssh_channel, pattern=']?', command='\n', wait=5, debug=True)
            output_buffer += expect(ssh_channel, pattern=']?', command='\n', wait=5, debug=True)
            output_buffer += expect(ssh_channel, pattern='#', command='exit', wait=5, debug=True)
            print(output_buffer)
        except Exception as inst:
            logging.debug(inst)
            attempt_counter += 1
        else:
            logging.info('%s - backed up successfully.' % host_name)
            break  # stop retrying after a successful backup
        finally:
            # Close the connection whether the attempt succeeded or failed.
            ssh_handle.close()
    if attempt_counter == 5:
        logging.critical("Unable to log into device - %s" % host_name)
        failures.append(host_name)
    return

if __name__ == "__main__":
    logging.basicConfig(filename='herpderp.log', filemode='w', level=logging.DEBUG,
                        format='%(asctime)s - %(levelname)s:%(message)s')
    partial_method = partial(backup_device, user=username, user_pass=password, failures=failed_devices)
    pool = multiprocessing.Pool(processes=4, maxtasksperchild=1)
    pool_map = pool.map(func=partial_method, iterable=devices, chunksize=4)
    pool.close()
    pool.join()

Output from the multiprocessing log:

Starting MainProcess
[DEBUG/MainProcess] created semlock with handle 47379818131456
[DEBUG/MainProcess] created semlock with handle 47379818135552
[DEBUG/MainProcess] created semlock with handle 47379818139648
[DEBUG/MainProcess] created semlock with handle 47379818143744
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-4] child process calling self.run()

Truncated log output:

2015-05-13 16:04:11,748 - INFO:Connecting to: HOSTNAME
2015-05-13 16:04:11,760 - DEBUG:starting thread (client mode): 0x1ebaf350L
2015-05-13 16:04:11,769 - INFO:Connected (version 1.99, client Cisco-1.25)
2015-05-13 16:04:11,770 - DEBUG:kex algos:[u'diffie-hellman-group1-sha1'] server key:[u'ssh-rsa'] client encrypt:[u'aes128-cbc', u'3des-cbc', u'aes192-cbc', u'aes256-cbc'] server encrypt:[u'aes128-cbc', u'3des-cbc', u'aes192-cbc', u'aes256-cbc'] client mac:[u'hmac-sha1', u'hmac-sha1-96', u'hmac-md5', u'hmac-md5-96'] server mac:[u'hmac-sha1', u'hmac-sha1-96', u'hmac-md5', u'hmac-md5-96'] client compress:[u'none'] server compress:[u'none'] client lang:[u''] server lang:[u''] kex follows?False
2015-05-13 16:04:11,770 - DEBUG:Ciphers agreed: local=aes128-cbc, remote=aes128-cbc
2015-05-13 16:04:11,770 - DEBUG:using kex diffie-hellman-group1-sha1; server key type ssh-rsa; cipher: local aes128-cbc, remote aes128-cbc; mac: local hmac-sha1, remote hmac-sha1; compression: local none, remote none
2015-05-13 16:04:12,038 - DEBUG:Switch to new keys ...
2015-05-13 16:04:12,064 - DEBUG:Adding ssh-rsa host key for HOSTNAME: KEY
2015-05-13 16:04:12,257 - DEBUG:userauth is OK
2015-05-13 16:04:12,258 - INFO:Auth banner: <INSERT BANNER HERE>
2015-05-13 16:04:12,314 - INFO:Authentication (password) successful!
2015-05-13 16:04:12,330 - DEBUG:[chan 0] Max packet in: 32768 bytes
2015-05-13 16:04:12,341 - DEBUG:[chan 0] Max packet out: 4096 bytes
2015-05-13 16:04:12,341 - DEBUG:Secsh channel 0 opened.
2015-05-13 16:04:12,356 - DEBUG:[chan 0] Sesch channel 0 request ok
2015-05-13 16:04:12,365 - DEBUG:[chan 0] Sesch channel 0 request ok

The log above is the last thing I see before the pool hangs.
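When the log simply stops like this, one way to find out where a worker is stuck is a watchdog that dumps the current tracebacks once a call has run too long. The following is only a sketch, assuming Python 3 and its standard `faulthandler` module; `run_with_watchdog` is a hypothetical helper, not something from the original script. In a setup like the one above it could be wrapped around the body of the worker function.

```python
import faulthandler
import tempfile
import time


def run_with_watchdog(task, seconds, log_file):
    """Run task(); if it is still running after `seconds`, dump the
    tracebacks of all threads to log_file. The task is not interrupted."""
    faulthandler.dump_traceback_later(seconds, exit=False, file=log_file)
    try:
        return task()
    finally:
        # Disarm the watchdog once the task has returned or raised.
        faulthandler.cancel_dump_traceback_later()


if __name__ == "__main__":
    # Stand-in for a slow device: the task sleeps longer than the watchdog allows.
    with tempfile.TemporaryFile(mode="w+") as log_file:
        run_with_watchdog(lambda: time.sleep(0.5), 0.1, log_file)
        log_file.seek(0)
        print(log_file.read())
```

The dump names the line each thread was executing when the timer fired, which is usually enough to tell a blocked network read from a stuck lock.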

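【Comments】:

  • I can add the log output as well, but it will take me a little while because I need to sanitize it before posting.
  • You might want to raise that exception rather than return it (at the end of the expect function).
  • Changed that; the pool still hangs.
  • Can you tell where in the code it hangs? Could ssh_channel_reference.recv(8192) be blocking forever?
  • I think you nailed it. A co-worker and I both assumed channel.recv() was non-blocking by default; apparently you have to set that yourself or specify a timeout. I'm waiting for the script to finish now and will update as soon as I can.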
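The blocking behaviour discussed in these comments can be reproduced without any network device. paramiko documents its Channel as behaving like a Python socket, so the sketch below uses a plain `socket.socketpair()` as a stand-in (an assumption made purely for illustration; `recv_or_none` is a hypothetical helper, written for Python 3). Without the `settimeout` call, the first `recv` would wait forever because nothing has been sent yet.

```python
import socket


def recv_or_none(sock, nbytes, timeout):
    """Return up to nbytes from sock, or None if nothing arrives in time."""
    sock.settimeout(timeout)  # without a timeout, recv() blocks indefinitely
    try:
        return sock.recv(nbytes)
    except socket.timeout:
        return None


if __name__ == "__main__":
    left, right = socket.socketpair()
    print(recv_or_none(left, 8192, 0.2))   # None: nothing has been sent yet
    right.sendall(b"router#")
    print(recv_or_none(left, 8192, 0.2))   # b'router#'
    left.close()
    right.close()
```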

Tags: python multiprocessing paramiko


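【Solution 1】:

The issue was that Channel.recv(Nbytes) is a blocking call, which was causing the pool workers to hang. I added channel.settimeout(float) and that resolved it. The only caveat is that I now need to catch the socket.timeout exception raised when the timeout expires.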
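As a rough illustration of that fix, here is a sketch of an expect-style loop with a receive timeout. It is not the poster's final code: it assumes Python 3 (so patterns and commands are bytes), the one-second receive timeout and the RuntimeError are arbitrary choices, and `channel` can be any socket-like object with `settimeout`, `recv` and `send`, such as a paramiko Channel.

```python
import socket
import time


def expect(channel, pattern, command, wait):
    """Read until `pattern` (bytes) shows up, then send `command`.

    Returns everything received so far. Raises RuntimeError if the
    pattern has not appeared after roughly `wait` seconds.
    """
    deadline = time.time() + wait
    buffer = b""
    # Each recv() now gives up after at most one second instead of blocking.
    channel.settimeout(min(1.0, wait))
    while time.time() < deadline:
        try:
            chunk = channel.recv(8192)
        except socket.timeout:
            continue  # nothing arrived yet; re-check the overall deadline
        if not chunk:
            # An empty read means the other side closed the stream.
            raise RuntimeError("channel closed while waiting for %r" % pattern)
        buffer += chunk
        if pattern in buffer:
            channel.send(command + b"\r")
            return buffer
    raise RuntimeError("timed out waiting for %r" % pattern)
```

Unlike the loop in the question, this version searches the accumulated buffer rather than only the latest chunk, so a prompt that arrives split across two reads is still matched.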
