【问题标题】:How to parallelize a CPU-intesive data processing task using Python?如何使用 Python 并行化 CPU 密集型数据处理任务?
【发布时间】:2017-06-15 05:56:19
【问题描述】:

我正在创建一些代码来对大型 pdf 数据集执行 OCR,并将提取的文本写入 csv。这是使用 Imagemagick、Pillow、PyOCR (Tesseract) 等库的组合完成的,并且已经在小数据样本上进行了测试。

数据包含大量文件夹(约 2500 个),每个文件夹包含大约 15 个 pdf。为每个文件夹中的 pdf 创建一个 csv,处理每个文件夹大约需要 10 分钟。这意味着在所有 2500 个文件夹上运行 OCR 大约需要 18 天,这实在是太长了。我需要能够在 7 天或更短的时间内完成整个流程。

因此,我正在研究并行处理每个子文件夹,因为每个子文件夹的处理独立于任何其他子文件夹。我的第一种方法是使用 concurrent.futures 模块来创建一个 ProcessPool,如下所示:

executor = concurrent.futures.ProcessPoolExecutor(4)
futures = ([executor.submit(run_pdf_to_text_ocr, folder) for folder in sub_folders])
concurrent.futures.wait(futures)

这里的run_pdf_to_text_ocr() 是为每个文件夹运行处理的主要函数。进程使用的相同资源存在一些问题,我正在解决这些问题,以便每个文件夹(进程/线程)在其资源使用中被隔离。

当然,这个处理管道是 CPU 密集型的,并且在运行时会最大限度地利用处理器。我将启动一个大型 AWS EC2 实例以最终完成完整运行。所以,在开始之前我想知道的是:

如果我采用的这种方法是正确的? 我可以采取任何替代方法以更好的方式做到这一点吗?我应该使用分布式处理吗?我可以做些什么来正确监控这个长时间运行的管道,以便我能够了解在处理过程中可能出现的任何问题?

我对 Python 非常满意,并且希望尽可能多地使用它的解决方案。

【问题讨论】:

    标签: python amazon-ec2 parallel-processing batch-processing distributed-computing


    【解决方案1】:

    我认为multiprocessing 模块是你想要的。

    from multiprocessing import Pool
    
    workers = Pool()
    workers.apply_async(your_task_function, args=(args_for_this_task))
    workers.join()
    

    因此,您可以轻松地将整个任务拆分为多个小任务,然后传递给多 CPU。您申请的任务将被放入workers的队列中。

    此外,要监控错误,您可以像这样传递错误回调:workers.apply_async(your_task_function, args=(args_for_this_task), error_callback=error_callback)

    此外,您可以使用multiprocessing.Manager 轻松地在进程之间共享变量,甚至可以在不同的机器上。

    【讨论】:

      【解决方案2】:

      序幕

      这项工作可以在一天或一夜之间完成,但这不是任何并行处理架构的工作。为什么?

      • 在一组 { thread- |进程-}-执行。

      • 牛群中的任何对/组之间存在零依赖/互通/同步

      • 零需要共享单个独立 OCR 作业的内容/上下文

      • 那么,为什么要付出所有这些巨大的性能成本以及更多的 Python GIL 锁定惩罚(许多线程,但只是 GIL 锁定为 仅执行 1 次 + 所有其他人在队列中等待 接收 GIL 锁权限以向前运行几步,然后在纯 SEQ-relay-race 中后退并释放 GIL 锁(?!?! ) )

      因此,换句话说,适当的架构应该寻求可行的性能扩展(线性越多越好),而不是在原本只是顺序的命令式代码中利用真正的并行部分。 租用恐龙大小的模拟代码执行实例在 AWS 扩展上也可能会人为地昂贵,只是为了许多并发线程执行(仍然有被毁坏的风险) GIL-stepped SEQ-relay 1-works + ALL-wait ),而其他场景可以自适应且经济高效地工作。

      最后一个主要反对意见是,智能系统架构遵循工作流执行的逻辑并首先优化性能限制部分,而不是试图用一些在教科书示例中看起来很酷的 SLOC 来说服设计板,但是从来没有。


      Smarter way goes into Adaptive, Distributed-processing Architecture

      为什么?

      • 可以即时增加性能(添加更多轻量级工作单元,如果时间需要,预算允许,直到所有 CPU 容量稳定在 100% 上)
      • 人们可以从分布式处理接收增量交付(完成的工作单元 (OCR'd PDF)),因为它们看起来已经完成,而不是在等待之后,在任何处理控制之外,7 天获得第一个或一无所获根本上,如果引擎盖下出了什么问题。

      • 人们甚至可以原型/部署/调整/修改/更新/重新设计处理发生的方式,同时仍然保持工作的完成——实时更新可以顺利进行,不会浪费任何时间,而且已经执行的工作单元,到目前为止由以前的 Worker 版本处理。

      总而言之,忘记使用 Python 内置插件(即使努力将它们修改为 GIL 发布的形式和形状),而是全面评估大规模可扩展分布式处理的性能、可扩展性和 ROI 优势解决方案。

      • 在需要时添加工作节点以及需要多少个(甚至数百或数千个在办公室类型环境中精心安排的通宵班次,或由可用的 IaaS 凭证或其他(几乎)零成本处理节点授予)李>
      • 根据需要配备额外的信号工具(可以要求 Worker(s) 仅从终端报告其实际状态/ETA/远程 CPU 负载/...)
      • 运行在 tcp:// 上,用于真正的异构分布式系统(AWS 和其他外部 IaaS 配置工具)和许多其他 { inproc:// | ipc:// | pgm:// | epgm:// } 传输类,用于高性能设计的所有强大设计方法。

      结语:不用提醒你 PIL 非常慢,所以如果需要加速单个文档作业处理,profiler 很可能会指向这个热点。 p>

      【讨论】:

        猜你喜欢
        • 2020-06-16
        • 1970-01-01
        • 2023-02-07
        • 2015-06-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-01-02
        • 2015-02-03
        相关资源
        最近更新 更多