【问题标题】:AttributeError: Can't pickle local object 'computation.. function1 using multiprocessing queueAttributeError: Can't pickle local object 'computation.. function1 using multiprocessing queue
【发布时间】:2018-08-30 07:09:16
【问题描述】:

我使用调度程序和多处理模块有以下代码:

def computation():
    def function1(q):
        while True:
            daydate = datetime.now()
            number = random.randrange(1, 215)
            print('Sent to function2: ({}, {})'.format(daydate, number))
            q.put((daydate, number))
            time.sleep(2)

    def function2(q):
        while True:
            date, number = q.get()
            print("Recevied values from function1: ({}, {})".format(date, number))
            time.sleep(2)

    if __name__ == "__main__":
        q = Queue()
        a = Process(target=function1, args=(q,))
        a.start()
        b = Process(target=function2, args=(q,))
        b.start()
        a.join()
        b.join()

schedule.every().monday.at("08:45").do(computation)
schedule.every().tuesday.at("08:45").do(computation)

while True:
    schedule.run_pending()
    time.sleep(1)

但是在执行代码时出现以下错误:

AttributeError: Can't pickle local object 'computation.. 功能1

还有:

OSError: [WinError 87] 参数不正确

如何解决这个问题?我试图通过在文档 (https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled) 中所述的模块顶层定义一个函数来解决此问题,但它仍然给出相同的错误。

【问题讨论】:

    标签: python queue multiprocessing scheduler schedule


    【解决方案1】:

    嵌套函数不是在顶层定义的函数,这就是您收到错误的原因。您需要将function1function2的定义移到外面 的计算。

    按照您的编写方式,您的流程将立即开始,而不是在您安排它们运行的​​日期。这可能符合您的预期:

    import os
    import time
    import random
    from multiprocessing import Process, Queue
    from threading import Thread
    from datetime import datetime
    import schedule
    
    
    def function1(q):
        while True:
            daydate = datetime.now()
            number = random.randrange(1, 215)
            fmt = '(pid: {}) Sent to function2: ({}, {})'
            print(fmt.format(os.getpid(), daydate, number))
            q.put((daydate, number))
            time.sleep(2)
    
    
    def function2(q):
        while True:
            date, number = q.get()
            fmt = "(pid: {}) Received values from function1: ({}, {})"
            print(fmt.format(os.getpid(), date, number))
            # time.sleep(2) no need to sleep here because q.get will block until
            # new items are available
    
    
    def computation():
        q = Queue()
        a = Process(target=function1, args=(q,))
        a.start()
        b = Process(target=function2, args=(q,))
        b.start()
        a.join()
        b.join()
    
    
    if __name__ == "__main__":
    
        # We are spawning new threads as a launching platform for
        # computation. Without it, the next job couldn't start before the last
        # one has finished. If your jobs always end before the next one should 
        # start, you don't need this construct and you can just pass 
        # ...do(computation)
        schedule.every().friday.at("01:02").do(
            Thread(target=computation).start
        )
        schedule.every().friday.at("01:03").do(
            Thread(target=computation).start
        )
    
        while True:
            schedule.run_pending()
            time.sleep(1)
    

    就像现在一样,您的进程将在启动一次后永远运行。如果这不是您想要的,您必须考虑实施一些停止条件。

    【讨论】:

      猜你喜欢
      • 2022-11-12
      • 2021-10-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-08-21
      • 1970-01-01
      • 2010-11-27
      • 1970-01-01
      相关资源
      最近更新 更多