【问题标题】:Create another thread for an await function为 await 函数创建另一个线程
【发布时间】:2023-04-13 19:33:01
【问题描述】:

我是第一次使用Webserver,之前我使用过套接字和并行,但它非常不同且简单,它没有使用异步作为并行。

我的目标很简单,我有我的服务器和我的客户端。在我的客户端中,我想创建一个单独的线程来接收服务器将发送的消息,并在前一个线程中执行一些其他操作,如代码示例 (client.py):

from typing import Dict
import websockets
import asyncio
import json

URL = "my localhost webserver"
connection = None

async def listen() -> None:
    global connection

    input("Press enter to connect.")
    
    async with websockets.connect(URL) as ws:
        connection = ws

        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))
        

        ## This i want to be in another thread
await receive_msg()

print("I`m at listener`s thread")

# do some stuffs

async def recieve_msg() -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())

为了让我收到消息,我需要在recv() 中使用await,但我不知道如何为此创建单独的线程。我已经尝试使用threading 来创建一个单独的线程,但它不起作用。

有谁知道如何做到这一点以及是否可以做到这一点?

【问题讨论】:

    标签: python python-3.x multithreading async-await webserver


    【解决方案1】:

    目前尚不清楚您想做什么可以按照您建议的确切方式完成。在以下示例中,我将连接到回显服务器。直接实现您建议的最直接的方法是创建一个新线程来传递连接。但这并不完全奏效:

    import websockets
    import asyncio
    from threading import Thread
    
    URL = "ws://localhost:4000"
    
    async def listen() -> None:
        async with websockets.connect(URL) as ws:
            # pass connection:
            t = Thread(target=receiver_thread, args=(ws,))
            t.start()
            # Generate some messages to be echoed back:
            await ws.send('msg1')
            await ws.send('msg2')
            await ws.send('msg3')
            await ws.send('msg4')
            await ws.send('msg5')
    
    def receiver_thread(connection):
        print("I`m at listener`s thread")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(receive_msg(connection))
    
    async def receive_msg(connection) -> None:
        while True:
            msg = await connection.recv()
            print(f"Server: {msg}")
    
    asyncio.get_event_loop().run_until_complete(listen())
    

    打印:

    I`m at listener`s thread
    Server: msg1
    Server: msg2
    Server: msg3
    Server: msg4
    Server: msg5
    Exception in thread Thread-1:
    Traceback (most recent call last):
      File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
        self.run()
      File "C:\Program Files\Python38\lib\threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
      File "C:\Ron\test\test.py", line 22, in receiver_thread
        loop.run_until_complete(receive_msg(connection))
      File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
        return future.result()
      File "C:\Ron\test\test.py", line 29, in receive_msg
        msg = await connection.recv()
      File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
        await asyncio.wait(
      File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
        fs = {ensure_future(f, loop=loop) for f in set(fs)}
      File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
        fs = {ensure_future(f, loop=loop) for f in set(fs)}
      File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
        raise ValueError('The future belongs to a different loop than '
    ValueError: The future belongs to a different loop than the one specified as the loop argument
    

    消息接收正常,但问题出现在函数receiver_thread语句上:

    loop.run_until_complete(receive_msg(connection))
    

    启动的线程必须没有正在运行的事件循环,并且不能使用函数listen 使用的事件循环,因此必须创建一个新的事件循环。如果这个线程/事件循环没有使用来自不同事件循环的任何资源(即连接),那会很好:

    import websockets
    import asyncio
    from threading import Thread
    
    URL = "ws://localhost:4000"
    
    async def listen() -> None:
        async with websockets.connect(URL) as ws:
            t = Thread(target=receiver_thread)
            t.start()
    
    def receiver_thread():
        print("I`m at listener`s thread")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(receive_msg())
    
    async def receive_msg() -> None:
        await asyncio.sleep(2)
        print('I just slept for 2 seconds')
    
    asyncio.get_event_loop().run_until_complete(listen())
    

    打印:

    I`m at listener`s thread
    I just slept for 2 seconds
    

    根据您显示的最少代码,我认为没有真正需要在线程中运行任何东西,但假设您省略了显示对接收到的消息的一些处理,仅asyncio 是不够的,那么您可能只需要做的是在当前运行循环中接收消息(在函数listen)并使用线程来处理消息:

    from typing import Dict
    import websockets
    import asyncio
    import json
    from threading import Thread
    
    URL = "my localhost webserver"
    
    async def listen() -> None:
    
        input("Press enter to connect.")
    
        async with websockets.connect(URL) as ws:
    
            msg_initial: Dict[str,str] = get_dict()
            await ws.send(json.dumps(msg_initial))
    
            while True:
                msg = await ws.recv()
                print(f"Server: {msg}")
                # Non-daemon threads so program will not end until these threads terminate:
                t = Thread(target=process_msg, args=(msg,))
                t.start()
                
    
    asyncio.get_event_loop().run_until_complete(listen())
    

    更新

    根据您对我关于创建聊天程序的回答的最后评论,您应该使用纯多线程或纯异步来实现它。这是使用 asyncio 的粗略大纲:

    import websockets
    import asyncio
    import aioconsole
    
    URL = "my localhost webserver"
    
    async def receiver(connection):
        while True:
            msg = await connection.recv()
            print(f"\nServer: {msg}")
    
    async def sender(connection):
        while True:
            msg = await aioconsole.ainput('\nEnter msg: ')
            await connection.send(msg)
    
    async def chat() -> None:
        async with websockets.connect(URL) as ws:
            await asyncio.gather(
                receiver(ws),
                sender(ws)
            )
    
    asyncio.get_event_loop().run_until_complete(chat())
    

    但是,您可以使用 asyncio 执行的用户输入类型可能会受到限制。因此,我认为多线程可能是更好的方法。

    【讨论】:

    • 嗯我理解你的建议,我会参加测试,我会告诉你正确的。但我的应用程序的想法是在线纸牌游戏。考虑一下,我需要一个线程,客户端将只接收一些信息,而另一个线程则发送它。 send、recv、recv、send 没有正确的顺序……这就像聊天,需要两个线程才能进行在线聊天,思路是一样的
    • 查看更新答案。
    • 我将运行此测试并查看我的应用程序的限制。如果这条路径不起作用,我会看看你是否可以在没有 asyncio 的情况下使用 Webserver 库。特别是我更喜欢另一种并行性,但我认为库本身是基于这种架构的。还是谢谢你。