【问题标题】:Turning multithreading code with unlimited threads into multithreading code with max number of simultaneously running threads将具有无限线程的多线程代码转换为具有最大同时运行线程数的多线程代码
【发布时间】:2019-10-10 23:02:52
【问题描述】:

我有一个通过多线程执行某个功能的脚本。现在,让并行运行的线程数量与拥有 CPU 核心的数量一样多是很有趣的。 现在使用 threading.thread 语句的当前代码 (1:) 创建了 1000 个线程并同时运行它们。 我想把它变成同时只运行固定数量的线程(例如 8 个)的东西,并将其余的放入队列中,直到正在执行的线程/cpu 核心可以免费使用。

1:

import threading

nSim = 1000

def simulation(i):
    print(str(threading.current_thread().getName()) + ': '+ str(i))

if __name__ == '__main__':
    threads = [threading.Thread(target=simulation,args=(i,)) for i in range(nSim)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Q1:代码2:是否符合我的描述? (同时运行最大线程数的多线程)是否正确? (我想是的,但我不是 100% 确定)

Q2:现在代码同时启动1000个线程,在8个线程上执行。有没有办法只在执行线程/cpu 内核空闲时才启动一个新线程(这样我就没有 990 个线程调用从一开始就等待执行?

Q3:有没有办法跟踪哪个cpu-core执行了哪个线程?只是为了证明代码正在做它应该做的事情。

2:

import threading
import multiprocessing

print(multiprocessing.cpu_count())
from concurrent.futures import ThreadPoolExecutor

nSim = 1000

def simulation(i):
    print(str(threading.current_thread().getName()) + ': '+ str(i))

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=8) as executor:
        for i in range (nSim):
            res = executor.submit(simulation, i)
            print(res.result())

【问题讨论】:

    标签: python multithreading python-multithreading threadpoolexecutor


    【解决方案1】:

    A1:为了限制可以同时访问某些资源的线程数,你可以使用 threading.Semaphore 实际上 1000 个线程不会给你带来巨大的速度提升,每个进程的推荐线程数是 mp.cpu_count() *1 或某些文章中的 mp.cpu_count()*2。另请注意,线程适用于 python 中的 IO 操作,但由于 GIL,不适用于计算。

    A2。如果您只想同时运行 8 个线程,为什么需要这么多线程?仅创建 8 个线程,然后在任务准备就绪时为它们提供任务,为此您需要使用线程安全的 queue.Queue()。但是在您的具体示例中,您可以执行以下操作,以便在模拟功能中使用 while 每个线程运行 250 次测试,顺便说一下,在这种情况下您不需要 Semaphore。

    A3。当我们谈论多线程时,您有一个具有多个线程的进程。

    导入线程
    进口时间
    将多处理导入为 mp

    def 模拟(i,_s):
    # s 是线程。Semaphore()
    与_s:
    打印(str(threading.current_thread().getName()) + ': ' + str(i))
    time.sleep(3)

    如果 name == 'ma​​in':
    print("核心数:{}".format(mp.cpu_count()))
    # 在一些文章中推荐线程数为 mp.cpu_count()*1 或 mp.cpu_count()*2
    nSim = 25

    s = threading.Semaphore(4) # max number of threads which can work simultaneously with resource is 4 threads = [threading.Thread(target=simulation, args=(i, s, )) for i in range(nSim)] for t in threads: t.start() # just to prove that all threads are active in the start and then their number decreases when the work is done for i in range(6): print("Active threads number {}".format(threading.active_count())) time.sleep(3)

    【讨论】:

    • A2:好吧,我将使用 python 来模拟 dymola 模型。每个模型的 cpu 时间可以/可能会有所不同,因此如果您只创建 8 个线程并为它们提供每 125 个任务(总共 = 1000 个),我猜每个线程的活动线程时间将是不同的效率低于激活所有线程并让它们等待第一个空闲执行任务的线程?
    • A3: 好的,所以 Semaphore 用于限制同时运行的线程的数量,但这也意味着它们同步运行,并且线程在开始下一个线程之前等待最后一个线程完成其工作?难道没有更有效的方法允许异步执行任务,同时仍然只使用 4 作为同时运行的最大线程数?
    • @Matthi9000 我用 100 个线程执行了三个测试,其中 4 个线程分别打印 i 250 次,并使用多处理模型。 1000个线程与信号量25.252382700000002,25.2508892,25.3252732,25.2702972,25.247306只有4,其每一个做尝试250线程:25.3412317,25.336504599999998,25.3274046,25.3280475,25.484402600000003只有4多进程:25.561975399999998,25.6724675,25.732875。最快的是1000个线程,最慢的是多处理,但是如果每个进程都进行大量计算,mp会是最快的,所以这真的取决于测试的种类
    • @Matthi9000 4 个线程可能比 1000 个线程慢,因为全局锁会“减慢”计算(我将 4 个线程中的每个线程中的整数从 250 减少到 0)但不影响 IN OUT 操作(像打印),所以这可能是信号量的 1000 个线程比 4 个线程慢的原因。您可以查看本文以获取更多信息medium.com/@bfortuner/…如果答案对您有用,请将其标记为解决方案。
    【解决方案2】:

    A1:不,您的代码提交了一个任务,在res 中接收到一个Future,然后调用result,它等待 得到结果。只有在前一个任务完成后,才会给线程一个新任务。一次只有一个工作线程真正在工作。

    查看ThreadPool.map(实际上是Pool.map)而不是submit,以便在工作人员之间分配任务。

    A2:这里最多只使用 8 个线程(worker 的数量)。如果使用map,可能会存储 1000 个任务的输入数据(需要内存),但不会创建额外的线程。

    A3:我不知道。线程不绑定到内核,它可以在它们之间快速切换。

    【讨论】:

    • A1:谢谢,不知道一个工作人员正在等待另一个工作人员执行任务。 pool.map 的问题是我不知道如何设置参数“i”(这对于“提交”功能很简单 A2:是的,它使用内存的事实不一定是问题但如果任务仅在 OS A3 可以执行时才使用内存会更有效:我问这个的原因是我正在使用 python-dymola 接口从 python 运行 1000 次模拟。最快的方法这是每个 cpu 核心上只运行一个模拟的情况;性能方面
    猜你喜欢
    • 2020-02-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-10-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多