【问题标题】:running asyncio task concurrently and in background在后台同时运行异步任务
【发布时间】:2021-01-03 08:05:54
【问题描述】:

所以很抱歉,因为我看到这个问题问了很多,但是查看了所有问题似乎都没有解决我的问题。我的代码是这样的

TDSession = TDClient()
TDSession.grab_refresh_token()
q = queue.Queue(10)
asyncio.run(listener.startStreaming(TDSession, q))
while True:
    message = q.get()
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)

我试过asyncio.create_task(listener.startStreaming(TDSession,q)),问题是我得到了

RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'startStreaming' was never awaited

这让我很困惑,因为这似乎在这里工作 Can an asyncio event loop run in the background without suspending the Python interpreter? 这是我想要做的。

listener.startStreaming 函数看起来像这样

async def startStreaming(TDSession, q):
    streamingClient = TDSession.create_streaming_session()
    streamingClient.account_activity()
    await streamingClient.build_pipeline()
    while True:
        message = await streamingClient.start_pipeline()
        message = parseMessage(message)
        if message != None:
            print('putting message into q')
            print( dict(message) )
            q.put(message)

有没有办法让我可以在后台运行监听器?

编辑:我也试过这个,但它只运行 consumer 函数,而不是同时运行两者

TDSession.grab_refresh_token()
q = queue.Queue(10)
loop = asyncio.get_event_loop()
loop.create_task(listener.startStreaming(TDSession, q))
loop.create_task(consumer(TDSession, q))
loop.run_forever()

【问题讨论】:

    标签: python-3.x async-await python-asyncio


    【解决方案1】:

    如您所见,asyncio.run 函数会运行给定的协程,直到完成。也就是说,它会等待listener.startStreaming 返回的协程完成,然后再继续下一行。

    另一方面,使用asyncio.create_task 要求调用者已经在异步循环中运行。来自文档:

    任务在get_running_loop()返回的循环中执行,如果当前线程没有运行循环,则引发RuntimeError。

    您需要将两者结合起来,方法是创建一个 async 函数,然后在该异步函数中调用 create_task

    例如:

    async def main():
      TDSession = TDClient()
      TDSession.grab_refresh_token()
      q = asyncio.Queue(10)
      streaming_task = asyncio.create_task(listener.startStreaming(TDSession, q))
      while True:
        message = await q.get()
        print('oh shoot!')
        print(message)
        orderEntry.placeOrder(TDSession=TDSession)
    
      await streaming_task  # If you want to wait for `startStreaming` to complete after the while loop
    
    if __name__ == '__main__':
      asyncio.run(main())
    

    编辑:从您的评论中,我意识到您想使用生产者-消费者模式,因此我还更新了上面的示例以使用 asyncio.Queue 而不是 queue.Queue,以便线程能够在生产者(startStreaming)和消费者(while 循环)

    【讨论】:

    • 那么这两个函数可以并行运行吗?我想将它们视为流中的生产者和消费者,其中 startStreaming 函数将消息放入队列中,而主 while 循环中的代码获取它。这可能与 asyncio 吗?开始怀疑是不是
    • 是的,你可以。这里的问题是queue.Queue 是一个阻塞队列(q.get() 没有等待,而是阻塞线程直到有可用值)。相反,你应该使用asyncio.Queue,它有一个协程函数get()
    • 异步队列就是答案!
    猜你喜欢
    • 2016-03-02
    • 2014-03-31
    • 1970-01-01
    • 2021-09-08
    • 1970-01-01
    • 1970-01-01
    • 2018-10-18
    • 1970-01-01
    相关资源
    最近更新 更多