【问题标题】:Make websocket callback asynchronous with asyncio使用 asyncio 使 websocket 回调异步
【发布时间】:2016-11-03 20:25:45
【问题描述】:

我正在尝试使用 asynciowebsockets 和 Python 3.5.2 实现一个基本的 websocket 客户端。

基本上,我希望connect_to_dealer 成为阻塞调用,但在不同线程上等待 websocket 消息。

在阅读了一些文档(我对 Python 的经验很少)之后,我得出结论,asyncio.ensure_future() 传递协程 (listen_for_message) 是可行的方法。

现在,我可以在不同的线程上运行 listen_for_message,但在协程中我似乎无法使用 await 或任何其他机制来进行调用同步。如果我这样做,即使是简单的sleep,执行也会永远等待(挂起)。

我想知道我做错了什么。

async def listen_for_message(self, future, websocket):
    while (True):
        try:
            await asyncio.sleep(1) # It hangs here
            print('Listening for a message...')
            message = await websocket.recv() # If I remove the sleep, hangs here
            print("< {}".format(message))
            future.set_result(message)
            future.done()
        except websockets.ConnectionClosed as cc:
            print('Connection closed')
        except Exception as e:
            print('Something happened')

def handle_connect_message(self, future):
    # We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
    print(future.result)

async def connect_to_dealer(self):
    print('connect to dealer')
    websocket = await websockets.connect('wss://mywebsocket'))
    hello_message = await websocket.recv()
    print("< {}".format(hello_message))
    # We need to parse the connection ID out of the message
    connection_id = hello_message['connectionId']
    print('Got connection id {}'.format(connection_id))
    sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
    if sub_response.status_code == 200:
        print('Now we\'re observing traffic')
    else:
        print('Oops request failed with {code}'.format(code=sub_response.status_code))
    # Now we need to handle messages but continue with the regular execution
    try:
        future = asyncio.get_event_loop().create_future()
        future.add_done_callback(self.handle_connect_message)
        asyncio.ensure_future(self.listen_for_message(future, websocket))
    except Exception as e:
        print(e)

【问题讨论】:

    标签: python multithreading python-3.x websocket python-asyncio


    【解决方案1】:

    您是否有特定原因需要使用显式期货?

    使用asyncio,您可以结合使用coroutinesTasks 来实现大多数目的。任务本质上是封装的协程,它们在后台自行启动,独立于其他异步代码,因此您不必显式管理它们的流程或将它们与其他代码位混合。

    我不完全确定您的最终目标,但也许下面阐述的方法可以为您提供一些工作:

    import asyncio
    
    async def listen_for_message(websocket):
    
        while True:
    
            await asyncio.sleep(0)
    
            try:
                print('Listening for a message...')
                message = await websocket.recv()
    
                print("< {}".format(message))
    
            except websockets.ConnectionClosed as cc:
                print('Connection closed')
    
            except Exception as e:
                print('Something happened')
    
    
    async def connect_to_dealer():
    
        print('connect to dealer')
        websocket = await websockets.connect('wss://mywebsocket')
    
        hello_message = await websocket.recv()
        print("< {}".format(hello_message))
    
        # We need to parse the connection ID out of the message
        connection_id = hello_message['connectionId']
        print('Got connection id {}'.format(connection_id))
    
        sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
            user_id='username', connection_id=connection_id), headers=headers)
    
        if sub_response.status_code == 200:
            print('Now we\'re observing traffic')
        else:
            print('Oops request failed with {code}'.format(code=sub_response.status_code))
    
    
    async def my_app():
    
        # this will block until connect_to_dealer() returns
        websocket = await connect_to_dealer()
    
        # start listen_for_message() in its own task wrapper, so doing it continues in the background
        asyncio.ensure_future(listen_for_message(websocket))
    
        # you can continue with other code here that can now coexist with listen_for_message()
    
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(my_app())
        loop.run_forever() 
    

    【讨论】:

    • 嗨 shongolo,我不需要使用明确的期货,但我在文档上阅读并使用了它。感谢您的建议,我解决了我的问题,我认为关键是不要从 connect_to_dealer() 协程中调用 asyncio.ensure_future(listen_for_message(websocket))
    • 很高兴它触发了解决方案。从另一个协程调用ensure_future() 是可以的,但在代码中的某处,您需要使用loop.run_until_complete() 驱动所有协程,如果有必要(例如使用任务)loop.run_forever()
    • @songololo await asyncio.sleep(0) 的原因是什么?
    • @walksignnison 它为事件循环提供了一个在协程之间切换的立足点。即没有await 语句,事件循环将卡在while 循环内。所有协程都需要await 语句,以便事件循环可以在协程之间切换(否则需要return 并让开)。
    猜你喜欢
    • 2021-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-04
    相关资源
    最近更新 更多