【问题标题】:Python multiprocessing taking longer timePython 多处理需要更长的时间
【发布时间】:2019-08-19 00:21:32
【问题描述】:

我正在尝试使用 python multiprocessing 模块来减少过滤代码的时间。一开始我做了一些实验。结果并不乐观。

我已经定义了一个函数来在一定范围内运行一个循环。然后我在有和没有线程的情况下运行了这个函数并测量了时间。这是我的代码:

import time
from multiprocessing.pool import ThreadPool

def do_loop(i,j):
    l = []
    for i in range(i,j):
        l.append(i)
    return l

#loop veriable
x = 7

#without thredding
start_time = time.time()
c = do_loop(0,10**x)
print("--- %s seconds ---" % (time.time() - start_time))

#with thredding
def thread_work(n):
    #dividing loop size
    a = 0
    b = int(n/2)
    c = int(n/2)
    #multiprocessing
    pool = ThreadPool(processes=10)
    async_result1 = pool.apply_async(do_loop, (a,b))
    async_result2 = pool.apply_async(do_loop, (b,c))
    async_result3 = pool.apply_async(do_loop, (c,n))
    #get the result from all processes]
    result = async_result1.get() + async_result2.get() + async_result3.get()

    return result

start_time = time.time()
ll = thread_work(10**x)
print("--- %s seconds ---" % (time.time() - start_time))

对于 x=7,结果是:

--- 1.0931916236877441 seconds ---
--- 1.4213247299194336 seconds ---

没有线程,它需要更少的时间。这是另一个问题。对于 X=8,大多数时候我会得到 MemoryError 用于线程。一旦我得到这个结果:

--- 17.04124426841736 seconds ---
--- 32.871358156204224 seconds ---

解决方案很重要,因为我需要优化 filtering task,这需要 6 个小时。

【问题讨论】:

  • 线程和多处理是两种不同的并行技术。根据代码,我猜你应该在你的问题中到处替换“线程”。
  • @tripleee ,我假设 ThreadPool() 函数正在创建不同的线程。

标签: python multithreading multiprocessing


【解决方案1】:

根据您的任务,多处理可能需要也可能不需要更长的时间。 如果你想利用你的 CPU 内核并加快你的过滤过程,那么你应该use multiprocessing.Pool

提供了一种方便的方法来并行执行函数 跨多个输入值,将输入数据分布在 进程(数据并行)。

我一直在创建一个数据过滤示例,然后我一直在测量简单方法的时间和多进程方法的时间。 (从你的代码开始)

# take only the sentences that ends in "we are what we dream",  the second word is "are"


import time
from multiprocessing.pool import Pool

LEN_FILTER_SENTENCE = len('we are what we dream')
num_process = 10

def do_loop(sentences):
    l = []
    for sentence in sentences:
        if sentence[-LEN_FILTER_SENTENCE:].lower() =='we are what we doing' and sentence.split()[1] == 'are':     
            l.append(sentence)
    return l

#with thredding
def thread_work(sentences):
    #multiprocessing

    pool = Pool(processes=num_process)
    pool_food = (sentences[i: i + num_process] for i in range(0, len(sentences), num_process))
    result = pool.map(do_loop, pool_food)
    return result

def test(data_size=5, sentence_size=100):
    to_be_filtered = ['we are what we doing'*sentence_size] * 10 ** data_size + ['we are what we dream'*sentence_size] * 10 ** data_size

    start_time = time.time()
    c = do_loop(to_be_filtered)
    simple_time = (time.time() - start_time)



    start_time = time.time()
    ll = [e for l in thread_work(to_be_filtered) for e in l]
    multiprocessing_time = (time.time() - start_time)
    assert c == ll 
    return simple_time, multiprocessing_time

data_size 代表数据的长度,sentence_size 是每个数据元素的乘积因子,您可以看到 sentence_size 与数据中每个项目请求的 CPU 操作数成正比。

data_size = [1, 2, 3, 4, 5, 6]
results = {i: {'simple_time': [], 'multiprocessing_time': []} for i in data_size}
sentence_size = list(range(1, 500, 100))
for size in data_size:
    for s_size in sentence_size:
        simple_time, multiprocessing_time = test(size, s_size)
        results[size]['simple_time'].append(simple_time)
        results[size]['multiprocessing_time'].append(multiprocessing_time)

import pandas as pd

