【问题标题】:Copying one file to multiple remote hosts in parallel over SFTP通过 SFTP 将一个文件并行复制到多个远程主机
【发布时间】:2015-10-24 04:45:20
【问题描述】:

我想使用 Python 将本地文件并行复制到多个远程主机。我正在尝试使用 asyncio 和 Paramiko 来做到这一点,因为我已经在我的程序中将这些库用于其他目的。

我使用BaseEventLoop.run_in_executor() 和默认ThreadPoolExecutor,它实际上是旧threading 库的新接口,以及Paramiko 的SFTP 功能来进行复制。

这是一个简单的例子。

import sys
import asyncio
import paramiko
import functools


def copy_file_node(
        *,
        user: str,
        host: str,
        identity_file: str,
        local_path: str,
        remote_path: str):
    ssh_client = paramiko.client.SSHClient()
    ssh_client.load_system_host_keys()
    ssh_client.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())

    ssh_client.connect(
        username=user,
        hostname=host,
        key_filename=identity_file,
        timeout=3)

    with ssh_client:
        with ssh_client.open_sftp() as sftp:
            print("[{h}] Copying file...".format(h=host))
            sftp.put(localpath=local_path, remotepath=remote_path)
            print("[{h}] Copy complete.".format(h=host))


loop = asyncio.get_event_loop()

tasks = []

# NOTE: You'll have to update the values being passed in to
#      `functools.partial(copy_file_node, ...)`
#       to get this working on on your machine.
for host in ['10.0.0.1', '10.0.0.2']:
    task = loop.run_in_executor(
        None,
        functools.partial(
            copy_file_node,
            user='user',
            host=host,
            identity_file='/path/to/identity_file',
            local_path='/path/to/local/file',
            remote_path='/path/to/remote/file'))
    tasks.append(task)

try:
    loop.run_until_complete(asyncio.gather(*tasks))
except Exception as e:
    print("At least one node raised an error:", e, file=sys.stderr)
    sys.exit(1)

loop.close()

我看到的问题是文件被串行复制到主机而不是并行。因此,如果单个主机复制需要 5 秒,两台主机需要 10 秒,以此类推。

我尝试了各种其他方法,包括放弃 SFTP 并将文件通过exec_command() 传送到每个远程主机上的dd,但副本总是连续发生。

我可能在这里误解了一些基本概念。是什么阻止了不同线程并行复制文件?

根据我的测试,似乎阻塞发生在远程写入时,而不是读取本地文件时。但为什么会这样,因为我们正在尝试针对独立远程主机的网络 I/O?

【问题讨论】:

  • 可能paramiko 在内部使用了一些锁。你试过ProcessPoolExecutor吗?
  • 我用一些虚拟代码替换了copy_file_node(),它运行良好,所以我认为是paramiko 阻止了并发。如果是这种情况,ProcessPoolExecutor 应该可以解决问题。您可以发布您的代码的ProcessPoolExecutor 版本吗?
  • @NickChammas 你确定网络带宽不是瓶颈吗?
  • @NickChammas 尝试同时通过 scp 手动将该文件复制到两台主机,看看需要多长时间。
  • @alexanderlukanin13 - 实际上,也许您对带宽的看法是正确的。如果尝试 2 个单独的 scp 进程,一个总是在 ~23 秒内完成,而另一个需要 ~38 秒。哇。所以除了我对自己环境的假设之外,也许没有什么错...... :)

标签: python python-3.x sftp paramiko python-asyncio


【解决方案1】:

您对 asyncio 的使用没有任何问题。

为了证明这一点,让我们尝试一个简化版本的脚本 - 不是 paramiko,只是纯 Python。

import asyncio, functools, sys, time

START_TIME = time.monotonic()

def log(msg):
    print('{:>7.3f} {}'.format(time.monotonic() - START_TIME, msg))

def dummy(thread_id):
    log('Thread {} started'.format(thread_id))
    time.sleep(1)
    log('Thread {} finished'.format(thread_id))

loop = asyncio.get_event_loop()
tasks = []
for i in range(0, int(sys.argv[1])):
    task = loop.run_in_executor(None, functools.partial(dummy, thread_id=i))
    tasks.append(task)
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

有两个线程,这将打印:

$ python3 async.py 2
  0.001 Thread 0 started
  0.002 Thread 1 started       <-- 2 tasks are executed concurrently
  1.003 Thread 0 finished
  1.003 Thread 1 finished      <-- Total time is 1 second

此并发最多可扩展到 5 个线程:

$ python3 async.py 5
  0.001 Thread 0 started
  ...
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  ...
  1.005 Thread 4 finished      <-- Total time is still 1 second

如果我们再添加一个线程,就会达到线程池限制:

