【问题标题】:Python How to download multiple files in parallel using multiprocessing.poolPython 如何使用 multiprocessing.pool 并行下载多个文件
【发布时间】:2019-07-25 15:27:23
【问题描述】:

我正在尝试使用 multiprocessing.Pool 下载和提取 zip 文件。但每次我执行脚本时,只会下载 3 个 zips 并且目录中看不到剩余文件(CPU % 为也触及 100%)。有人可以帮助我如何解决这个问题/建议更好的方法并遵循我尝试过的 sn-p。我对多处理完全陌生。我的目标是在不达到最大 CPU 的情况下并行下载多个文件。

import StringIO
import os
import sys
import zipfile
from multiprocessing import Pool, cpu_count

import requests

filePath = os.path.dirname(os.path.abspath(__file__))
print("filePath is %s " % filePath)
sys.path.append(filePath)
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
       "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
       "http://mlg.ucd.ie/files/datasets/bbcsport.zip",
       "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
       "http://mlg.ucd.ie/files/datasets/3sources.zip"]


def download_zips(url):
    file_name = url.split("/")[-1]
    response = requests.get(url)
    sourceZip = zipfile.ZipFile(StringIO.StringIO(response.content))
    print("\n Downloaded {} ".format(file_name))
    sourceZip.extractall(filePath)
    print("extracted {} \n".format(file_name))
    sourceZip.close()


if __name__ == "__main__":
    print("There are {} CPUs on this machine ".format(cpu_count()))
    pool = Pool(cpu_count())
    results = pool.map(download_zips, url)
    pool.close()
    pool.join()

下面的输出

filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing 
There are 4 CPUs on this machine 
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing 
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing 
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing 
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing 

 Downloaded bbcsport.zip 
extracted bbcsport.zip 


 Downloaded 3sources.zip 
extracted 3sources.zip 


 Downloaded multiview_data_20130124.zip 

 Downloaded movielists_20130821.zip 

 Downloaded movielists_20130821.zip 
extracted multiview_data_20130124.zip 

extracted movielists_20130821.zip 

extracted movielists_20130821.zip 

【问题讨论】:

  • 如果您的操作系统是 Windows,则为 Windows 保留 1 个内核。因此,您可以使用 3 个核心来计算。 Linux 为您提供一切。默认情况下,应用程序以full access 计入资源。例如,在 Java 中,您可以限制 RAM 的使用,但不能限制 CPU / 磁盘。为了解决这个问题,您需要使用虚拟机,如 WMware、Docker,在其中限制 CPU、Ram、DIsc 等资源......
  • 我在windows VM中尝试过限制pool = Pool(1),但结果是一样的,我在目录中只看到3个文件。我的 8 核 CPU linux 机器也一样。

标签: python python-multiprocessing


【解决方案1】:

我已经在你的函数中做了几个小的 tweeks,它工作正常。请注意:

  1. 文件".../movielists_20130821.zip" 出现在您的列表中两次,因此您下载了两次相同的内容(可能是拼写错误?)
  2. 文件".../multiview_data_20130124.zip"".../movielists_20130821.zip"".../3sources.zip" 提取后会生成一个新目录。但是,文件".../bbcsport.zip" 在提取时会将其文件放在根文件夹中,即您当前的工作目录(见下图)。也许您错过了这张支票?
  3. 我在 donwload 函数中添加了一个 try/except 块。为什么?多处理通过创建新的(子)进程来运行东西来工作。 如果子进程抛出异常,父进程不会捕获它。因此,如果此子流程中出现任何错误,则必须在此处记录/处理。

import sys, os
import zipfile
import requests
from multiprocessing import Pool, cpu_count
from functools import partial
from io import BytesIO


def download_zip(url, filePath):
    try:
        file_name = url.split("/")[-1]
        response = requests.get(url)
        sourceZip = zipfile.ZipFile(BytesIO(response.content))
        print(" Downloaded {} ".format(file_name))
        sourceZip.extractall(filePath)
        print(" extracted {}".format(file_name))
        sourceZip.close()
    except Exception as e:
        print(e)


if __name__ == "__main__":
    filePath = os.path.dirname(os.path.abspath(__file__))
    print("filePath is %s " % filePath)
    # sys.path.append(filePath) # why do you need this?
    urls = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
            "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
            "http://mlg.ucd.ie/files/datasets/bbcsport.zip",
            "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
            "http://mlg.ucd.ie/files/datasets/3sources.zip"]

    print("There are {} CPUs on this machine ".format(cpu_count()))
    pool = Pool(cpu_count())
    download_func = partial(download_zip, filePath = filePath)
    results = pool.map(download_func, urls)
    pool.close()
    pool.join()

【讨论】:

  • 以防万一有人想知道partial 是什么,我要添加一个解释:download_zip 接受 2 个参数,urlfilePathpartial(download_zip, filePath = filePath) 输出一个函数,其中 @987654334 @ 设置为 filePath 变量。因此,download_func 只需要一个参数,而 download_zip 需要 2 个参数...
  • 函数调用必须在__main__中执行
【解决方案2】:

我建议您使用多线程来执行此操作,因为它是 I/O 绑定,如下所示:

import requests, zipfile, io
import concurrent.futures 
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
   "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
   "http://mlg.ucd.ie/files/datasets/bbcsport.zip",
   "http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
   "http://mlg.ucd.ie/files/datasets/3sources.zip"]

def download_zips(url):
   file_name = url.split("/")[-1]
   response = requests.get(url)
   sourceZip = zipfile.ZipFile(io.BytesIO(response.content))
   print("\n Downloaded {} ".format(file_name))
   sourceZip.extractall(filePath)
   print("extracted {} \n".format(file_name))
   sourceZip.close()

with concurrent.futures.ThreadPoolExecutor() as exector : 
   exector.map(download_zip, urls)

【讨论】:

  • 我想做类似的事情,但使用 paramiko SCPClient 从服务器下载许多 .zip(每个约 2mo)文件。我尝试了类似于上面的东西。如果我尝试下载几个文件,它的工作速度非常快,但对于超过 10 个 .zip 文件来说效率完全低下。有什么建议吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-11-14
  • 2013-12-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多