【问题标题】:Simultaneous file writing using python multiprocessing使用python多处理同时写入文件
【发布时间】:2016-04-24 22:49:40
【问题描述】:

我正在开发一个项目,其中我使用大约 6 个传感器输入 Beaglebone Black,它将数据保存到 6 个不同的文件中连续。通过另一个 SO 问题 (https://stackoverflow.com/a/36634587/2615940),我了解到多处理模块会为我执行此操作,但是在运行我的新代码时,我只获得 1 个文件而不是 6 个。如何修改此代码以获得所需的 6结果文件?

*我已经根据下面 skrrgwasme 的建议编辑了我的文件以包含 Manager,但现在代码运行了,但什么也没产生。没有错误,没有文件。只是跑。

代码:

import Queue
import multiprocessing
import time

def emgacq(kill_queue, f_name, adcpin):
     with open(f_name, '+') as f:
        while True:
            try:
                val = kill_queue.get(block = False)
                if val == STOP:
                    return
            except Queue.Empty:
                pass
            an_val = ADC.read(adcpin) * 1.8
            f.write("{}\t{}\n".format(ms, an_val))
def main():    
    #Timing stuff
    start = time.time()
    elapsed_seconds = time.time() - start
    ms = elapsed_seconds * 1000

    #Multiprcessing settings
    pool = multiprocessing.Pool()
    m = multiprocessing.Manager()
    kill_queue = m.Queue()            

    #All the arguments we need run thru emgacq()
    arg_list = [
        (kill_queue, 'HamLeft', 'AIN1'),
        (kill_queue, 'HamRight', 'AIN2'),
        (kill_queue, 'QuadLeft', 'AIN3'),
        (kill_queue, 'QuadRight', 'AIN4'),
        (kill_queue, 'GastLeft', 'AIN5'),
        (kill_queue, 'GastRight',  'AIN6'),
        ]

    for a in arg_list:
        pool.apply_async(emgacq, args=a) 

    try: 
        while True:
            time.sleep(60) 
    except KeyboardInterrupt:
        for a in arg_list:
            kill_queue.put(STOP)
        pool.close()
        pool.join()
        raise f.close()

if __name__ == "__main__":
    main()

【问题讨论】:

  • 鉴于您一直在处理这两个问题的问题,我强烈建议您阅读一些基本的 Python 教程。您似乎对函数调用、变量赋值和参数传递等基本概念有些困惑。如果您能够在深入研究下一个脚本/程序之前了解这些基础知识,您将会取得更大的成功。

标签: python file debian multiprocessing beagleboneblack


【解决方案1】:

您的主要问题是您的子流程函数的参数列表不正确:

f_list = [
    emgacq(kill_queue, 'HamLeft', 'AIN1'), 
    # this calls the emgacq function right here - blocking the rest of your 
    # script's execution

另外,您的apply_async 呼叫是错误的:

for f in f_list:
    pool.apply_async(f, args=(kill_queue)) 
    # f is not a function here - the arguments to the apply_async function
    # should be the one function you want to call followed by a tuple of 
    # arguments that should be provided to it

您想要这个,其中还包括一个用于队列的manager(请参阅https://stackoverflow.com/a/9928191/2615940)并将您的所有代码放入一个main 函数中:

# put your imports here
# followed by the definition of the emgacq function

def main():

    #Timing stuff
    start = time.time()
    elapsed_seconds = time.time() - start
    ms = elapsed_seconds * 1000

    pool = multiprocessing.Pool()
    m = multiprocessing.Manager()
    kill_queue = m.Queue()

    arg_list = [
            (kill_queue, 'HamLeft', 'AIN1'), 
            (kill_queue, 'HamRight', 'AIN2'),
            (kill_queue, 'QuadLeft', 'AIN3'),
            (kill_queue, 'QuadRight', 'AIN4'),
            (kill_queue, 'GastLeft', 'AIN5'),
            (kill_queue, 'GastRight',  'AIN6'),
        ]

    for a in arg_list:
        pool.apply_async(emgacq, args=a)
        # this will call the emgacq function with the arguments provided in "a"

if __name__ == "__main__":
    # you want to have all of your code in a function, because the workers
    # will start by importing the main module they are executing from,
    # and you don't want them to execute that code all over again
    main()

【讨论】:

  • 我不明白这里发生了什么。现在似乎有一个继承问题。具体来说,“队列对象只能通过继承在进程之间共享。”
  • 我编辑了主帖,这样您就可以看到当我尝试Manager. 时发生了什么,我是否需要调整我使用kill_queue 的位置,因为它的定义已经改变了? (kill_queue = m.Queue() 而不是kill_queue = multiprocessing.Queue
猜你喜欢
  • 2013-03-09
  • 2017-08-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-12-23
  • 2012-11-06
相关资源
最近更新 更多