【问题标题】:Python multiprocessing script partial outputPython多处理脚本部分输出
【发布时间】:2018-06-05 09:26:18
【问题描述】:

我遵循post 中规定的原则来安全地输出最终将写入文件的结果。不幸的是,代码只打印 1 和 2,而不是 3 到 6。

import os
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep


def feed(queue, parlist):

    for par in parlist:
            queue.put(par)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):

    while True:
        try:
            par=queueIn.get(block=False)
            res=doCalculation(par)
            queueOut.put((res))
            queueIn.task_done()
        except:
            break

def doCalculation(par):

    return par

def write(queue):
    while True:
        try:
            par=queue.get(block=False)
            print("response:",par)
        except:
            break


if __name__ == "__main__":

    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

在运行它打印的代码时,

$ python3 tst.py
Queue size 6
response: 1
response: 2

另外,是否可以确保写入函数始终输出 1、2、3、4、5、6,即按照将数据送入馈送队列的顺序?

【问题讨论】:

  • 我的帖子回答你的问题了吗?

标签: python-3.x multiprocessing


【解决方案1】:

错误与task_done() 调用有关。如果您删除了那个,那么它就可以工作,不要问我为什么(IMO 那是一个错误)。但是它的工作方式是queueIn.get(block=False) 调用会抛出异常,因为队列是空的。这对于您的用例可能已经足够了,但更好的方法是使用哨兵(如multiprocessing docs, see last example 中所建议的那样)。这里有一点重写,所以你的程序使用哨兵:

import os
import argparse
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep

def feed(queue, parlist, nthreads):
    for par in parlist:
        queue.put(par)
    for i in range(nthreads):
        queue.put(None)
    print("Queue size", queue.qsize())

def calc(queueIn, queueOut):
    while True:
        par=queueIn.get()
        if par is None:
            break
        res=doCalculation(par)
        queueOut.put((res))

def doCalculation(par):
    return par

def write(queue):
    while not queue.empty():
        par=queue.get()
        print("response:",par)


if __name__ == "__main__":
    nthreads = 2
    workerQueue = Queue()
    writerQueue = Queue()

    considerperiod=[1,2,3,4,5,6]

    feedProc = Process(target=feed, args=(workerQueue, considerperiod, nthreads))
    calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target=write, args=(writerQueue,))

    feedProc.start()
    feedProc.join()
    for p in calcProc:
        p.start()

    for p in calcProc:
        p.join()
    writProc.start()
    writProc.join()

需要注意的几点:

  • 哨兵正在将None 放入队列中。请注意,每个工作进程都需要一个哨兵。
  • 对于write 函数,您不需要进行哨兵处理,因为只有一个进程并且您不需要处理并发(如果您要在您的empty() 然后get() calc 函数如果例如队列中只剩下一个项目并且两个工作人员同时检查empty() 然后都想做get() 然后其中一个被永远锁定,那么你会遇到问题)
  • 您无需将 feedwrite 放入进程中,只需将它们放入您的 main 函数中,因为您不想并行运行它。

我怎样才能使输出中的顺序与输入中的顺序相同? [...] 我猜 multiprocessing.map 可以做到这一点

是的map keeps the order。将您的程序重写为更简单的程序(因为您不需要 workerQueuewriterQueue 并添加随机睡眠以证明输出仍然正常:

from multiprocessing import Pool
import time
import random    

def calc(val):
    time.sleep(random.random())
    return val

if __name__ == "__main__":
    considerperiod=[1,2,3,4,5,6]
    with Pool(processes=2) as pool:
        print(pool.map(calc, considerperiod))

【讨论】:

  • 感谢您的回复。这很有帮助。如 OP 底部所述,我如何才能获得与输入相同的输出顺序。我在 doCalculation 函数中插入了sleep(randint(1, 5)),只是为了引入一些复杂性。结果是与初始提要不同的顺序。
  • @trumee 在多处理环境中,您永远无法说出它的处理顺序。事实上,这正是一个进程完成得更快的进程已经开始处理下一个项目的关键所在。为什么要订购它?
  • 每个线程的输出要写入一个文件,因此顺序很重要。我猜 Pool.map 函数可以做到这一点。
  • @trumee:我现在添加了一个部分,其中详细解释了为什么并行性和顺序不能一起使用,以及如何为您的用例解决这个问题
  • @trumee 进一步阅读后我发现地图确实保持顺序,我现在修改了我的答案
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-07-01
  • 1970-01-01
  • 2017-01-08
  • 2017-11-17
  • 1970-01-01
  • 2021-01-11
相关资源
最近更新 更多