【问题标题】:How to code consumer.producer with python asyncio?如何使用 python asyncio 对 consumer.producer 进行编码?
【发布时间】:2019-11-23 08:38:28
【问题描述】:

我的 Python 版本是 3.6.1。

我写了一些东西来使用 Python asyncio 实现消费者-生产者模型。 但它并没有按预期工作。

全部创建了四个事件,但没有任何打印导出。

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.ensure_future(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.ensure_future(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.ensure_future(producer(queue, 'producer_1'))
    producer_2 = asyncio.ensure_future(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()

    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(main())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

你能帮我改正吗?

【问题讨论】:

  • 我已经成功使用multiprocessing.Queue 而不是asyncio.Queue docs.python.org/3.7/library/…
  • @abdusco 抱歉,我忘记了我的 Python 版本是 3.6.1。我将 asyncio.Queue 更改为 multiprocessing.Queue。程序在 consumer_1 中暂停。

标签: python consumer producer python-asyncio


【解决方案1】:

你的方法中有很多错误的地方,其中一些是:

  • 对于生产者来说是不必要的asyncio.ensure_future
  • asyncio.gathering 以任意顺序处理可疑项目
  • 不必要的tasks = [asyncio.ensure_future(main())]asyncio.wait(tasks) 操作
  • 不受控制queue

异步生产者/消费者方案示例:https://asyncio.readthedocs.io/en/latest/producer_consumer.html


正确的生产者/消费者方案如下所示(针对您的情况):

import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)
        queue.task_done()   # indicate complete task

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await asyncio.sleep(1)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))


async def main():
    queue = asyncio.Queue()

    producer_1 = producer(queue, 'producer_1')
    producer_2 = producer(queue, 'producer_2')

    consumer_1 = asyncio.ensure_future(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.ensure_future(consumer(queue, 'consumer_2'))

    await asyncio.gather(*[producer_1, producer_2], return_exceptions=True)
    await queue.join()  # wait until the consumer has processed all items
    consumer_1.cancel()
    consumer_2.cancel()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close() 

输出:

producer_1 put a val: 7
producer_2 put a val: 2
consumer_1 get a val: 7
consumer_2 get a val: 2
producer_1 put a val: 9
producer_2 put a val: 2
consumer_1 get a val: 9
consumer_2 get a val: 2
producer_1 put a val: 9
producer_2 put a val: 3
consumer_1 get a val: 9
consumer_2 get a val: 3
producer_1 put a val: 1
producer_2 put a val: 6
consumer_1 get a val: 1
consumer_2 get a val: 6
producer_1 put a val: 2
producer_2 put a val: 2
consumer_1 get a val: 2
consumer_2 get a val: 2

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-10-26
    • 2020-09-27
    • 2018-02-13
    • 1970-01-01
    • 2015-11-25
    • 1970-01-01
    • 2018-06-30
    • 2015-11-23
    相关资源
    最近更新 更多