【问题标题】:Using Python's concurrent.futures to process objects in parallel使用 Python 的 concurrent.futures 并行处理对象
【发布时间】:2014-09-03 19:37:08
【问题描述】:

我刚开始使用 Python 3 中的库 concurrent.futures 将许多函数应用于图像列表,以便处理这些图像并重塑它们。 函数是resize(height, width)opacity(number)

另一方面,我有 images() 产生类似文件的对象的函数, 所以我尝试了这段代码来并行处理我的图像:

import concurrent.futures
From mainfile import images
From mainfile import shape


def parallel_image_processing :
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future = executor.submit(images)
    for fileobject in future.result() :
        future1 = executor.submit( shape.resize, fileobject, "65","85")
        future2 = executor.submit( shape.opacity, fileobject, "0.5")

有人能告诉我我是否走在正确的道路上吗?

【问题讨论】:

  • 你有什么问题?您没有说明当前代码有任何问题。除了告诉你那不是有效的 Python 语法,你还能期待什么?
  • 您能否更具体地了解images 返回的内容?它只是类文件对象的打开句柄列表吗?您可能会遇到跨两个函数并行读取这些类文件对象的问题。
  • @Bakuriu 我的问题是在这种情况下使用 python 并行处理图像是否是正确的方法
  • @dano 这是 images() 返回的内容:yield open(os.path.join(image_dir[0], filename), 'rb')
  • @user1578720 那么它是一个生成器吗? resizeopacity 是否只希望将一个类似文件的对象传递给它们,还是希望有一个可迭代对象(如生成器)?

标签: python multithreading python-3.x future concurrent.futures


【解决方案1】:

我建议让images 只返回一个路径,而不是一个打开的文件对象:

def images():
    ...
    yield os.path.join(image_dir[0], filename)

然后使用这个:

from functools import partial

def open_and_call(func, filename, args=(), kwargs={}):
    with open(filename, 'rb') as f:
        return func(f, *args, **kwargs)

def parallel_image_processing():
    resize_func = partial(open_and_call, shape.resize, args=("65", "85"))
    opacity_func = partial(open_and_call, shape.opacity, args=("0.5"))
    img_list = list(images())
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures1 = executor.map(resize_func, img_list)
        futures2 = executor.map(opacity_func, img_list)

        concurrent.futures.wait([futures1, futures2])


if __name__ == "__main__":
    # Make sure the entry point to the function that creates the executor 
    # is inside an `if __name__ == "__main__"` guard if you're on Windows.
    parallel_image_processing()

如果您使用的是 CPython(相对于没有 GIL 的替代实现,如 Jython),您不想使用 ThreadPoolExecutor,因为图像处理是 CPU 密集型的;由于 GIL,在 CPython 中一次只能运行一个线程,因此如果您将线程用于您的用例,您实际上不会并行执行任何操作。相反,使用ProcessPoolExecutor,它将使用进程而不是线程,完全避免 GIL。请注意,这就是我建议不要从images 返回类似文件的对象的原因——您不能将打开的文件句柄传递给工作进程。您必须改为在工作人员中打开文件。

为此,我们让 executor 调用一个小 shim 函数 (open_and_call),它将在工作进程中打开文件,然后使用正确的参数调用 resize/opacity 函数.

我也使用executor.map 而不是executor.submit,这样我们就可以为images() 返回的每个项目调用resize/opacity,而无需显式的for 循环。我使用functools.partial 来更轻松地使用executor.map 调用带有多个参数的函数(它只允许您调用带有单个参数的函数)。

也没有必要在执行程序中调用images(),因为无论如何你都要等待它的结果才能继续。只需像普通函数一样调用它。在调用map 之前,我还将images() 返回的生成器对象转换为list。如果您担心内存使用情况,可以在每个 map 调用中直接调用 images(),但如果不是,则只调用一次 images() 并将其存储为列表可能会更快。

【讨论】:

  • Not all implementations of Python use a GIL,所以最好本地化 Python --> CPython。
  • 感谢您的回复,这与我原来的解决方案有很大不同,您能告诉我它是否可以按照我的方式工作吗?有一些小的修改?
  • @user1578720 您的代码将“工作”,但它不会真正并行运行,因为您使用的是ThreadPoolExecutor。您需要使用ProcessPoolExecutor 来获得真正的并发性。这样做需要更改 images 以返回文件名而不是文件对象。但是您使用executor.submit 的方式是正确的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-01-13
  • 2018-04-24
  • 1970-01-01
  • 1970-01-01
  • 2021-09-25
  • 2021-05-01
  • 2015-06-12
相关资源
最近更新 更多