【问题标题】:Python Process Terminate before IO completedPython 进程在 IO 完成前终止
【发布时间】:2016-01-12 17:05:09
【问题描述】:

我正在使用 python 多处理库来处理一组进程中的信息。这些流程还包含进一步划分必须完成的工作量的流程。有一个 Manager.Queue 用于累积所有使用数据的进程的结果。

在python脚本的主线程中。我尝试使用连接来阻塞主线程,直到我们可以合理地确定是否所有子进程都已完成,然后将输出写入单个文件。然而,在所有数据写入文件之前,系统终止并且文件关闭。

以下代码是上述解决方案的实现的简化提取。 对于 inQueues 中的队列: queue.join()

for p in processes:
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file:
    out_file.write("Algorithm,result\n")
    while not out_queue.empty():
        res = out_queue.get()
        out_file.write(res['algorithm'] + ","+res['result']+"\n")
        out_queue.task_done()
        time.sleep(0.05)
    out_queue.join()
    out_file.close()

out_queue.qsize() 将打印超过 500 条可用记录,但仅将 100 条打印到文件中。 同样在这一点上,我不能 100% 确定系统是否总共生成了 500 条记录,而只是此时报告的数量。

如何确保将所有结果写入 results.csv 文件?

【问题讨论】:

  • qsize(): "返回队列的大概大小。由于多线程/多处理语义,这个数字不可靠。"
  • 我知道 qsize 方法指示的队列大小可以更改,但是代码部分是整个程序中唯一从队列中删除的部分,因此不需要打印的记录数将小于队列的大小(这是当前发生的情况)。

标签: python multithreading io multiprocessing


【解决方案1】:

不要等所有进程完成后再消费数据,而是同时处理数据并记住哪些进程仍在运行:

processes = []

"""start processes and append them to processes"""

while True:
    try:
        # get an item
        item = queue.get(True, 0.5)
    except Queue.Empty:
        # no item received in half a second
        if not processes:
            # there are no more processes and nothing left to process
            break
        else:
            proc_num = 0
            while proc_num < len(processes):
                process = processes[proc_num]
                exit_code = process.poll()
                if exit_code is None:
                    # process is still running, proceed to next
                    proc_num += 1
                elif exit_code == 0:
                    # process ended gracefully, remove it from list
                    processes.pop(proc_num)
                else:
                    # process ended with an error, what now?
                    raise Exception('Her last words were: "%r"' % exit_code)
    else:
        # got an item
        """process item"""

不要测试processesQueue.Empty 之外是否为空,否则您将拥有races

但也许你会更高兴higher level function

pool = multiprocessing.Pool(8)
items = pool.map_async(producer_function, producer_arguments)
for item in items:
    """process item"""

【讨论】:

    猜你喜欢
    • 2017-08-31
    • 1970-01-01
    • 2011-10-12
    • 2021-06-01
    • 2011-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多