【问题标题】:Python Multiprocessing Worker/QueuePython 多处理工作者/队列
【发布时间】:2014-03-06 20:38:12
【问题描述】:

我有一个必须运行 12 次的 python 函数。我目前已设置为使用多处理库中的 Pool 来并行运行所有这些库。通常我一次运行 6 个,因为该函数是 CPU 密集型的,并且并行运行 12 个通常会导致程序崩溃。当我们一次做 6 个时,在前 6 个过程全部完成之前,第二组 6 个不会开始。理想情况下,我们希望在最初一批 6 个中的一个完成后立即启动另一个(例如第 7 个)——这样 6 个同时运行,同时还有更多的启动。现在代码看起来像这样(它会被调用两次,将前 6 个元素传递到一个列表中,然后将第二个 6 传递到另一个列表中:

from multiprocessing import Pool

def start_pool(project_list):

    pool = Pool(processes=6)
    pool.map(run_assignments_parallel,project_list[0:6])

所以我一直在尝试实施工作人员/队列解决方案并遇到了一些问题。我有一个看起来像这样的工作函数:

def worker(work_queue, done_queue):
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print proj
            run_assignments_parallel(proj)
            done_queue.put('finished ' + proj )
    except Exception, e:        
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj,        e.message))
    return True

而调用worker函数的代码如下:

workers = 6
work_queue = Queue()
done_queue = Queue()  
processes = []
for project in project_list:
    print project
    work_queue.put(project)
for w in xrange(workers):        
    p = Process(target=worker, args=(work_queue, done_queue))
    p.start()
    processes.append(p)
    work_queue.put('STOP')
for p in processes:
     p.join()    
     done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):        
    print status

project_list 只是需要在函数“run_assignments_parallel”中运行的 12 个项目的路径列表。

按照现在的编写方式,同一进程(项目)多次调用该函数,我无法确定发生了什么。这段代码基于我找到的一个例子,我很确定循环结构搞砸了。任何帮助都会很棒,我为我对此事的无知表示歉意。谢谢!

【问题讨论】:

  • 好吧,如果你有 indetation,你会得到错误,请检查一下
  • pxl 修复了明显的格式,但鉴于所有其他行都是正确的 - put('SOP') 与 Python 文档示例不符

标签: python multiprocessing


【解决方案1】:

理想情况下,我们希望在最初一批 6 个中的一个完成后立即启动另一个(例如第 7 个)——这样 6 个同时运行,而还有更多的开始。

您需要更改的只是传递所有 12 个输入参数而不是 6 个:

from multiprocessing import Pool
pool = Pool(processes=6) # run no more than 6 at a time
pool.map(run_assignments_parallel, project_list) # pass full list (12 items)

【讨论】:

  • 谢谢!这确实是正确的,并且我之前尝试过,现在我(再次)意识到为什么我的问题有点复杂 - 该函数使用来自某些专有软件的 API,该 API 调用无法销毁和重新创建的对象。换句话说,每个项目都必须在单独的进程上运行,这就是为什么 start_pool 函数在第二组 6 中再次被调用。关于如何解决这个问题的任何想法?
  • @user2503169: a) 检查对象是否已经创建,例如,使其成为全局对象,如果不是 None b) 或者如果您想要一个新进程,则不要尝试重新创建它对于每个任务:maxtasksperchild=1 c) 创建 12 个 Process 并使用 Semaphore 以避免超过 6 个运行。
  • 看起来 maxtaskperchild 将是一个简单的解决方案,但我正在运行 2.6(因为专有软件)。你能告诉我如何使用信号量吗?非常感谢您的帮助!
  • @user2503169:在主进程中创建Semaphore,值为6。将它传递给两个所有子进程 (12),包装您的代码 with sem: ..your child code here..。每次在with sem 语句中输入处理时,都会自动调用sem.acquire() 以递减信号量值。如果它达到零,则 sem.acquire() 阻塞,直到调用 sem.release()(自动从 with sem: 退出)。比如看RateSemaphore() is used(你的情况比较简单)。
  • 像这样?:'maxconnections = 6 sem = BoundedSemaphore(value=maxconnections) with sem: pool = Pool(processes=12) pool.map(run_assignments_parallel,project_list)'
【解决方案2】:

您可以使用MPipe 模块。

创建一个 6 人的单阶段管道,并将您的所有项目作为任务提供。然后只需阅读最后的结果(在你的情况下,状态)。

from mpipe import Pipeline, OrderedStage

...    

pipe = Pipeline(OrderedStage(run_assignments_parallel), 6)    

for project in project_list:
   pipe.put(project)

pipe.put(None)  # Signal end of input.

for status in pipe.results():
   print(status)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-10-07
    • 1970-01-01
    • 1970-01-01
    • 2018-06-30
    • 2012-07-11
    • 1970-01-01
    • 1970-01-01
    • 2013-06-23
    相关资源
    最近更新 更多