【问题标题】:Python - how to run multiple coroutines concurrently using asyncio?Python - 如何使用 asyncio 同时运行多个协程?
【发布时间】:2015-08-17 15:21:54
【问题描述】:

我正在使用websockets 库在 Python 3.4 中创建一个 websocket 服务器。这是一个简单的回显服务器:

import asyncio
import websockets

@asyncio.coroutine
def connection_handler(websocket, path):
    while True:
        msg = yield from websocket.recv()
        if msg is None:  # connection lost
            break
        yield from websocket.send(msg)

start_server = websockets.serve(connection_handler, 'localhost', 8000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

假设我们 - 另外 - 想要在发生某些事件时向客户端发送消息。为简单起见,让我们每 60 秒定期发送一条消息。我们将如何做到这一点?我的意思是,因为connection_handler 一直在等待收到的消息,所以服务器只能在收到来自客户端的消息 之后才采取行动,对吧?我在这里错过了什么?

也许这种情况需要一个基于事件/回调的框架,而不是基于协程的框架? Tornado?

【问题讨论】:

    标签: python python-3.x websocket python-asyncio


    【解决方案1】:

    TL;DR 使用asyncio.ensure_future() 同时运行多个协程。


    也许这种情况需要一个基于事件/回调的框架,而不是基于协程的框架?龙卷风?

    不,您不需要任何其他框架。异步应用程序与同步的整个想法是它在等待结果时不会阻塞。使用协程或回调如何实现并不重要。

    我的意思是,因为 connection_handler 一直在等待传入的消息,所以服务器只有在收到客户端的消息后才能采取行动,对吧?我在这里错过了什么?

    在同步应用程序中,您将编写类似msg = websocket.recv() 的内容,这将阻塞整个应用程序,直到您收到消息(如您​​所述)。但在异步应用程序中则完全不同。

    当您执行msg = yield from websocket.recv() 时,您会说:暂停执行connection_handler() 直到websocket.recv() 会产生一些结果。在协程中使用yield from 将控制权返回给事件循环,因此可以执行一些其他代码,同时我们正在等待websocket.recv() 的结果。请参考documentation 以更好地了解协程的工作原理。

    假设我们 - 另外 - 想要在发生某些事件时向客户端发送消息。为简单起见,让我们每 60 秒定期发送一条消息。我们该怎么做?

    在对starting event loop 执行阻塞调用之前,您可以使用asyncio.async() 运行任意数量的协程。

    import asyncio
    
    import websockets
    
    # here we'll store all active connections to use for sending periodic messages
    connections = []
    
    
    @asyncio.coroutine
    def connection_handler(connection, path):
        connections.append(connection)  # add connection to pool
        while True:
            msg = yield from connection.recv()
            if msg is None:  # connection lost
                connections.remove(connection)  # remove connection from pool, when client disconnects
                break
            else:
                print('< {}'.format(msg))
            yield from connection.send(msg)
            print('> {}'.format(msg))
    
    
    @asyncio.coroutine
    def send_periodically():
        while True:
            yield from asyncio.sleep(5)  # switch to other code and continue execution in 5 seconds
            for connection in connections:
                print('> Periodic event happened.')
                yield from connection.send('Periodic event happened.')  # send message to each connected client
    
    
    start_server = websockets.serve(connection_handler, 'localhost', 8000)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.async(send_periodically())  # before blocking call we schedule our coroutine for sending periodic messages
    asyncio.get_event_loop().run_forever()
    

    这是一个示例客户端实现。它要求您输入名称,从回显服务器接收它,等待来自服务器的另外两条消息(这是我们的定期消息)并关闭连接。

    import asyncio
    
    import websockets
    
    
    @asyncio.coroutine
    def hello():
        connection = yield from websockets.connect('ws://localhost:8000/')
        name = input("What's your name? ")
        yield from connection.send(name)
        print("> {}".format(name))
        for _ in range(3):
            msg = yield from connection.recv()
            print("< {}".format(msg))
    
        yield from connection.close()
    
    
    asyncio.get_event_loop().run_until_complete(hello())
    

    要点:

    1. 在 Python 3.4.4 中,asyncio.async() 被重命名为 asyncio.ensure_future()
    2. 有一些特殊的方法可以调度delayed calls,但它们不适用于协程。

    【讨论】:

    • 很好的答案,谢谢!我了解协程是什么,但我仍在尝试了解 asyncio 框架。您的回答很有帮助。
    • @weatherfrog 你解决问题了吗?我在here 上有类似的问题
    【解决方案2】:

    我很惊讶gather 没有被提及。

    来自Python documentation

    import asyncio
    
    async def factorial(name, number):
        f = 1
        for i in range(2, number + 1):
            print(f"Task {name}: Compute factorial({i})...")
            await asyncio.sleep(1)
            f *= i
        print(f"Task {name}: factorial({number}) = {f}")
    
    async def main():
        # Schedule three calls *concurrently*:
        await asyncio.gather(
            factorial("A", 2),
            factorial("B", 3),
            factorial("C", 4),
        )
    
    asyncio.run(main())
    
    # Expected output:
    #
    #     Task A: Compute factorial(2)...
    #     Task B: Compute factorial(2)...
    #     Task C: Compute factorial(2)...
    #     Task A: factorial(2) = 2
    #     Task B: Compute factorial(3)...
    #     Task C: Compute factorial(3)...
    #     Task B: factorial(3) = 6
    #     Task C: Compute factorial(4)...
    #     Task C: factorial(4) = 24
    

    【讨论】:

    • 这是最简单的解决方案,适用于该线程中提出的所有方案
    • 可以在Python 3.8上确认,简单易行,就可以了。
    • 这个答案与 websockets 部分没有任何关系。这只是一个异步示例
    【解决方案3】:

    同样的问题,在我在这里看到完美的样本之前很难得到解决方案:http://websockets.readthedocs.io/en/stable/intro.html#both

     done, pending = await asyncio.wait(
            [listener_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED)  # Important
    

    这样,我就可以处理心跳、redis订阅等多协程任务了。

    【讨论】:

    • 我试过你教的链接。但我无法解决我的问题。请对我的问题提出一些建议here
    • 谢谢你...尽管我的问题不一样,但这个解决方案很有帮助
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-04-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-18
    相关资源
    最近更新 更多