【问题标题】:How to restart asyncio thread?如何重新启动异步线程?
【发布时间】:2020-08-22 07:25:52
【问题描述】:

如何重新启动异步循环?我正在用 asyncio 收听 websocket。我想停止听,重新开始整个循环。我怎样才能做到这一点?我在下面的尝试不起作用

async def start_websocket(streams):
    print("using streams {}".format(streams))
    await asyncio.sleep(30)


def _start_loop(loop, ws):
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(ws)
    except CancelledError:
        pass


for streams in ["a", "b"]:
    ws = start_websocket(streams)  # coroutine
    loop = asyncio.get_event_loop()

    # in case of already running, cancel websocket
    if loop.is_running():
        [t.cancel() for t in asyncio.Task.all_tasks()]

    # restart websocket
    Thread(target=_start_loop, args=(loop, ws)).start()
    time.sleep(2)

我来了

RuntimeError: This event loop is already running

【问题讨论】:

    标签: python asynchronous python-asyncio


    【解决方案1】:

    您可能会停止循环(loop.stop()loop.close())并通过 loop = asyncio.new_event_loop() 创建一个新循环

    另一种选择是创建自定义事件循环策略:Loop Policy

    【讨论】:

    • 我已将示例自包含。你有一些示例代码吗?
    【解决方案2】:
        print("using streams {}".format(streams))
        await asyncio.sleep(30)
    
    
    def _start_loop(loop, ws):
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(ws)
        except CancelledError:
            pass
    
    
    for streams in ["a", "b"]:
        ws = start_websocket(streams)  # coroutine
        loop = asyncio.get_event_loop()
    
        # in case of already running, cancel websocket
        if loop.is_running():
            [t.cancel() for t in asyncio.Task.all_tasks()]
        try:
          loop.close() #docs say "The loop must not be running when this function is called. "
        except:
          loop.stop() 
        # restart websocket
        Thread(target=_start_loop, args=(loop, ws)).start()
        time.sleep(2)
    

    这应该可以解决您之前遇到的错误

    【讨论】:

    • 感谢这给出了错误:RuntimeError: Event loop is closed
    【解决方案3】:
    
    async def subtask(s):
        while True:
            print("running task {}".format(s))
            await asyncio.sleep(5)
    
    async def start_websocket(streams):
        print("using streams {}".format(streams))
        asyncio.ensure_future(subtask(streams+"1"))
        asyncio.ensure_future(subtask(streams+"2"))
        try:
            while True:
                print("running task {}".format(streams))
                await asyncio.sleep(5)
        except CancelledError:
            print("cancelled task {}".format(streams))
    
    def _start_loop(loop):
    
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    
    for streams in ["a", "b", "c"]:
        loop = asyncio.get_event_loop()
        if not loop.is_running():
            Thread(target=_start_loop, args=(loop,)).start()
        else:
            for t in asyncio.Task.all_tasks():
                t.cancel()
    
        loop.create_task(start_websocket(streams))
        time.sleep(10)
        print("finished {}".format(streams))
    
    time.sleep(60)
    

    【讨论】:

    • 能否补充一些关于原代码和新版本的区别的解释性文字,以便以后的读者学习?
    猜你喜欢
    • 2023-02-15
    • 2020-07-23
    • 1970-01-01
    • 2015-10-20
    • 1970-01-01
    • 1970-01-01
    • 2016-04-04
    相关资源
    最近更新 更多