【问题标题】:How to run multiple blocking IO coroutines in parallel如何并行运行多个阻塞 IO 协程
【发布时间】:2016-07-03 13:24:28
【问题描述】:

我有一个用协程实现的长时间运行的管道。这通常是获取日志流,执行一些丰富(线程化)并将它们写入数据存储。

这是一个模拟管道的小例子:

import time
import random
from concurrent import futures

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr
    return start

@coroutine
def foo():
    pool = futures.ThreadPoolExecutor(max_workers=10)
    while True:
        i = (yield)
        fut = pool.submit(enrich, i)
        fut.add_done_callback(result_handler)
        time.sleep(random.random()*10)

def enrich(i):
    enriched = 'foo' + str(i)
    time.sleep(random.random())
    return enriched

def source(name, target):
    while True:
        time.sleep(random.random())
        i = random.randint(0,10)
        target.send(name + str(i))

如下调用的单个管道可以正常工作。

source('task one ', foo())

现在,我想在后台线程中为不同的日志运行多个管道。一种尝试是再次使用 ThreadPoolExecutor 来驱动多个管道。

def run():
    pool = futures.ThreadPoolExecutor(max_workers=10)
    tasks = [source('task one ', foo()),
             source('task two ', foo())]
    for task in tasks:
        fut = pool.submit(task)
        fut.add_done_callback(result_handler)

但是,管道在第一个任务之后阻塞并且永远不会执行第二个任务。在后台线程中运行如此长时间(可能永远)管道的正确方法是什么?

【问题讨论】:

  • run 函数中,您将source('task one ', foo() 的结果传递给pool.submit(task)。应该是pool.submit(source, 'task one', foo())
  • 或者,tasks = [(source, 'task one', foo()), (source, 'task two', foo())]。然后pool.submit(*task)`
  • 太棒了!这解决了我在这里提出的模拟问题。实际的管道非常复杂,我仍然会收到一些我必须检查的回调异常。感谢您的快速回复!
  • 让我把它作为答案发布。

标签: python multithreading concurrency threadpoolexecutor


【解决方案1】:

因为source 函数是永无止境的,所以不会创建tasks = [source('task one ', foo()), source('task two ', foo())] 列表。这就是第一个任务运行而管道阻塞的原因。
解决方案是将source 及其参数传递给pool.submit

tasks = [(source, 'task one', foo()), (source, 'task two', foo())]
for task in tasks:
    fut = pool.submit(*task)
    fut.add_done_callback(result_handler)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-11-04
    • 1970-01-01
    • 1970-01-01
    • 2019-05-22
    • 2021-02-23
    • 1970-01-01
    • 2020-01-04
    • 1970-01-01
    相关资源
    最近更新 更多