【问题标题】:Running asynchronous code synchronously in separate thread在单独的线程中同步运行异步代码
【发布时间】:2018-02-08 22:15:16
【问题描述】:

我正在使用 Django Channels 来支持 websocket,并使用他们的组概念向同一组中的多个消费者广播消息。为了在消费者之外发送消息,您需要在其他同步代码中调用异步方法。不幸的是,这在测试时会出现问题。

我开始使用loop.run_until_complete

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
                                                                                    'type': 'receive_group_json'}),
                                              loop=loop))

然后堆栈跟踪读取该线程没有事件循环:RuntimeError: There is no current event loop in thread 'Thread-1'.。为了解决这个问题,我补充说:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
                                                                                    'type': 'receive_group_json'}),
                                              loop=loop))

现在堆栈跟踪正在读取RuntimeError: Event loop is closed,尽管如果我添加打印语句loop.is_closed() 打印False

对于上下文,我使用的是 Django 2.0、Channels 2 和一个 redis 后端。

更新:我尝试在 Python 解释器中运行它(在 py.test 之外以删除移动变量)。当我运行第二个代码块时,我没有收到Event loop is closed 错误(这可能是由于 Pytest 结束时出现的超时等原因)。但是,我没有在我的客户端中收到群消息。但是,我确实看到了打印声明:

({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> result=None>}, set())

更新 2:在刷新 redis 之后,我在 py.test 中添加了一个固定装置,用于为每个函数以及会话范围的事件循环刷新它。这一次从 RedisChannelLayer 产生另一个打印:

({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> exception=RuntimeError('Task <Task pending coro=<RedisChannelLayer.group_send() running at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:316>> got Future <Future pending> attached to a different loop',)>}, set())

【问题讨论】:

    标签: django python-asyncio django-channels


    【解决方案1】:

    如果channel_layer 期望驻留在另一个线程中自己的事件循环中,您将需要获取该事件循环对象。拥有它后,您可以向它提交协程并与您的线程同步,如下所示:

    def wait_for_coro(coro, loop):
        # submit coroutine to the event loop in the other thread
        # and wait for it to complete
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        return future.wait()
    
    wait_for_coro(channel_layer.group_send(group_name, ...), channel_loop)
    

    【讨论】:

    • 在线程之间传递事件循环的标准做法是什么?我是否需要将事件循环作为参数传递给工作线程?还是在 daphne 或 async 库中有调用?
    • @williamrfry 因为事件循环是一个和其他对象一样的对象,将asyncio.get_event_loop() 参数传递给工作线程是非常好的。唯一要记住的是,除了通过asyncio.run_coroutine_threadsafe 之外,您不能在工作线程中使用该事件循环执行任何操作(例如提交任务或查询其状态)。
    【解决方案2】:

    默认情况下,只有主线程获得事件循环,在其他线程调用get_event_loop会失败。

    如果您需要在另一个线程中使用事件循环(例如处理 HTTP 或 WebSockets 请求的线程),您需要使用new_event_loop 自己创建。之后,您可以使用set_event_loop 并且将来的get_event_loop 调用将起作用。我这样做:

    # get or create an event loop for the current thread
    def get_thread_event_loop():
        try:
            loop = asyncio.get_event_loop()  # gets previously set event loop, if possible
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop
    

    More here.

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-13
      • 1970-01-01
      • 1970-01-01
      • 2022-10-01
      • 2022-01-08
      • 1970-01-01
      相关资源
      最近更新 更多