$ python3 async.py 6
  0.001 Thread 0 started
  0.001 Thread 1 started
  0.002 Thread 2 started
  0.003 Thread 3 started
  0.003 Thread 4 started       <-- 5 tasks are executed concurrently
  1.002 Thread 0 finished
  1.003 Thread 5 started       <-- 6th task is executed after 1 second
  1.003 Thread 1 finished
  1.004 Thread 2 finished
  1.004 Thread 3 finished
  1.004 Thread 4 finished      <-- 5 task are completed after 1 second
  2.005 Thread 5 finished      <-- 6th task is completed after 2 seconds

一切都按预期进行,每 5 个项目的总时间增加 1 秒。幻数 5 记录在 ThreadPoolExecutor 文档中:

在 3.5 版更改:如果 max_workersNone 或未给出,则默认为机器上的处理器数量,乘以 5,假设 ThreadPoolExecutor 经常用于重叠 I/O 而不是 CPU 工作,并且工作人员的数量应该高于 ProcessPoolExecutor 的工作人员数量。

第三方库如何阻止我的 ThreadPoolExecutor?

  • 库使用某种全局锁。这意味着该库不支持多线程。尝试使用 ProcessPoolExecutor,但要小心:库可能包含其他反模式,例如使用相同的硬编码临时文件名。

  • 函数执行了很长时间并且没有释放 GIL。它可能表明 C 扩展代码中存在错误,但持有 GIL 的最常见原因是进行一些 CPU 密集型计算。同样,您可以尝试 ProcessPoolExecutor,因为它不受 GIL 的影响。

预计这些都不会发生在 paramiko 之类的库中。

第三方库如何阻止我的 ProcessPoolExecutor?

通常不能。您的任务在不同的进程中执行。如果您看到 ProcessPoolExecutor 中的两个任务花费两倍的时间,则怀疑存在资源瓶颈(例如消耗 100% 的网络带宽)。

【讨论】:

  • 而且,正如您帮助我在有关问题的 cmets 中看到的那样,我看到的明显串行上传背后的原因只是我的上传带宽!
【解决方案2】:

我不确定这是最好的方法,但它对我有用

#start
from multiprocessing import Process

#omitted

tasks = []
for host in hosts:
    p = Process(
        None,
        functools.partial(
          copy_file_node,
          user=user,
          host=host,
          identity_file=identity_file,
          local_path=local_path,
          remote_path=remote_path))

    tasks.append(p)

[t.start() for t in tasks]
[t.join() for t in tasks]

根据评论,添加日期戳并捕获多处理的输出并得到以下信息:

2015-10-24 03:06:08.749683[vagrant1] Copying file...
2015-10-24 03:06:08.751826[basement] Copying file...
2015-10-24 03:06:08.757040[upstairs] Copying file...
2015-10-24 03:06:16.222416[vagrant1] Copy complete.
2015-10-24 03:06:18.094373[upstairs] Copy complete.
2015-10-24 03:06:22.478711[basement] Copy complete.

【讨论】:

  • 我会试试这个。但是,这不应该在功能上等同于将ProcessPoolExecutorasynciolike I did here 一起使用吗?
  • 我会这么认为,因为相同的基本 api,但没有通过两者的全部来源,我会冒“他们实现的东西略有不同”之类的风险另外,因为您使用的是期货,它引出了一个问题,即您使用的是什么 exact 版本的 python 和各种模块。该问题被标记为 3.x,但期货建议您使用带有 3.x 反向端口的 2.x,或早期的 3.x,或其他一些旧模块。可以想象,您正在遇到模块版本之间的奇怪交互/后端中未处理的边缘情况。我使用了 python 3.4.3 和最近的一切。
  • 我正在使用 Python 3.5.0 和 Paramiko 1.15.3。 concurrent.futuresasyncio 上的 3.5 docs 在解释可以向 run_in_executor() 提供哪些执行程序时参考的内容。无论如何,让我试一试,看看是否有区别。当您说它对您有用时,顺便说一句,您是否将足够大的文件复制到 2 个远程主机以注意它们是串行上传还是并行上传?
  • 尝试在输出中添加日期戳或分析线程。由于线程的东西返回的方式,它不一定按照跨多个线程创建的顺序打印输出。您实际上可能正在同时传输而不知道它!
  • 如果它的带宽,因为你是遥控器在 ec2 中,你总是可以将它复制到其中一个 ec2 主机并从那里扇出到其他 ec2 主机,或者将文件发送到 s3 和从那里将数据拉入主机。
猜你喜欢
  • 2017-12-24
  • 1970-01-01
  • 2016-05-22
  • 2014-12-03
  • 1970-01-01
  • 1970-01-01
  • 2014-08-06
  • 2017-05-02
  • 1970-01-01
相关资源
最近更新 更多