【问题标题】:How to do asyncronous file transfers over sftp?如何通过 sftp 进行异步文件传输?
【发布时间】:2020-10-24 18:05:35
【问题描述】:

我要解决的问题如下: 我有一台包含大量数据(约 5 TB)的台式计算机,我想对其进行分析。数据由 500k 个文件组成,每个文件都可以单独分析。 对于分析,我在大学有一系列可用的服务器,但是,服务器没有空间存储所有这些数据,也没有空间存储分析的输出。

所以我的想法是将数据分段复制到服务器,运行分析,将结果传输回桌面,删除服务器上的输入和输出数据,然后重复。

对于文件传输,我昨天安装了 paramiko,它似乎工作得很好:

remote_get = 'test'
local_deliver = './test'

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.load_host_keys(os.path.expanduser(os.path.join("~", ".ssh", "known_hosts")))
ssh.connect(server, username=username, password=password)
sftp = ssh.open_sftp()

for root, dirs, files in os.walk(local_path):
    for fname in files:
        full_fname = os.path.join(root, fname)
        full_remote = os.path.join(remote_path, fname)
        sftp.put(full_fname, full_remote)
sftp.close()
ssh.close()

但是我唯一的问题是,我需要传输的数据量可能需要几天的时间来回传输,因此我希望尽可能异步启动数据传输,这样我就可以对数据进行分析当前数据集,同时传输下一个要分析的数据集。

但我不知道如何做这样的事情,谁能指出我正确的方向?

【问题讨论】:

  • 您的意思是要手动启动工作并让多个线程/进程移动数据吗?还是您希望程序始终运行并在新文件到达桌面时让它复制文件?
  • 我想手动启动它。输入数据量保持不变。
  • 我不清楚异步传输如何帮助您实现目标。如果多个进程共享同一个线程并根据 FD 何时准备好进行读/写来进行调度,那么对于要同时复制的每个文件使用不同的线程无法做到这一点,您可以做什么? (而如果您一次不想复制多个文件,“异步给您带来了什么?”就变得更不清楚了)。
  • 明白了。老实说,既然如此,我会考虑将工作分解为单独的流程。如果你有一个transfer-and-process 脚本,给定一个文件名,首先将文件复制到远程服务器,然后在那里处理它,像find . -type f -print0 | xargs -n 1 -P 5 ./transfer-and-process 这样简单的东西就足够了。出于几个原因,这不是理想,但它是一个开始思考问题的地方;如果您的处理速度比传输速度快,更好的方法就是在远程端有一个脚本来监视目录中的新文件......
  • 或者,您可以从服务器可以挂载的本地计算机打开一个网络共享(unix 风格的 NFS 或 windows 风格的 CIFS 挂载)。假设服务器只扫描和写入一次数据,则不会比复制更多开销。如果你想在服务器上预留数据,它可以在需要时从共享中复制。

标签: python asynchronous parallel-processing sftp paramiko


【解决方案1】:

此解决方案使用multiprocessing.Pool 创建单独进程的任务池。每次调用apply_async 时,都会传递一个函数指针和一个参数列表。在这种情况下,要执行的函数是copy_file,arg 是文件名:

import os
import paramiko

from multiprocessing import Pool

remote_get = 'test'
local_deliver = './test'

pool = Pool(processes=4)  # Experiment with this number based on your # CPUs
def copy_file(filename):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.load_host_keys(os.path.expanduser(os.path.join("~", ".ssh", "known_hosts")))
    ssh.connect(server, username=username, password=password)
    sftp = ssh.open_sftp()

    full_fname = os.path.join(root, fname)
    full_remote = os.path.join(remote_path, fname)
    sftp.put(full_fname, full_remote)
    sftp.close()
    ssh.close()

for root, dirs, files in os.walk(local_deliver):
    for fname in files:
        pool.apply_async(copy_file, [fname])

您的原始文件中有一些变量没有被考虑在内,所以我使用了我的最佳猜测。 ssh 和 sftp 客户端创建需要移动到 copy_file 中,因为除非它是可序列化的,否则您无法保存并在进程之间共享它。

multiprocessing.Pool 上的 processes 参数可以根据 CPU 的数量进行调整,但请记住,您将在此处与多个瓶颈作斗争:1. CPU,2. NIC 带宽限制,3. 磁盘 I /O 限制。

这里有更多的多处理文档:https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers

编辑:我还记得 paramiko 的 SFTP 与炮击 SFTP 命令的速度要慢得多。可能值得写出批处理文件并使用subprocess.call 执行它们以获得更好的性能。

【讨论】:

  • 这是一个非常好的答案,虽然我还没有实现它,但我认为这会解决我的问题,非常感谢!
  • 不客气,很高兴为您提供帮助。我只是想到了另一种探索方式:查看rsync。它可能无法完全满足您的需求,但值得一试。
  • 我还建议使用 lftp; rsync 在另一端需要另一个 rsync 副本,以及一个通用的 ssh 传输来运行它;即使关闭了 shell 访问,lftp 也可以在 sftp 上运行,甚至可以在目标上没有 shell 的系统上运行。 (这样做的代价是没有获得差异传输——恢复中断的传输只能追加——但听起来 OP 在目标上没有文件可以从中受益)。
猜你喜欢
  • 2017-10-10
  • 1970-01-01
  • 2015-08-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-01-27
  • 2014-01-30
  • 2021-06-07
相关资源
最近更新 更多