【问题标题】:How to run a coroutine that does not end without getting stuck awaiting a result?如何运行一个不会结束而不会等待结果的协程?
【发布时间】:2017-03-19 05:10:34
【问题描述】:

我有这段代码,我想连接到 websocket 并通过发送心跳来保持连接处于活动状态。在执行此操作时,我还希望能够将有效负载发送到 websocket,但是我一直卡在等待我的 keepAlive 协程的结果

import asyncio
import websockets
ws=#path goes here
async def sendHeartbeat(socket):
    message={
        "op": 1,
        "d": 0
        }
    await socket.send(json.dumps(message))
    print("heartbeat sent")


async def keepAlive(socket):
    while 1==1:
        await sendHeartbeat(socket)
        resp=await(socket.recv())
        print(resp)
        if json.loads(resp)["op"]==11:
            #the server sent an ack back
            print("ack received")
        else:
            socket.close()
        await asyncio.sleep(10)




async def beginSocket(loop):
    async with websockets.connect(ws) as socket:
        print(await socket.recv())
        await keepAlive(socket)
        #payloads would be sent here
        print("Hey payloads being sent and stuff")

loop = asyncio.get_event_loop()
loop.run_until_complete(beginSocket(loop))

然而,使用这段代码,等待keepAlive 之后的打印语句永远不会被打印。我怎样才能使代码不等待keepAlive的结果?

【问题讨论】:

标签: python python-3.x python-asyncio python-3.6


【解决方案1】:

虽然keepAlive(socket) 会立即返回,但由于keepAlive 是协程,await keepAlive(socket) 将永远不会返回,因为keepAlive() 包含无限循环。

不要使用await,而是尝试asyncio.ensure_future(keepAlive(socket))

如果您确实想使用await keepAlive(socket),请尝试从其他地方发送您的有效负载(可能事先使用asyncio.ensure_future(send_payload(socket)))。

【讨论】:

    【解决方案2】:

    这是一种在概念上通过同一个套接字进行两个单独对话的情况。一个对话是你的心跳和回复消息。另一个是您发送的其他数据包。

    我会为此保留 3 个单独的顶级(即它们直接向事件循环报告)任务。我会让他们都保留对某个协调员的引用,这样他们中的任何一个都可以在需要时取消所有其他协调员。

    1. 心跳任务,基本上是你的keepAlive函数。
    2. 处理您通过 websocket 进行的其他对话的任务。
    3. 多路复用从 websocket 读取的任务。

    多路复用任务的工作是将消息路由到适当的任务。心跳任务应该只获得心跳响应,而其他任务应该获得所有其他消息。

    由于 websockets 已经构建了消息,因此您只能 sendrecv 整条消息,它可能拥有的其他工作不相关。

    这是你可以写这个的一种方式。

    import asyncio
    import websockets
    ws=#path goes here
    
    class RoutingTask(object):
        def __init__(self, sock, defaultQueue, **kwargs):
            super().__init__(**kwargs)
            self.sock = sock
            self.defaultQueue = defaultQueue # The queue all messages not otherwise matched go to.
            self.matchers = []
    
        async def run(self):
            while True:
                msg = await self.sock.recv()
                msg = json.loads(msg)
                matched = False
                for matcher, queue in matchers:
                    if matcher(msg):
                        await queue.put(msg)
                        matched = True
                        break
                if not matched:
                    await self.defaultQueue.put(msg)
    
        def addMatcher(self, matcher, queue):
            el = (matcher, queue)
            self.matchers.append(el)
    
    async def heartbeatTask(wssock, incomingq):
        message=json.dumps({
            "op": 1,
            "d": 0
            }) # Do this just once.
        while True:
           await wssock.send(message)
           print("heartbeat sent")
           response = await asyncio.wait_for(incomingq.get(), 10) # Wait 10 seconds for response.
           assert response['op'] == 11
           print("heartbeat response received.")
           await asyncio.sleep(10) # Wait 10 seconds to send another heartbeat.
    
    async def beginSocket(loop):
        def heartbeatMatcher(jsondict):
            return jsondict.get('op', None) == 11
    
        async with websockets.connect(ws) as socket:
            myq = asyncio.Queue(maxsize=1)
            heartbeatq = asyncio.Queue(maxsize=1)
            router = RoutingTask(socket, myq)
            router.addMatcher(heartbeatMatcher, heartbeatq)
            router = asyncio.ensure_future(router.run())
            heartbeat = asyncio.ensure_future(heartbeatTask(socket, heartbeatq)
    
            print(await myq.get())
            #payloads would be sent here
            print("Hey payloads being sent and stuff")
    
        heartbeat.cancel() # Stop the heartbeat
        router.cancel() # Stop the router task
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(beginSocket(loop))
    

    这里有一些问题。如果抛出异常,heartbeatrouter 任务可能最终不会被取消。他们也没有真正好的方法将问题报告给主要的beginSocket 任务。这基本上是一种快速而肮脏的一次性演示如何做你想做的事情。

    在我看来,asyncio.ensure_future 命名错误。它的作用是告诉事件循环有一个新事物需要继续运行。它基本上是启动一个线程的协程等价物。

    【讨论】:

      猜你喜欢
      • 2019-05-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-11
      • 2021-01-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多