【问题标题】:Stream producer and consumer with asyncio gather python流生产者和消费者与 asyncio 收集 python
【发布时间】:2018-06-13 16:20:29
【问题描述】:

我为套接字服务器编写了一个脚本,它只监听传入的连接并处理传入的数据。选择的架构是用于套接字管理的asyncio.start_server 和用于在生产者和消费者协程之间传递数据的asyncio.Queues。问题是consume(q1) 函数只执行一次(在第一次脚本启动时)。然后就不再执行了。 run_until_complete(asyncio.gather())这行错了吗?

import asyncio
import functools

async def handle_readnwrite(reader, writer, q1): #Producer coroutine
    data = await reader.read(1024)
    message = data.decode()
    await writer.drain()
    await q1.put(message[3:20])
    await q1.put(None)
    writer.close() #Close the client socket

async def consume(q1): #Consumer coroutine
    while True:
        # wait for an item from the producer
        item = await q1.get()
        if item is None:
            logging.debug('None items') # the producer emits None to indicate that it is done
            break
        do_something(item)

loop = asyncio.get_event_loop()
q1 = asyncio.Queue(loop=loop)
producer_coro = asyncio.start_server(functools.partial(handle_readnwrite, q1=q1), '0.0.0.0', 3000, loop=loop)
consumer_coro = consume(q1)
loop.run_until_complete(asyncio.gather(consumer_coro,producer_coro))

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

loop.close()

【问题讨论】:

  • 可能问题是handle_readnwrite 总是将None 排入队列,这导致consume 中断(并因此完成协程)。您如何期望consume 处理多条消息?
  • 非常感谢@user4815162342!它现在工作正常。 进程应该一直在运行。
  • 我现在添加了相同的解释作为答案。

标签: python-3.x queue python-asyncio asyncsocket


【解决方案1】:

handle_readnwrite 总是将None 终止符排入队列,这会导致consume 中断(并因此完成协程)。如果consume 应该继续运行并处理其他消息,则不得在每条消息之后发送None 终止符。

【讨论】:

    猜你喜欢
    • 2021-07-12
    • 1970-01-01
    • 2011-11-08
    • 2018-02-10
    • 2019-08-19
    • 1970-01-01
    • 2011-07-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多