df_small_data = pd.DataFrame({'simple_data_size_1': results[1]['simple_time'],
                   'simple_data_size_2': results[2]['simple_time'],
                   'simple_data_size_3': results[3]['simple_time'],
                   'multiprocessing_data_size_1': results[1]['multiprocessing_time'],
                   'multiprocessing_data_size_2': results[2]['multiprocessing_time'],
                   'multiprocessing_data_size_3': results[3]['multiprocessing_time'],

                   'sentence_size': sentence_size})

df_big_data = pd.DataFrame({'simple_data_size_4': results[4]['simple_time'],
                   'simple_data_size_5': results[5]['simple_time'],
                   'simple_data_size_6': results[6]['simple_time'],
                   'multiprocessing_data_size_4': results[4]['multiprocessing_time'],
                   'multiprocessing_data_size_5': results[5]['multiprocessing_time'],
                   'multiprocessing_data_size_6': results[6]['multiprocessing_time'],

                   'sentence_size': sentence_size})

绘制小数据的时序:

ax = df_small_data.set_index('sentence_size').plot(figsize=(20, 10), title = 'Simple vs multiprocessing approach for small data')
ax.set_ylabel('Time in seconds')

绘制大数据(相对大数据)的时序:

如您所见,当您拥有大数据且每个数据元素都需要相对大量的 CPU 能力时,多处理能力就会显现出来。

【讨论】:

    【解决方案2】:

    这里的任务非常小,以至于并行化开销大大超过了好处。这是常见的常见问题解答。

    【讨论】:

      【解决方案3】:

      最好使用 multiprocessing.Process(),因为 python 有 Global Interpreter Lock(GIL)。因此,即使您创建线程来提高任务的速度,它也不会增加,它会一个接一个。您可以参考 Python 文档了解 GIL 和线程。

      【讨论】:

      • 有 gil,但线程会被调度以某种方式适合 cpus 对吧?
      【解决方案4】:

      Aroosh Rana 可能有最好的答案,但在使用这种方法进行测试时,需要注意一些事项。在循环中增长数组的方式可能非常低效,而是考虑预先分配其完整大小。另外,仔细看看你划分工作的方式,你有两个循环处理一半数组,一个循环从 n/2 到 n/2。同样正如其他地方所提到的,完成这个词相当微不足道,不会从并行处理中受益。 我已尝试改进您之前的测试。

      import time
      from multiprocessing.pool import ThreadPool
      import math
      
      def do_loop(array, i,j):
          for k in range(i,j):
              array[k] = math.cos(1/(1+k))
          return array
      
      #loop veriable
      x = 7
      array_size = 2*10**x
      #without thredding
      start_time = time.time()
      array = [0]*array_size
      c = do_loop(array, 0,array_size)
      print("--- %s seconds ---" % (time.time() - start_time))
      
      #with thredding
      def thread_work(n):
          #dividing loop size
          array = [0]*n
          a = 0
          b = int(n/3)
          c = int(2*n/3)
          #multiprocessing
          pool = ThreadPool(processes=4)
          async_result1 = pool.apply_async(do_loop, (array, a,b))
          async_result2 = pool.apply_async(do_loop, (array, b,c))
          async_result3 = pool.apply_async(do_loop, (array, c,n))
          #get the result from all processes]
          result1 = async_result1.get()
          result2 = async_result2.get()
          result3 = async_result3.get()
      
          start_time = time.time()
      
          result = result1+result2+result3
          print("--- %s seconds ---" % (time.time() - start_time))
          return result
      
      start_time = time.time()
      ll = thread_work(array_size)
      print("--- %s seconds ---" % (time.time() - start_time))
      

      另外请记住,使用这样的方法,您不必在最后组合结果,因为每个线程将在同一个数组上进行处理。

      【讨论】:

        【解决方案5】:

        为什么要对线程使用多处理?

        最好是创建多个线程实例。给他们每个人的任务。最后,启动所有这些。并等到他们完成。同时,将结果收集到某个列表中。

        根据我的经验(对于一项特定任务),我发现即使在开始时创建整个线程图,也比直接在图中的下一个节点中开始任务之前产生的开销要小。我的意思是 10、100、1000、10000 个线程。只需确保线程在空闲时间(即time.sleep(0.5))处于休眠状态,以避免浪费 CPU 周期。

        通过线程,您可以使用线程安全的列表、字典和队列。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2016-09-07
          • 1970-01-01
          • 2013-11-09
          • 1970-01-01
          • 2019-06-27
          • 1970-01-01
          相关资源
          最近更新 更多