【Question Title】: Asyncio tasks running sequentially
【Posted】: 2022-03-05 00:10:02
【Question】:

I recently started learning Python and its concurrency features, and I'm looking into asyncio.

Data structure: a list of companies, each of which contains a list of users.

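Goal: I want to make the gRPC calls in parallel, with each task always working on one specific company. Also, the API call takes a list of users, i.e. it is a batch call [not one single call per company].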
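To make the data shape and the batching concrete, here is a minimal sketch. It assumes the company list is a dict mapping company_id to a list of user IDs (the name `cname_vs_user_ids` is taken from the question's code; the company and user IDs are made up) and that `get_data_in_chunks`, which the question uses but never defines, simply splits a list into consecutive slices of at most `n` items. This helper is a hypothetical version, not the asker's:

```python
from typing import Iterator, List


def get_data_in_chunks(items: List[str], n: int) -> Iterator[List[str]]:
    """Hypothetical helper: yield consecutive slices of at most n items."""
    for start in range(0, len(items), n):
        yield items[start:start + n]


# Hypothetical input: company_id -> list of user IDs.
cname_vs_user_ids = {
    "company-a": [f"a-user-{i}" for i in range(45)],
    "company-b": [f"b-user-{i}" for i in range(7)],
}

# Each chunk would become one batch gRPC call for that company.
for company_id, user_ids in cname_vs_user_ids.items():
    sizes = [len(chunk) for chunk in get_data_in_chunks(user_ids, 20)]
    print(company_id, sizes)
```

With a chunk size of 20, a company with 45 users turns into three batch calls (20, 20 and 5 users), and a company with 7 users into a single call.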

The reference I followed: https://docs.python.org/3/library/asyncio-queue.html [slightly modified for my use case]

What I did: the following three small functions. The input to process_cname_vs_users is a mapping of company_id to its list of user IDs.

import asyncio


def get_data_in_chunks(items, size):  # helper not shown in the question; a simple version is assumed
    return [items[i:i + size] for i in range(0, len(items), size)]


async def update_data(req_id, user_ids, company_id):  # <-- THIS IS THE ASYNC CALL ON A CHUNK OF USERS
    # A gRPC call to the server goes here.
    pass


async def worker(worker_id, queue, cname_vs_user_ids, req_id):
    while True:
        company_id = await queue.get()
        user_ids = cname_vs_user_ids.get(company_id)

        # Send this company's users in batches of 20.
        user_ids_chunks = get_data_in_chunks(user_ids, 20)
        for user_id_chunk in user_ids_chunks:
            try:
                await update_data(req_id, user_id_chunk, company_id)
            except Exception as e:
                print("error: {}".format(e))

        # Notify the queue that the "work item" has been processed.
        queue.task_done()


async def process_cname_vs_users(cname_vs_user_ids, req_id=None):
    queue = asyncio.Queue()

    # One work item per company.
    for company_id in cname_vs_user_ids:
        queue.put_nowait(company_id)

    tasks = []
    for i in range(5):  # <- number of workers
        task = asyncio.create_task(
            worker(i, queue, cname_vs_user_ids, req_id))
        tasks.append(task)

    # Wait until the queue is fully processed.
    await queue.join()

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    try:
        # Wait until all worker tasks are cancelled.
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        print('Results:', responses)

    except Exception as e:
        print('Got an exception:', e)

Expected: the tasks should run concurrently for 5 companies at a time (the number of workers).

Actual: only the first task does any work, processing all the companies one after another.

Any help or suggestions would be appreciated. Thanks in advance :)

【Question Discussion】:

    Tags: python-3.x async-await python-asyncio coroutine concurrent.futures


    【Answer 1】:

    So, in the end, I figured it out after reading more about concurrency in Python. For now, I've used concurrent.futures.ThreadPoolExecutor to get the desired behaviour.
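For context on why the question's version looked sequential: a coroutine only hands control back to the event loop at an `await` on something that actually suspends. If the gRPC call inside `update_data` is an ordinary blocking call (for example a synchronous stub method; this is an assumption, since the question does not show the call), awaiting `update_data` never suspends. `await queue.get()` also returns without suspending while the queue still has items, so worker 0 drains the whole queue before workers 1 to 4 ever run. The sketch below uses `time.sleep` as a stand-in for a blocking call and `asyncio.sleep` for a non-blocking one; `fake_blocking_call`, `fake_async_call` and `run_workers` are hypothetical names:

```python
import asyncio
import time


async def fake_blocking_call(company_id):
    # Stand-in for a synchronous gRPC call: time.sleep blocks the whole event loop.
    time.sleep(0.2)


async def fake_async_call(company_id):
    # Stand-in for a truly asynchronous call: asyncio.sleep suspends this coroutine.
    await asyncio.sleep(0.2)


async def run_workers(call, companies, n_workers=5):
    queue = asyncio.Queue()
    for company_id in companies:
        queue.put_nowait(company_id)
    handled_by = {}  # company_id -> id of the worker that processed it

    async def worker(worker_id):
        while True:
            company_id = await queue.get()
            try:
                await call(company_id)
                handled_by[company_id] = worker_id
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker(i)) for i in range(n_workers)]
    start = time.perf_counter()
    await queue.join()
    elapsed = time.perf_counter() - start
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return elapsed, handled_by


companies = [f"company-{i}" for i in range(5)]
for call in (fake_blocking_call, fake_async_call):
    elapsed, handled_by = asyncio.run(run_workers(call, companies))
    print(call.__name__, round(elapsed, 1), sorted(set(handled_by.values())))
```

With the blocking stand-in, every company is handled by worker 0 and the run takes roughly 5 × 0.2 s; with the non-blocking one, all five workers are busy at once and it finishes in about 0.2 s. The two usual fixes are an asynchronous client (so the awaited call really suspends) or moving the blocking call into threads, which is what this answer does.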

    Solution:

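    import asyncio
    import concurrent.futures


    def update_data(req_id, user_ids, company_id):  # <-- a regular (blocking) function, so it can run in a worker thread
        # A blocking gRPC call to the server goes here.
        pass


    def worker(req_id, cname_vs_user_ids):
        # One update_data call per company, with at most 5 running at the same time.
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_summary = {executor.submit(update_data, req_id, user_items, company_id)
                              for company_id, user_items in cname_vs_user_ids.items()}
            for future in concurrent.futures.as_completed(future_summary):
                try:
                    response = future.result()  # re-raises any exception from the thread
                except Exception as error:
                    print("ReqId: {}, Error occurred: {}".format(req_id, error))
                    # Drop calls that have not started yet (Python 3.9+), then re-raise.
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise


    async def process_cname_vs_users(req_id, cname_vs_user_ids):
        # Run the blocking thread-pool work outside the event loop instead of calling
        # loop.run_until_complete() on a loop that is already running.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, worker, req_id, cname_vs_user_ids)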
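To check that the thread-pool approach really overlaps the calls, the sketch below replaces the gRPC call with `time.sleep(0.2)` (a hypothetical stand-in) and times five companies. The company IDs and `"req-1"` are made up. Threads help here because a thread that is waiting (on I/O, or in `time.sleep`) releases the GIL; CPU-bound work would not speed up this way.

```python
import concurrent.futures
import time


def update_data(req_id, user_ids, company_id):
    # Hypothetical stand-in for a blocking gRPC call.
    time.sleep(0.2)
    return company_id, len(user_ids)


def worker(req_id, cname_vs_user_ids, max_workers=5):
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(update_data, req_id, user_ids, company_id)
                   for company_id, user_ids in cname_vs_user_ids.items()}
        for future in concurrent.futures.as_completed(futures):
            company_id, n_users = future.result()  # re-raises any exception from the thread
            results[company_id] = n_users
    return results


data = {f"company-{i}": [f"user-{j}" for j in range(3)] for i in range(5)}
start = time.perf_counter()
results = worker("req-1", data)
elapsed = time.perf_counter() - start
print(results, round(elapsed, 1))
```

With `max_workers=5` and five companies, all calls run at once, so the whole run should take roughly 0.2 s rather than the 1 s a sequential loop would need.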
    

    The solution above works well for me.
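If you would rather keep the `asyncio.Queue` worker design from the question, another option (not what this answer used) is to keep `update_data` as a plain blocking function and run each batch call in a thread with `asyncio.to_thread` (Python 3.9+). Each worker then suspends while its call is in flight, so the five workers overlap. A sketch, with `time.sleep` again standing in for the gRPC call; `get_data_in_chunks`, the `sent` counter and the `"req-1"` default are hypothetical:

```python
import asyncio
import time


def get_data_in_chunks(items, size):
    # Hypothetical helper: consecutive slices of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]


def update_data(req_id, user_ids, company_id):
    # Hypothetical blocking gRPC call, simulated here with time.sleep.
    time.sleep(0.2)
    return len(user_ids)


async def worker(worker_id, queue, cname_vs_user_ids, req_id, sent):
    while True:
        company_id = await queue.get()
        try:
            for chunk in get_data_in_chunks(cname_vs_user_ids[company_id], 20):
                try:
                    # The blocking call runs in the default thread pool; this worker
                    # suspends here, so the other workers can start their own calls.
                    n = await asyncio.to_thread(update_data, req_id, chunk, company_id)
                except Exception as e:
                    print("error: {}".format(e))
                else:
                    sent[company_id] = sent.get(company_id, 0) + n
        finally:
            # Always mark the item done, otherwise queue.join() would hang.
            queue.task_done()


async def process_cname_vs_users(cname_vs_user_ids, req_id="req-1", n_workers=5):
    queue = asyncio.Queue()
    for company_id in cname_vs_user_ids:
        queue.put_nowait(company_id)

    sent = {}  # company_id -> number of users sent so far
    tasks = [asyncio.create_task(worker(i, queue, cname_vs_user_ids, req_id, sent))
             for i in range(n_workers)]
    await queue.join()

    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return sent


data = {f"company-{i}": [f"user-{j}" for j in range(25)] for i in range(5)}
start = time.perf_counter()
sent = asyncio.run(process_cname_vs_users(data))
elapsed = time.perf_counter() - start
print(sent, round(elapsed, 1))
```

Each company has 25 users, so it needs two batch calls made one after the other (about 0.4 s). Because the five companies are processed concurrently, the whole run should take roughly 0.4 s instead of the roughly 2 s a fully sequential loop would need.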
