【问题标题】:Is it possible to run multiple asyncio in the same time in python?是否可以在 python 中同时运行多个 asyncio?
【发布时间】:2020-07-02 18:45:17
【问题描述】:

基于我得到的解决方案:Running multiple sockets using asyncio in python

我尝试使用 asyncio 添加计算部分

设置:Python 3.7.4

import msgpack
import threading
import os
import asyncio
import concurrent.futures
import functools
import nest_asyncio
nest_asyncio.apply()

class ThreadSafeElem(bytes):
  def __init__(self, * p_arg, ** n_arg):
     self._lock = threading.Lock()
  def __enter__(self):
     self._lock.acquire()
     return self
  def __exit__(self, type, value, traceback):
     self._lock.release()

elem = ThreadSafeElem()

async def serialize(data):
   return msgpack.packb(data, use_bin_type=True)
async def serialize1(data1):
   return msgpack.packb(data1, use_bin_type=True)

async def process_data(data,data1):
   loop = asyncio.get_event_loop()
   future = await loop.run_in_executor(None, functools.partial(serialize, data))
   future1 = await loop.run_in_executor(None, functools.partial(serialize1, data1))
   return   await asyncio.gather(future,future1)

 ################ Calculation#############################
def calculate_data():
  global elem
  while True:
      try:
          ... data is calculated (some dictionary))...
          elem, elem1= asyncio.run(process_data(data, data1))
      except:
          pass
#####################################################################
def get_data():
  return elem
def get_data1():
  return elem1
########### START SERVER AND get data contionusly ################
async def client_thread(reader, writer):
  while True:
    try:
        bytes_received = await reader.read(100) 
        package_type = np.frombuffer(bytes_received, dtype=np.int8)
        if package_type ==1 :
           nn_output = get_data1()
        if package_type ==2 :
           nn_output = get_data()               
        writer.write(nn_output)
        await writer.drain()
    except:
        pass

async def start_servers(host, port):
  server = await asyncio.start_server(client_thread, host, port)
  await server.serve_forever()

async def start_calculate():
  await asyncio.run(calculate_data())

def enable_sockets():
 try:
    host = '127.0.0.1'
    port = 60000
    sockets_number = 6
    loop = asyncio.get_event_loop()
    for i in range(sockets_number):
        loop.create_task(start_servers(host,port+i))
    loop.create_task(start_calculate())
    loop.run_forever()
except:
    print("weird exceptions")
##############################################################################

enable_sockets()   

问题是当我从客户端拨打电话时,服务器没有给我任何东西。

我用虚拟数据测试了程序,在计算部分没有异步,所以没有这个 loop.create_task(start_calculate()) 并且服务器正确响应。

我还运行计算数据而不将其添加到启用套接字中并且它工作。它也适用于这个实现,但问题是服务器没有返回任何东西。

我是这样做的,因为我需要计算部分持续运行,并且当其中一个客户端调用以返回该点的数据时。

【问题讨论】:

    标签: python python-3.x server python-asyncio


    【解决方案1】:

    asyncio 事件循环不能嵌套在另一个事件循环中,这样做没有任何意义:asyncio.run(和类似的)阻塞当前线程直到完成。这不会增加并行度,只会禁用任何外部事件循环。

    如果要嵌套另一个asyncio 任务,直接在当前事件循环中运行即可。如果你想运行一个非合作的阻塞任务,run it in the event loop executor

    async def start_calculate():
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, calculate_data)
    

    默认执行器使用线程——这允许运行阻塞任务,但不会增加并行度。使用自定义 ProcessPoolExecutor 来使用额外的内核:

    import concurrent.futures
    
    async def start_calculate():
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            await loop.run_in_executor(pool, calculate_data)
    

    【讨论】:

    • 我尝试了建议的第一种方法并修改了 start_calculate 但我收到了 RuntimeWarning: coroutine 'process_data' is never awaited @MisterMiyagi
    • @cUser calculate_data 如图所示是不完整的,并且它会捕获并丢弃所有错误——我对我一无所知的事情无能为力。但是,您似乎使用asyncio.run(process_data(data)) 在其中进行调度——如前所述,将asyncio.run 嵌套在asyncio.run 中并不是一个好主意。要么将process_data 提交到外部事件循环,要么丢弃包装器并直接在calculate_data 内运行serialize
    • 你是完全正确的 asyncio.run 中的 asyncio.run 远非一个好主意。但是我意识到我将同时序列化我需要做的 5 件事。 Calcualte 数据只是做一些不相关的计算,并将它们添加到字典中,这就是我没有添加它的原因。因此,如果我将在计算数据中进行序列化,它将使一切都变得非常慢@MisterMiyagi。那么在这一点上,即使我在这个loop.run_in_executor(None,calculate_data)中运行,我仍然如何并行进行序列化。告诉我如果我说清楚了
    • 使用问题中显示的代码,每个calculate_data最终都会调用serialize一次:calculate_data阻塞asyncio.run不会增加并发,process_data只提交一个executor任务不会添加并发,asyncio.gather的单个任务不添加并发。因此,asyncio.run(process_data(data)) 可以替换为serialize(data)(并且serialize 应该是def 而不是async def 函数)。
    • @cUser 我仍然不明白你为什么要运行serializeserialize 使用asyncio。他们不使用任何异步功能。不管怎样,我已经在asyncio 中回答了如何运行多个任务。如果您想调试现在出现的特定错误,请提出一个新问题,包括实际调试它所需的所有信息。通过 cmets 将信息分段添加到答案中对我们双方都不是有效的。
    【解决方案2】:

    为什么要多次拨打asyncio.run()

    这个函数总是创建一个新的事件循环并在最后关闭它。它应该用作异步程序的主要入口点,并且理想情况下应该>只被调用一次。

    我建议你阅读docs

    【讨论】:

    • 你可以使用多处理,我亲自做过
    猜你喜欢
    • 1970-01-01
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-16
    • 1970-01-01
    相关资源
    最近更新 更多