【问题标题】:Downloading a large file in parts using multiple parallel threads使用多个并行线程分批下载大文件
【发布时间】:2019-10-26 13:46:03
【问题描述】:

我有一个用例,需要使用多个线程分批下载大型远程文件。 每个线程必须同时(并行)运行,获取文件的特定部分。 一旦成功下载所有部分,期望将这些部分组合成一个(原始)文件。

也许使用 requests 库可以完成这项工作,但我不确定如何将其多线程化为将块组合在一起的解决方案。

url = 'https://url.com/file.iso'
headers = {"Range": "bytes=0-1000000"}  # first megabyte
r = get(url, headers=headers)

我也在考虑使用 curl 来协调下载,但我不确定这是正确的方法。它似乎太复杂了,并且偏离了普通的 Python 解决方案。像这样的:

curl --range 200000000-399999999 -o file.iso.part2

有人能解释一下你会怎么做这样的事情吗?或者发布一个在 Python 3 中工作的代码示例?我通常很容易找到与 Python 相关的答案,但这个问题的解决方案似乎让我望而却步。

【问题讨论】:

  • this answer 呢?
  • 这似乎与 Python 2 相关,在 Python 3 中不起作用

标签: python python-3.x multithreading curl python-requests


【解决方案1】:

这是一个使用 Python 3 和 Asyncio 的版本,它只是一个示例,它可以改进,但你应该能够得到你需要的一切。

  • get_size:发送 HEAD 请求获取文件大小
  • download_range:下载单个块
  • download: 下载所有块并合并它们
import asyncio
import concurrent.futures
import requests
import os


URL = 'https://file-examples.com/wp-content/uploads/2017/04/file_example_MP4_1920_18MG.mp4'
OUTPUT = 'video.mp4'


async def get_size(url):
    response = requests.head(url)
    size = int(response.headers['Content-Length'])
    return size


def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)


async def download(executor, url, output, chunk_size=1000000):
    loop = asyncio.get_event_loop()

    file_size = await get_size(url)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        loop.run_in_executor(
            executor,
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{output}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(output, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{output}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)


if __name__ == '__main__':
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(
            download(executor, URL, OUTPUT)
        )
    finally:
        loop.close()

【讨论】:

    【解决方案2】:

    您可以使用grequests并行下载。

    import grequests
    
    URL = 'https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.1.0-amd64-netinst.iso'
    CHUNK_SIZE = 104857600  # 100 MB
    HEADERS = []
    
    _start, _stop = 0, 0
    for x in range(4):  # file size is > 300MB, so we download in 4 parts. 
        _start = _stop
        _stop = 104857600 * (x + 1)
        HEADERS.append({"Range": "bytes=%s-%s" % (_start, _stop)})
    
    
    rs = (grequests.get(URL, headers=h) for h in HEADERS)
    downloads = grequests.map(rs)
    
    with open('/tmp/debian-10.1.0-amd64-netinst.iso', 'ab') as f:
        for download in downloads:
            print(download.status_code)
            f.write(download.content)
    

    PS:我没有检查范围是否正确确定以及下载的 md5sum 是否匹配!这应该只是一般地显示它是如何工作的。

    【讨论】:

    • 这正是我所需要的。顺便提一句。这很好,但是如果您有时间修改代码以显示每个下载部分的进度,那就太棒了。
    • 我发现这个脚本的一个问题是合并的下载文件与原始文件的字节大小不匹配。对于该文件,您显示 (iso) 总大小 = 351272960 字节,但下载的文件长 3 个字节:351272963 字节。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多