【Title】: Combining multithreading and multiprocessing with concurrent.futures
【Posted】: 2023-03-03 04:06:01
【Description】:

I have a function that is both heavily I/O-bound and CPU-intensive. I tried to parallelize it by combining multiprocessing with multithreading, but it hangs. This question was asked before, but in a different setting. My function is fully independent and returns nothing. Why does it hang, and how can I fix it?

import concurrent.futures
import os
import numpy as np
import time


ids = [1,2,3,4,5,6,7,8]

def f(x):
    time.sleep(1)
    x**2

def multithread_accounts(AccountNumbers, f, n_threads = 2):

    slices = np.array_split(AccountNumbers, n_threads)
    slices = [list(i) for i in slices]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(f, slices)



def parallelize_distribute(AccountNumbers, f, n_threads = 2, n_processors = os.cpu_count()):

    slices = np.array_split(AccountNumbers, n_processors)
    slices = [list(i) for i in slices]

    with concurrent.futures.ProcessPoolExecutor(max_workers=n_processors) as executor:
        executor.map( lambda x: multithread_accounts(x, f, n_threads = n_threads) , slices)
        
parallelize_distribute(ids, f, n_processors=2, n_threads=2)

【Discussion】:

    Tags: python multithreading concurrency parallel-processing multiprocessing


    【Solution 1】:

    Sorry, I don't have time to explain it all, so I'll just give code that "works". I urge you to start with something simpler, because the learning curve is non-trivial. Leave numpy out of it at first; stick to threads only to begin with; then move to processes only; and unless you're an expert, don't try to parallelize anything other than named module-level functions (no, not function-local anonymous lambdas).

    As often happens, the error messages you "should" be getting are suppressed, because they occur asynchronously and so there's no good way to report them. Liberally add print() statements to see how far you get.
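    One way to surface those hidden errors (my illustration, not part of the answer): executor.map returns a lazy iterator, and an exception raised in a worker is re-raised only when you consume the corresponding result, so iterate over the results even if you don't need them:

```python
import concurrent.futures as cf

def boom(x):
    raise ValueError(f"bad input {x}")

with cf.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(boom, [1, 2, 3])  # no error raised yet: map is lazy
    try:
        list(results)                        # consuming re-raises the worker's exception
    except ValueError as e:
        print("worker raised:", e)           # worker raised: bad input 1
```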

    Note: I stripped out numpy and added what's needed so that it also runs on Windows. I expect that using numpy's array_split() would work fine, but the machine I was on at the time didn't have numpy.

    import concurrent.futures as cf
    import os
    import time
    
    def array_split(xs, n):
        from itertools import islice
        it = iter(xs)
        result = []
        q, r = divmod(len(xs), n)
        for i in range(r):
            result.append(list(islice(it, q+1)))
        for i in range(n - r):
            result.append(list(islice(it, q)))
        return result
        
    ids = range(1, 11)
    
    def f(x):
        print(f"called with {x}")
        time.sleep(5)
        x**2
    
    def multithread_accounts(AccountNumbers, f, n_threads=2):
        with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
            for slice in array_split(AccountNumbers, n_threads):
                executor.map(f, slice)
    
    def parallelize_distribute(AccountNumbers, f, n_threads=2, n_processors=os.cpu_count()):
        slices = array_split(AccountNumbers, n_processors)
        print("top slices", slices)
        with cf.ProcessPoolExecutor(max_workers=n_processors) as executor:
            executor.map(multithread_accounts, slices,
                                               [f] * len(slices),
                                               [n_threads] * len(slices))
    
    if __name__ == "__main__":
        parallelize_distribute(ids, f, n_processors=2, n_threads=2)
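    A quick sanity check of the pure-Python array_split helper above (my addition): like numpy's version, the first len(xs) % n slices get one extra element:

```python
from itertools import islice

def array_split(xs, n):
    # same helper as in the answer above
    it = iter(xs)
    result = []
    q, r = divmod(len(xs), n)
    for _ in range(r):
        result.append(list(islice(it, q + 1)))
    for _ in range(n - r):
        result.append(list(islice(it, q)))
    return result

print(array_split(list(range(10)), 3))     # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(array_split(list(range(1, 11)), 2))  # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
```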
    

    By the way, I suggest this makes more sense for the threading part:

    def multithread_accounts(AccountNumbers, f, n_threads=2):
        with cf.ThreadPoolExecutor(max_workers=n_threads) as executor:
            executor.map(f, AccountNumbers)
    

    That is, there's really no need to split the list yourself here; the threading machinery will do that on its own. You may have missed this in your first attempt because the ThreadPoolExecutor() call in the code you posted forgot to specify the max_workers argument.
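    A related detail worth knowing (my note, not part of the answer): f here returns nothing, so the map results are discarded, but if it did return values, executor.map hands them back in input order regardless of which worker finishes first, so no manual bookkeeping is needed either:

```python
import concurrent.futures as cf
import time

def square(x):
    time.sleep(0.01 * (5 - x))  # later inputs finish sooner on purpose
    return x ** 2

with cf.ThreadPoolExecutor(max_workers=4) as executor:
    # results come back in input order, not completion order
    print(list(executor.map(square, [1, 2, 3, 4])))  # [1, 4, 9, 16]
```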

    【Discussion】:
