【问题标题】:Timeouts for multiprocessing?多处理超时?
【发布时间】:2018-07-29 23:01:14
【问题描述】:

我搜索了 StackOverflow,虽然我发现了很多关于此的问题,但我还没有找到适合我情况的答案/不是一个强大的 Python 程序员来调整他们的答案以满足我的需要。

我看这里无济于事:

kill a function after a certain time in windows

Python: kill or terminate subprocess when timeout

signal.alarm replacement in Windows [Python]

我正在使用多处理同时运行多个 SAP 窗口以提取报告。设置为按计划每 5 分钟运行一次。每隔一段时间,其中一个报告就会由于 GUI 界面而停滞不前,而且永远不会结束。我没有收到错误或异常,它只是永远停止。我想要的是有一个超时功能,在这部分在 SAP 中执行的代码期间,如果它需要超过 4 分钟,它会超时,关闭 SAP,跳过其余代码,并等待下一个计划报告时间。

我使用的是 Windows Python 2.7

import multiprocessing
from multiprocessing import Manager, Process
import time
import datetime

### OPEN SAP ###
def start_SAP():
   print 'opening SAP program'

### REPORTS IN SAP ###
def report_1(q, lock):
   while True:  # logic to get shared queue
       if not q.empty():
           lock.acquire()
           k = q.get()
           time.sleep(1)
           lock.release()
           break
       else:
           time.sleep(1)
   print 'running report 1'

def report_2(q, lock):
   while True:  # logic to get shared queue
       if not q.empty():
           lock.acquire()
           k = q.get()
           time.sleep(1)
           lock.release()
           break
       else:
           time.sleep(1)
   print 'running report 2'

def report_3(q, lock):
   while True:  # logic to get shared queue
       if not q.empty():
           lock.acquire()
           k = q.get()
           time.sleep(1)
           lock.release()
           break
       else:
           time.sleep(1)

   time.sleep(60000) #mimicking the stall for report 3 that takes longer than allotted time
   print 'running report 3'

def report_N(q, lock):
   while True:  # logic to get shared queue
       if not q.empty():
           lock.acquire()
           k = q.get()
           time.sleep(1)
           lock.release()
           break
       else:
           time.sleep(1)
   print 'running report N'

### CLOSES SAP ###
def close_SAP():
   print 'closes SAP'

def format_file():
   print 'formatting files'

def multi_daily_pull():

    lock = multiprocessing.Lock()  # creating a lock in multiprocessing

    shared_list = range(6)  # creating a shared list for all functions to use
    q = multiprocessing.Queue()  # creating an empty queue in mulitprocessing
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print 'Starting process at ', time.strftime('%m/%d/%Y %H:%M:%S')

    print 'Starting SAP Pulls at ', time.strftime('%m/%d/%Y %H:%M:%S')

    StartSAP = Process(target=start_SAP)
    StartSAP.start()
    StartSAP.join()

    report1= Process(target=report_1, args=(q, lock))
    report2= Process(target=report_2, args=(q, lock))
    report3= Process(target=report_3, args=(q, lock))
    reportN= Process(target=report_N, args=(q, lock))

    report1.start()
    report2.start()
    report3.start()
    reportN.start()

    report1.join()
    report2.join()
    report3.join()
    reportN.join()

    EndSAP = Process(target=close_SAP)
    EndSAP.start()
    EndSAP.join()

    formatfile = Process(target=format_file)
    formatfile .start()
    formatfile .join()

if __name__ == '__main__':
    multi_daily_pull()

【问题讨论】:

  • 嗨@mar​​tineau,我已经更新了代码。如果您需要任何进一步的说明,请告诉我!
  • 甚至没有接近...您的代码仍然远非最小。一方面,其中未使用的imports 比实际需要的要多。另外,schedule 模块是什么?您需要发布足够多的代码才能运行和重现问题(但仅此而已)。
  • @martineau 抱歉,我只是在复制我的代码,我已经删除了未使用的 imports 和不必要的类文件。运行代码时,您将看到在第 5 分钟间隔(从 0:00 开始),代码将运行。它将运行报告 1,2 和 N,但不会完成报告 3 的运行。它停止了(我通过长时间使用 sleep 来模仿这一点)。我需要找到一种方法来确定 X 分钟后结束代码运行,运行 close_SAP 函数,并等待下一次计划运行。

