【发布时间】:2022-03-05 00:10:02
【问题描述】:
我最近开始研究 python 及其相关的并发方面,我正在研究 asyncio。
数据结构:其中包含用户列表的公司列表。
目标:我想并行执行 gRPC 调用,任务始终为特定公司运行。此外,API 调用在用户列表中,并且是批量调用[不是一个公司的单个调用]
我关注的参考文献:https://docs.python.org/3/library/asyncio-queue.html [根据我的用例稍作修改]
我做了什么:以下 3 个小函数,process_cname_vs_users 的输入为 company_id vs users-list
async def update_data(req_id, user_ids, company_id): # <-- THIS IS THE ASYNC CALL ON CHUNK OF SOME USERS
# A gRPC call to server here.
async def worker(worker_id, queue, company_vs_user_ids):
while True:
company_id = await queue.get()
user_ids = cname_vs_user_ids.get(company_id)
user_ids_chunks = get_data_in_chunks(user_ids, 20)
for user_id_chunk in user_ids_chunks:
try:
await update_data(user_id_chunk, company_id)
except Exception as e:
print("error: {}".format(e))
# Notify the queue that the "work item" has been processed.
queue.task_done()
async def process_cname_vs_users(cname_vs_user_ids):
queue = asyncio.Queue()
for company_id in cname_vs_user_ids:
queue.put_nowait(company_id)
tasks = []
for i in range(5): # <- number of workers
task = asyncio.create_task(
worker(i, queue, cname_vs_user_ids))
tasks.append(task)
# Wait until the queue is fully processed.
await queue.join()
# Cancel our worker tasks.
for task in tasks:
task.cancel()
try:
# Wait until all worker tasks are cancelled.
responses = await asyncio.gather(*tasks, return_exceptions=True)
print('Results:', responses)
except Exception as e:
print('Got an exception:', e)
期望:任务应同时为 5 个(工人数量)公司执行。
现实:唯一的首要任务是按顺序为所有公司工作。
任何帮助/建议都会有所帮助。在此先感谢:)
【问题讨论】:
标签: python-3.x async-await python-asyncio coroutine concurrent.futures