【问题标题】:Python Multiprocessing using Queue to write to same filePython多处理使用队列写入同一个文件
【发布时间】:2013-03-09 23:04:39
【问题描述】:

我知道 Stack Exchange 上有很多关于将结果从多处理写入单个文件的帖子,我只阅读了这些帖子就开发了我的代码。我想要实现的是并行运行“RevMapCoord”函数,并使用 multiprocess.queue 将其结果写入一个文件中。但是我在排队工作时遇到了问题。我的代码:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print ('Echo from Feeder: %s' % (par))
        queue.put(par)
    print ('**Feeder finished queing**')

def calc(queueIn, queueOut):
     print ('Worker function started')
     while True:
         try:
             par = queueIn.get(block = False)
             res = RevMapCoord(final_res)
             queueOut.put((par,res))
         except:
             break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
         try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
         except:
            break
    fhandle.close()


feedProc = Process(target = feed , args = (workerQueue, final_res))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno))

feedProc.start()
print ('Feeder is joining')
feedProc.join ()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join ()

当我运行此代码时,脚本卡在“feedProc.start()”步骤。屏幕的最后几行输出显示了“feedProc.start()”末尾的打印语句:

Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

但在执行下一行“feedProc.join()”之前挂起。代码没有错误并继续运行但什么都不做(挂起)。请告诉我我犯了什么错误。

【问题讨论】:

    标签: python file-io queue multiprocessing


    【解决方案1】:

    我认为你应该把你的例子精简到基础。例如:

    from multiprocessing import Process, Queue
    
    def f(q):
        q.put('Hello')
        q.put('Bye')
        q.put(None)
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        with open('file.txt', 'w') as fp:
            while True:
                item = q.get()
                print(item)
                if item is None:
                    break
                fp.write(item)
        p.join()
    

    这里我有两个进程(主进程,一个p)。 p 将字符串放入队列中,由主进程检索。当主进程找到 None (我用来指示:“我完成了”的哨兵)时,它会中断循环。

    将此扩展到许多进程(或线程)是微不足道的。

    【讨论】:

    • 您应该尝试运行您的示例(它会给出错误)。您不能以这种方式将多个项目放入队列中。你实际上只是把一个项目放在一个列表上。
    • TypeError: expected a character buffer object 我错了:|
    • @b1- new(正确,感谢 Gerrat)版本适用于 python 2.7.5 和 3.2.3。试试看!
    【解决方案2】:

    我通过使用 Python3 中的“map_async”函数实现了将多处理结果写入单个文件。这是我写的函数:

    def PPResults(module,alist):##Parallel processing
        npool = Pool(int(nproc))    
        res = npool.map_async(module, alist)
        results = (res.get())###results returned in form of a list 
        return results
    

    因此,我在“a_list”中为该函数提供了一个参数列表,“module”是一个执行处理并返回结果的函数。上述函数继续以列表的形式收集结果,并在处理完'a_list'中的所有参数后返回。结果可能不是正确的顺序,但由于顺序对我来说并不重要,所以效果很好。 “结果”列表可以迭代,并将单个结果写入文件中,例如:

    fh_out = open('./TestResults', 'w')
    for i in results:##Write Results from list to file
        fh_out.write(i)
    

    为了保持结果的顺序,我们可能需要使用类似于我在问题(上文)中提到的“队列”。虽然我能够修复代码,但我相信这里不需要提及。

    谢谢

    AK

    【讨论】:

      猜你喜欢
      • 2014-12-23
      • 1970-01-01
      • 1970-01-01
      • 2017-08-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-10-02
      • 1970-01-01
      相关资源
      最近更新 更多