【问题标题】:Python multitprocessing outputs in order?Python多处理输出顺序?
【发布时间】:2019-01-16 02:01:51
【问题描述】:

我正在使用多处理对一组数据进行大量计算以减少计算时间。它工作得非常好,除了一个小警告,当我让我的侦听器进程编写我的输出时,它以错误的顺序出现,这绝对是糟糕的。我需要它以相同的顺序出现。不知道如何实现这一点。示例代码如下。

import numpy, os, multiprocessing
from multiprocessing.sharedctypes import Value, Array, RawArray, RawValue
from multiprocessing import Process, Lock

def domorestuff(value):
    value += value # sample, some other calculation
    q.put(value)
    return

def dostuff(somevalue):
    somevalue += 1 # do some calculation instead of just +=1 here
    domorestuff(somevalue)
    return

def listener(q):
    f = open(os.path.join(outdir, fileout.value), 'w')
    while 1:
        #print("Listener...", flush=True)
        m = q.get()
        if(m == 'kill'):
            break
        #print("Listen write...", flush=True)
        f.write(str(m) + '\n')
        f.flush()
    f.close()

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    pool = multiprocessing.Pool(9)
    watcher = pool.apply_async(listener, (q,))
    pool.map(dostuff, range(8))
    q.put('kill')
    pool.close()

我希望它在文件中给我一组线性值,即:

2, 4, 6, 8, 10, 12, 14, 18

但它们每次都以随机顺序出现。不知如何同步事情,当我不使用监听器并且不进行文件写入时,它似乎按线程数按顺序加入进程。但很难确定,因为我无法安全地将多个线程的输出写入单个文件。

为了更清楚一点,处理发生在输入文件上,每个线程读取它需要的部分,然后根据处理将输出写入侦听器。但是,如上所述,它不是按顺序排列的,而是随机排列的。

【问题讨论】:

    标签: python file queue python-multiprocessing


    【解决方案1】:

    您正在异步运行您的流程。您不能期望这些独立的进程以任何预期的顺序处理/完成它们的任务。

    【讨论】:

    • 只有文件写入项异步运行,其他进程不运行。由于它们不是异步的,它们应该根据pool.map 函数的文档按顺序完成并提交队列请求。
    【解决方案2】:

    @M.Rau 实际上并不正确,您可以运行池中的作业并将它们重新组合在一起以保持顺序,幸运的是 multiprocessing 模块使用 pool.apply_asyncpool.imap 内置了此功能.

    我稍微清理了你的代码(注意队列已经完全消失了),这就是我想出的:

    import numpy, os, multiprocessing
    
    def domorestuff(value):
        return value + value # sample, some other calculation
    
    def dostuff(somevalue):
        somevalue += 1 # do some calculation instead of just +=1 here
        return domorestuff(somevalue)
    
    def main():
        pool = multiprocessing.Pool(9)
        out = list(pool.imap(dostuff, range(8)))
        pool.close()
        print (out)
    

    有关更多信息,请查看an example from the official docs。他们在那里解释了不同的技术。顺便说一句,您的问题中的 python 代码甚至无法编译,并且侦听器函数是无关紧要的。希望这会有所帮助!

    【讨论】:

    • 这里的问题是我正在处理和输出的很多数据都非常大,包括几个千兆字节的文件,我不能一直将代码完全存储在内存中。此外,由于代码往往一次运行几个小时,因此任何中断都是不好的,所以我需要能够在处理过程中将项目写入磁盘,以便在需要时可以从中断的地方恢复。关于监听器函数,它不是无关紧要的,它在使用q.put() 时触发。我不确定为什么它没有为你编译,但也许我在从其他 PC 重新输入时出现了转录错误
    猜你喜欢
    • 2020-11-25
    • 2014-11-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-13
    • 2013-03-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多