【问题标题】:Is it possible to run the asyncio.Server instance while the event loop is already running是否可以在事件循环已经运行时运行 asyncio.Server 实例
【发布时间】:2016-08-12 19:57:57
【问题描述】:

我试图理解,是否可以在事件循环已经通过run_forever 方法运行时运行asyncio.Server 实例(当然,来自单独的线程)。 据我了解,服务器可以通过loop.run_until_complete(asyncio.start_server(...)) 或通过 await asyncio.start_server(...),如果循环已经在运行。 第一种方法对我来说是不可接受的,因为循环已经通过 run_forever 方法运行。但是我也不能使用 await 表达式,因为我要从“循环区域”之外启动它(例如从 main 方法,它不能被标记为异步,对吧?)

def loop_thread(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()
        print("loop clesed")

class SchedulerTestManager:
    def __init__(self): 
        ...

        self.loop = asyncio.get_event_loop()
        self.servers_loop_thread = threading.Thread(
            target=loop_thread, args=(self.loop, ))
         ...

    def start_test(self):
        self.servers_loop_thread.start()
        return self.servers_loop_thread

    def add_router(self, router):
        r = self.endpoint.add_router(router)
        host = router.ConnectionParameters.Host
        port = router.ConnectionParameters.Port
        srv = TcpServer(host, port)
        server_coro = asyncio.start_server(
            self.handle_connection, self.host, self.port)
        # does not work since add_router is not async
        # self.server = await server_coro
        # does not work, since the loop is already running
        # self.server = self.loop.run_until_complete(server_coro)
        return r


def maind():
   st_manager = SchedulerTestManager()
   thread = st_manager.start_test()
   router = st_manager.add_router(router)

当然,最简单的解决方案是在开始测试(运行循环)之前添加所有路由器(服务器)。但我想尝试实现它,因此可以在测试已经运行时添加路由器。我认为loop.call_soon (call_soon_threadsafe) 方法可以帮助我,但似乎无法摆脱协程,而只是一个简单的功能。

希望我的解释不是很混乱。提前致谢!

【问题讨论】:

    标签: python python-asyncio


    【解决方案1】:

    为了在一个线程中执行的事件循环和在另一个线程中执行的传统旧线程代码之间进行通信,您可以使用janus 库。

    这是一个有两个接口的队列:异步和线程安全同步一个。

    这是用法示例:

    import asyncio
    import janus
    
    loop = asyncio.get_event_loop()
    queue = janus.Queue(loop=loop)
    
    def threaded(sync_q):
        for i in range(100):
            sync_q.put(i)
        sync_q.join()
    
    @asyncio.coroutine
    def async_coro(async_q):
        for i in range(100):
            val = yield from async_q.get()
            assert val == i
            async_q.task_done()
    
    fut = loop.run_in_executor(None, threaded, queue.sync_q)
    loop.run_until_complete(async_coro(queue.async_q))
    loop.run_until_complete(fut)
    

    您可以创建一个任务,循环等待来自队列的新消息,并根据请求启动新服务器。其他线程可能会将新消息推送到队列中,请求新服务器。

    【讨论】:

    • 谢谢!看来这就是我要找的。​​span>
    猜你喜欢
    • 2015-09-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-19
    • 2014-08-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多