【问题标题】:Python threading: make the main thread report the progressPython线程:让主线程报告进度
【发布时间】:2019-04-30 09:35:28
【问题描述】:

我并行运行一些作业,有时可能需要很长时间,所以我希望主线程报告进度。例如,每小时。

以下是我想出的简化版本。代码将在 2 个线程中运行 test_function,参数来自 input_arguments。每 5 秒打印 % 的已完成作业。

import threading
import queue
import time


def test_function(x):
    time.sleep(4)
    print("Finished ", x)


num_processes = 2
input_arguments = range(10)

# Define a worker which will continuously execute function taking input parameters from the queue
def worker():
    while True:
        x = q.get()
        if x is None:
            break
        test_function(x)
        q.task_done()

# Initialize queue and the threads
q = queue.Queue()
threads = []
for i in range(num_processes):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Create a queue of input parameters for function
for item in input_arguments:
    q.put(item)

# Report progress every 5 seconds
report_progress(q)

# stop workers
for i in range(num_processes):
    q.put(None)
for t in threads:
    t.join()

其中report_progress定义如下

def report_progress(q):
    qsize_init = q.qsize()
    while not q.empty():
        time.sleep(5)
        portion_finished = 1 - q.qsize() / qsize_init
        print("run_parallel: {:.1%} jobs are finished".format(portion_finished))

但是,我想每小时报告一次进度而不是 5 秒,如果所有作业都完成了,程序可能会闲置几分钟。

另一种可能性是对report_progress 进行不同的定义:

def report_progress(q):
    qsize_init = q.qsize()
    time_start = time.time()
    while not q.empty():
        current_time = time.time()
        if current_time - time_start > 5:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            time_start = time.time()

我担心不断检查这种情况会消耗 CPU 资源,只是一小部分,但在几个小时的范围内可能会很多。

有标准的处理方法吗?

Python:3.6

【问题讨论】:

  • 也许你应该依赖事件而不是轮询。在这种情况下,您需要将事件超时设置为 1h,如果事件发生,您将继续。 a related answer
  • 同时轮询多个事件的标准方式(例如等待 1 小时超时和进程死亡)是 IO 多路复用。在 python 中,对这些东西最友好的界面是 asyncio 模块。
  • 但是对于您相当简单的情况,使用标准方式可能太复杂了。你可能只是:(1) - 睡眠 10 秒,(2) - 检查线程状态,如果程序死了就退出程序,(3)如果自上次进度报告过去了一个小时,计算并打印一个新的,(4)转到(1)
  • @AndriyMaletsky 我已经有了一些基于threading 模块的代码,我想在它上面做一个小的补充。如果我使用asyncio,我将不得不全部重写,是否正确?您第二条评论中的想法似乎是一个很好的妥协
  • 你说得对,asyncio 很难与现有代码集成。相当常见的模式是运行 asyncio 的“管理器”进程,它安排工作,检查进度,检查所有作业是否已被用户取消,换句话说,一次做很多事情。以及作为子进程启动的“工作”进程,因此与 asyncio 隔离。这种方法允许不重写工作代码

标签: python multithreading python-multithreading


【解决方案1】:

现在我将使用@Andriy Maletsky 在 cmets 中建议的简单解决方案。

如果q不为空,主线程将每隔几秒检查一次,如果自上次报告超过1小时,它将打印一条进度消息。

time_between_reports = 3600
time_between_checks = 5
def report_progress_until_finished(q):
    qsize_init = q.qsize()
    last_report_time = time.time()
    while not q.empty():
        time_elapsed = time.time() - last_report_time
        if time_elapsed > time_between_reports:
            portion_finished = 1 - q.qsize() / qsize_init
            print("run_parallel: {:.1%} jobs are finished".format(portion_finished))
            last_report_time = time.time()
        time.sleep(time_between_checks)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多