标签: python windows python-2.7 timeout multiprocessing


【解决方案1】:

一种方法是使用Process.join() 方法接受的可选超时参数。这将使它最多只阻塞调用线程。

我还设置了每个 Process 实例的 daemon 属性,这样即使它启动的某个进程仍在“运行”(或已挂起),您的主线程也能够终止。

最后一点,您不需要multiprocessing.Lock 来控制对multiprocessing.Queue 的访问,因为它们会自动处理这方面的事情,所以我将其删除。出于其他原因,您可能仍然想要一个,例如控制对 stdout 的访问,以便从各种进程打印到它不会重叠并弄乱输出到屏幕的内容。

import multiprocessing
from multiprocessing import Process
import time
import datetime

def start_SAP():
    print 'opening SAP program'

### REPORTS IN SAP ###
def report_1(q):
    while True:  # logic to get shared queue
        if q.empty():
            time.sleep(1)
        else:
            k = q.get()
            time.sleep(1)
            break

    print 'report 1 finished'

def report_2(q):
    while True:  # logic to get shared queue
        if q.empty():
            time.sleep(1)
        else:
            k = q.get()
            time.sleep(1)
            break

    print 'report 2 finished'

def report_3(q):
    while True:  # logic to get shared queue
        if q.empty():
            time.sleep(1)
        else:
            k = q.get()
            time.sleep(60000) # Take longer than allotted time
            break

    print 'report 3 finished'


def report_N(q):
    while True:  # logic to get shared queue
        if q.empty():
            time.sleep(1)
        else:
            k = q.get()
            time.sleep(1)
            break

    print 'report N finished'

def close_SAP():
    print 'closing SAP'

def format_file():
    print 'formatting files'

def multi_daily_pull():
    shared_list = range(6)  # creating a shared list for all functions to use
    q = multiprocessing.Queue()  # creating an empty queue in mulitprocessing
    for n in shared_list:  # putting list into the queue
        q.put(n)
    print 'Starting process at ', time.strftime('%m/%d/%Y %H:%M:%S')

    print 'Starting SAP Pulls at ', time.strftime('%m/%d/%Y %H:%M:%S')

    StartSAP = Process(target=start_SAP)
    StartSAP.start()
    StartSAP.join()

    report1 = Process(target=report_1, args=(q,))
    report1.daemon = True
    report2 = Process(target=report_2, args=(q,))
    report2.daemon = True
    report3 = Process(target=report_3, args=(q,))
    report3.daemon = True
    reportN = Process(target=report_N, args=(q,))
    reportN.daemon = True

    report1.start()
    report2.start()
    report3.start()
    reportN.start()

    report1.join(30)
    report2.join(30)
    report3.join(30)
    reportN.join(30)

    EndSAP = Process(target=close_SAP)
    EndSAP.start()
    EndSAP.join()

    formatfile = Process(target=format_file)
    formatfile .start()
    formatfile .join()

if __name__ == '__main__':
    multi_daily_pull()

【讨论】:

  • 非常感谢@martineau 以及您对我的耐心。该解决方案完美运行!简单却如此强大。万事如意!
  • 凯文:不客气。另请注意,您可能希望添加代码来检查每个join() 是否超时,如果是,则在其上调用Process.terminate(),以便在主脚本退出时不会留下运行“僵尸”进程。由于这样做对于每个报告流程或多或少都是相同的,因此您可以将所有逻辑放入一个函数中,该函数处理作为参数传递给它的任何给定的详细信息。
猜你喜欢
  • 2016-12-07
  • 1970-01-01
  • 2012-01-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-25
  • 1970-01-01
  • 2021-03-01
相关资源
最近更新 更多