【问题标题】:Websockets send message in sync functionWebsockets 在同步功能中发送消息
【发布时间】:2022-11-11 01:20:20
【问题描述】:

我正在使用 websockets 和 asyncio 来管理我的应用程序中的连接。

发送方法是异步的

async def send(self, message):
        logging.debug('send {}'.format(message))
        await self.websocket.send(message)

我通常在异步线程中使用它,一切都很好。 只有一种情况我需要从同步方法中调用它。

我试过这样称呼它

asyncio.run(ws.send(json.dumps(payload)))

但我得到了这个例外

Task <Task pending name='Task-134' coro=<WebSocketCommonProtocol.send() running at /usr/local/lib/python3.8/dist-packages/websockets/legacy/protocol.py:631> cb=[_run_until_complete_cb() at /usr/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

所以我尝试使用当前循环

loop = asyncio.get_event_loop()
asyncio.run(ws.send(json.dumps(payload)), loop=loop)

There is no current event loop in thread 'Thread-37'.

可以做什么?

【问题讨论】:

  • 您的脚本是否混合了asynciothreading?使用asyncio.to_thread 将线程放在asyncio 一侧可能会更好
  • 主要问题是self.websocket... 将绑定到与您尝试同步调用的单独线程中的循环不同的循环。

标签: python python-3.x python-asyncio


【解决方案1】:

在运行异步代码的一个线程上,您应该启动一个任务来监听队列 - 每当它得到一些东西时,它就会处理请求并将结果发回。

这样,在同步线程上,您只需发布到该队列并等待结果。 Queue 本身用于线程间通信,本身不需要异步 - 因此 queue.Queue 是一个您可以使用的类,然后使用 asyncio.sleep 以定期合并队列。

所以,或多或少沿着这个:

from queue import Queue, Empty
import threading

class YouDidNotPostTheClass:

   def __init__(self, ...):
       ...
       self.queue = Queue()
   
   async def _async_init(self):
       # this method is boilerplate - you could
       # just issue the call bellow wherever you start your async code
       self._tasks = [asyncio.create_task(self.sync_send_server()), ]

   async def _async_end(self):
       for task in self._tasks:
           task.cancel()
   
   async def sync_send_server(self):
        while True:
            try:
                message = self.queue.get_nowait()
            except Empty:
                pass
            else:
                await self.send(message)
            await asyncio.sleep(0.05) # Tune at will
            self.queue.task_done()  # <- optional, but ensure you can use all of Queue's functionality on your code.

    def sync_send(self, message):
         # call _this_ method from the thread running sync code
         self.queue.put(message)
    
    async def send(self, message):
        ...


...
async def main():
     client = YouDidNotPostTheClass()
     await client._async_init()
     # your main async code goes here
     ...
     # avoid warnings about running tasks at program end:
     await client._async_end()

def sync_main():
    # code to drive the synchronous thread in your program
    ...

syncthread = threading.Thread(target=sync_main)
syncthread.start()
asyncio.run(main()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-13
    • 2016-02-21
    • 1970-01-01
    • 2021-10-07
    • 2023-03-28
    • 1970-01-01
    相关资源
    最近更新 更多