【问题标题】:How to handle multiple websocket messages at the same time?如何同时处理多个 websocket 消息?
【发布时间】:2021-05-29 12:33:06
【问题描述】:

如果有人可以在 Python 和 async/await 方面帮助我,我们将不胜感激!

我需要监听 websocket 的消息,所以我设置了以下代码:

import websockets
import asyncio

my_socket = "ws://......."

# I set a "while True" here to reconnect websocket if it stop for any reason
while True:
    try:
        async with websockets.connect(my_socket) as ws:
            # I set a "while True" here to keep listening to messages forever
            while True:
                await on_message(await ws.recv())
    # If websocket gets closed for any reason, we catch exception and wait before new loop
    except Exception as e:
        print(e)
    # Wait 10 secs before new loop to avoid flooding server if it is unavailable for any reason
    await asyncio.sleep(10)

async def on_message(message):
    # Do what needs to be done with received message
    # This function is running for a few minutes, with a lot of sleep() time in it..
    # .. so it does no hold process for itself

我想做的是:

  • 收听消息
  • 收到消息后,立即使用 on_message() 函数应用各种操作,持续几分钟
  • 在之前的消息仍在处理中时继续收听消息on_message()

实际发生的情况:

  • 收听消息
  • 接收消息并启动on_message()函数
  • 然后程序在接收任何新消息之前等待on_message()函数结束,这需要几分钟,并使第二条消息延迟等等

我确实理解它为什么这样做,正如await on_message() 明确表示的那样:等待 on_message() 结束,这样它就不会回去收听新消息了。我不知道的是如何处理消息而无需等待此函数结束。

我的on_message() 函数有很多空闲时间和一些await asyncio.sleep(1),所以我知道我可以同时运行多个任务。

那么,我怎样才能在运行第一个任务的同时继续收听新消息?

【问题讨论】:

  • await on_message(await ws.recv()) 更改为asyncio.create_task(on_message(await ws.recv()))
  • 感谢@user4815162342 这是解决我问题的完美方法!我期待了解更多关于 asyncio.create_task 的信息,尽管它已经像魅力一样发挥作用!

标签: python asynchronous async-await python-asyncio


【解决方案1】:

简而言之,你需要将await on_message(await ws.recv())改为asyncio.create_task(on_message(await ws.recv()))

正如您正确指出的那样,await 对您不起作用,因为它意味着等待任务完成。尽管代码是异步的,但从某种意义上说,它是由事件循环驱动的,并且您可以并行启动许多此类任务,但每个单独的循环都是顺序的。

await 的替代方法是使用 asyncio.create_task() 在后台生成作业。这将创建一个任务,该任务将分段执行协程(两个等待挂起的每个部分),其中散布着其他活动协程的等效片段。 create_task() 将返回任务的句柄,您可以(并且可能在某些时候应该)等待任务完成并获得其结果或异常。由于在您的情况下您不关心结果,因此您甚至不需要存储任务。

【讨论】:

  • 确实,asyncio.create_task() 已经解决了这个问题。感谢您的精彩解释,感谢您的几句好话,我想我对 Python 中的 async 有了更好的理解!如果我出于任何原因想要等待这些,我是否应该将任务放在 var 中,然后使用 await 和/或 asyncio.gather()
  • @KarmaciouS 正确。在某处等待它们的第一个原因是确保异常不会静默传递。
  • 我确实可能会重新考虑等待一些对程序继续运行至关重要的特定任务。无论如何,我将运行一些测试代码来练习这个场景!再次感谢您的大力帮助和您的宝贵时间!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-11-03
  • 1970-01-01
  • 2019-01-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多