【问题标题】:How to run Python code in multithreading environment?如何在多线程环境中运行 Python 代码?
【发布时间】:2021-03-27 03:11:41
【问题描述】:

我正在尝试使用 asyncio 在 Python 上并行执行代码。这个想法是并行运行多个作业。

这是我的代码:

import asyncio
import threading

async def print_thread():
    for n in range(5):
        print("Number: {}".format(threading.get_ident()))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_thread())
    finally:
        loop.close()

输出是:

Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752
Number: 4599266752

据我了解,代码已在单个线程上执行。有没有办法并行化它?

PS

如果我将代码更改为:

async def print_thread():
    print("Number: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        for n in range(5):
            loop.run_until_complete(print_thread())

我得到了同样的结果。

【问题讨论】:

  • 有什么理由不使用线程池(multiprocessing.dummy.pool.Pool)?
  • asynciomultiprocessing 有区别吗?我是 Python 的新手。

标签: python python-asyncio


【解决方案1】:

你的for 循环在你的协程中,所以它不能在不同的线程中。但即使你将循环放在异步函数之外,它仍然会在同一个线程中运行:

import asyncio
import threading


async def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = []
    for i in range(10):
        tasks.append(asyncio.ensure_future(print_thread()))
    loop.run_until_complete(asyncio.gather(*tasks))
    

仍然会输出相同的 id:

Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864
Thread: 140366741292864

解决方案是使用ThreadPoolExecutor,但它需要一个函数,而不是协程,所以你必须从定义中删除async

import asyncio
import threading
import concurrent.futures


def print_thread():
    print("Thread: {}".format(threading.get_ident()))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        for i in range(10):
            loop.run_in_executor(pool, print_thread)

哪些输出:

Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446361163520
Thread: 140446369556224
Thread: 140446352508672
Thread: 140446361163520
Thread: 140446344115968
Thread: 140446369556224
Thread: 140446335723264

如你所见,线程比调用少,这很正常。但是如果你有大批量,你可以在ThreadPoolExecutor构造函数中使用max_workers参数来改变线程数。

如果你还想使用协程,那里有解决方案:https://*.com/a/46075571/7414475

【讨论】:

  • 我怎样才能等待你的例子中的所有期货?我需要收集所有期货并等待它们吗?
  • 是的,我会添加另一个答案!
【解决方案2】:

根据 cmets 的要求,关于结果收集的另一个答案:

import asyncio
import threading
import concurrent.futures


def get_thread():
    return "Thread: {}".format(threading.get_ident())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        tasks = []
        for i in range(10):
            tasks.append(loop.run_in_executor(pool, get_thread))
        print(loop.run_until_complete(asyncio.gather(*tasks)))

输出:

['Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740266125056', 'Thread: 139740183525120', 'Thread: 139740266125056', 'Thread: 139740175132416', 'Thread: 139740183525120', 'Thread: 139740166739712', 'Thread: 139740266125056', 'Thread: 139740158347008']

【讨论】:

  • 如果我需要给get_thread加锁,是否可以在ThreadPoolExecutor中加锁?
最近更新 更多