【问题标题】:How to do asynchronous file copying in Python?如何在 Python 中进行异步文件复制?
【发布时间】:2015-02-03 17:39:11
【问题描述】:

我正在使用 Python 中的一个函数,该函数接收文件路径列表和目标列表,并将每个文件复制到每个给定目标。我让这个函数的复制部分正常工作,但我需要能够在除 gui 操作之外异步运行这个函数,以便减少填写每个“表单”的时间。每次将文件复制到所有目录时,我还需要复制功能来通知用户。

我已经对如何做到这一点进行了一些研究,但每个选项都有很大的不同,例如使用不同的库。你会如何建议我这样做?

【问题讨论】:

  • 在特定操作系统上还是独立于操作系统上?

标签: python file parallel-processing


【解决方案1】:

我同意通过队列进行通信的线程是一个很好的解决方案。这是一个示例类:

import os, shutil, threading, Queue

class FileCopy(threading.Thread):
    def __init__(self, queue, files, dirs):
        threading.Thread.__init__(self)
        self.queue = queue
        self.files = list(files)  # copy list
        self.dirs = list(dirs)    # copy list
        for f in files:
            if not os.path.exists(f):
                raise ValueError('%s does not exist' % f)
        for d in dirs:
            if not os.path.isdir(d):
                raise ValueError('%s is not a directory' % d)

    def run(self):
        # This puts one object into the queue for each file,
        # plus a None to indicate completion
        try:
            for f in self.files:
                try:
                    for d in self.dirs:
                        shutil.copy(f, d)
                except IOError, e:
                    self.queue.put(e)
                else:
                    self.queue.put(f)
        finally:
            self.queue.put(None)  # signal completion

以下是如何使用此类的示例:

queue = Queue.Queue()
files = ['a', 'b', 'c']
dirs = ['./x', './y', './z']
copythread = FileCopy(queue, files, dirs)
copythread.start()
while True:
    x = queue.get()
    if x is None:
        break
    print(x)
copythread.join()

【讨论】:

    【解决方案2】:

    由于您的问题是 IO 限制,我建议您查看 threading 模块。结合Queue 模块,您将实现这一目标。

    【讨论】:

      【解决方案3】:

      早期answer 中的线程总是按顺序执行,没有任何并行性。通过一些更改,可以让 IO 绑定作业一起运行,以便线程的完成取决于文件的大小。

      我对 answer 中的代码进行了细微更改,并尝试验证最小的文件首先完成,而其他 IO 操作在后台继续。

      import os, shutil, threading, queue
      
      class FileCopy(threading.Thread):
          def __init__(self, queue, files, dirs):
              threading.Thread.__init__(self)
              self.queue = queue
              self.files = list(files)  # copy list
              self.dirs = list(dirs)    # copy list
              for f in files:
                  if not os.path.exists(f):
                      raise ValueError('%s does not exist' % f)
              for d in dirs:
                  if not os.path.isdir(d):
                      raise ValueError('%s is not a directory' % d)
      
          def run(self):
              # This puts one object into the queue for each file
              try:
                  for f in self.files:
                      try:
                          for d in self.dirs:
                              shutil.copy(f, d)
                      except IOError as e:
                          self.queue.put(e)
                      else:
                          self.queue.put(f)
              finally:
                  pass
      
      
      queue = queue.Queue()
      files = ['a', 'b', 'c']
      dirs = ['./x', './y', './z']
      thlist = []
      
      for file in files:
          copythread = FileCopy(queue, [file], dirs)
          thlist.append(copythread)
      
      for th in thlist:
          th.start()
      
      for file in files:
           x = queue.get()
           print("Finished copying " + x)
      
      for th in thlist:
          th.join()
      

      【讨论】:

      • 这里不考虑问题的核心方面,异步复制一个文件。我想你可能有一个在线程完成后解析的未来,但是在异步系统中使用线程可能会破坏异步引擎解决这些未来的好处。考虑异步引擎将操作分解为并行期货的情况,并希望在继续之前等待所有这些都解决,您的线程将独立运行且不受引擎管理,这意味着它将吸收引擎可能的运行时间使用更优化
      【解决方案4】:

      v3.2+ concurrent.futures

      concurrent.futures 内部,第一个示例将shutil.copy 生成到处理池中。

      也许值得考虑ThreadPoolExecutorProcessPoolExecutor (no GIL) 之间的区别

      import shutil
      with ThreadPoolExecutor(max_workers=4) as e:
          e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
          e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
          e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
          e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
      

      v3.9 asyncio

      如上所述here (async versions of shutil)

      @graingert (an author) cmets 方法在异步引擎内部创建托管线程。不幸的是,这似乎只适用于 v3.9

      https://docs.python.org/3.9/library/asyncio-task.html#asyncio.to_thread 当然有

      await asyncio.to_thread(shutil.copyfile, "a", "b")
      

      之前的 v3.9 aiofiles

      @xyloguy建议

      正如(也)提到的here (async versions of shutil)

      如果您使用的 python 版本早于 3.9(我是),您可以使用 aiofiles.os.wrap,其实现与 @pwwang mention in their comment 相同。否则我会同意按照@graingert 的建议使用 asyncio.to_thread。

      import shutil
      from aiofiles.os import wrap
      copyfile = wrap(shutil.copyfile)
      copyfileobj = wrap(shutil.copyfileobj)
      
      await copyfile(src, dst)
      

      为什么以前的答案不充分

      大部分答案都忽略了问题的异步部分。

      我想你可能会有一个在线程完成后解决的未来,但是......

      在异步系统中使用线程可能会破坏异步引擎解决这些未来的好处。考虑异步引擎将操作分解为并行期货的情况,并希望在继续之前等待所有这些都解决,您的线程将独立运行且不受引擎管理。我们通过使用上面提供的示例来避免这个问题。

      tl;博士

      最理想的情况是,我们希望智能引擎为我们完成这项工作。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-01-19
        • 2019-08-07
        • 1970-01-01
        • 2017-08-29
        • 2020-06-07
        • 1970-01-01
        • 2014-02-15
        相关资源
        最近更新 更多