【问题标题】:Python Multiprocessing for a Single Output File (CSV)用于单个输出文件 (CSV) 的 Python 多处理
【发布时间】:2025-12-13 11:50:01
【问题描述】:

我正在寻找 Python 中 MultiProcessing 的一些很好的示例代码,这些代码将接收一个大数组(分成同一个主数组的不同部分),以加快后续输出文件的处理。我注意到还有其他的东西,比如 Lock() 函数,以确保它以特定的顺序返回,但不是一个很好的例子,说明如何在作业运行时返回结果数组,以便我可以输出单个 CSV 文件以正确的时间序列顺序。

以下是到目前为止我一直在处理队列的内容。如何分配 q1.get() 或其他人的结果以便以后重新组合?当我尝试使用 temp = q1.get()... 分配它时,它只是旋转... 并且拆分数组,将其发送到多个进程,然后重新组合所调用函数的结果的良好示例将不胜感激。我正在使用 Python 3.7 和 Windows 10。

import time
import multiprocessing
from multiprocessing import Process, Queue

def f1(q, testArray):
    testArray2 = [[41, None, 'help'], [42, None, 'help'], [43, None, 'help']]
    testArray =  testArray + testArray2
    q.put(testArray)

def f2(q, testArray):
    #testArray.append([43, None, 'goodbye'])
    testArray =  testArray + ([44, None, 'goodbye'])
    q.put(testArray)
    return testArray

if __name__ == '__main__':
    print("Number of cpu : ", multiprocessing.cpu_count())
    testArray1 = [1]
    testArray2 = [2]
    q1 = Queue()
    q2 = Queue()
    p1 = multiprocessing.Process(target=f1, args=(q1, testArray1,))
    p2 = multiprocessing.Process(target=f2, args=(q2, testArray2,))

    p1.start()
    p2.start()   

    print(q1.get())      # prints whatever you set in function above
    print(q2.get())      # prints whatever you set in function above
    
    print(testArray1)
    print(testArray2)

    p1.join()
    p2.join()

【问题讨论】:

  • 更好地使用multiprocessing.Pool,你会得到正确的结果。
  • 这个例子不需要使用队列——只需使用return。您只有两个进程分配给p1p2,因此您可以轻松控制订单-首先是p1,然后是p2
  • 如果你有更复杂的例子,那么你可以枚举数据(为数据分配数字)并用这个数字发送数据,然后用相同的数字发回结果 - 然后你可以使用这个数字维持秩序。

标签: python arrays python-3.x multiprocessing python-multiprocessing


【解决方案1】:

我相信您的所有流程只需要一个队列。队列是为进程间通信而设计的。

对于排序,您可以传入进程 ID 并在结果连接后根据该 ID 进行排序。或者您可以按照 furas 的建议尝试使用多处理池。

这听起来像是更好的方法。工作池通常会预先分配一个工作池,然后在该池上运行一组作业。这更有效,因为进程/线程最初设置并用于作业。您的实施将在哪里创建每个作业/功能的流程,这取决于您处理的数据量。

【讨论】:

    最近更新 更多