【问题标题】:Multiprocessing and multithreading in PythonPython中的多处理和多线程
【发布时间】:2021-02-08 09:59:12
【问题描述】:

我有一个 python 程序,它 1)从磁盘读取一个非常大的文件(约 95% 的时间),然后 2)处理并提供一个相对较小的输出(约 5% 的时间)。该程序将在 TeraBytes 文件上运行。

现在我希望通过利用多处理和多线程来优化这个程序。我正在运行的平台是一个在虚拟机上有 4 个处理器的虚拟机。

我计划有一个调度程序进程,它将执行 4 个进程(与处理器相同),然后每个进程应该有一些线程,因为大部分是 I/O。每个线程将处理 1 个文件并将结果报告给主线程,主线程又将通过 IPC 将其报告回调度程序进程。调度程序可以将这些排队并最终以有序的方式将它们写入磁盘

所以想知道如何决定为这种情况创建的进程和线程的数量?有没有一种数学方法可以找出最好的组合。

谢谢

【问题讨论】:

  • 对于parallel processing,您可以创建的“最佳”线程数等于您拥有的内核数,如here 所说
  • 线程可能不是你想要的,例如 here 解释的那样。
  • 如果您的单进程程序几乎将所有时间都花在读取数据上,那么并行处理将无济于事。每个进程将花费大部分时间等待从磁盘读取数据的机会,而另一个进程正在使用磁盘。多处理可帮助您利用额外的 CPU,而不是神奇地让您的磁盘读取速度更快。
  • 有多少个磁盘为大文件提供服务?在您的瓶颈似乎是 I/O 时增加并行性可能不会有多大好处:您可能会从查找成本中获得更少的吞吐量(如果不是旋转磁盘类型的查找成本,可能会降低预读效率)。但是,例如,如果您在 Linux 上的 md raid1 中有 2 个磁盘,则并行执行 2 个 I/O 操作可能会使吞吐量翻倍:每个操作将从一个磁盘读取。
  • 按照您描述问题的方式,线程似乎没有帮助;当您的 I/O 问题处理多个请求时,线程将最有帮助,但在这种情况下,您正在等待一个文件。除非您可以为每个线程指定要读取此文件的哪些部分,否则线程不会帮助加快文件读取速度。

标签: python multitasking


【解决方案1】:

对于并行处理: 我看到了this question,并引用了已接受的答案:

在实践中,可能很难找到最佳线程数,甚至每次运行程序时,这个数字也可能会有所不同。因此,理论上,最佳线程数将是您机器上的内核数。如果您的核心是“超线程”(英特尔称之为),它可以在每个核心上运行 2 个线程。那么,在这种情况下,最佳线程数是您机器上内核数的两倍。

对于多处理: 有人问过类似的问题here,接受的答案是这样的:

如果您的所有线程/进程确实受 CPU 限制,您应该运行与 CPU 报告的内核数一样多的进程。由于超线程,每个物理 CPU 内核可能能够呈现多个虚拟内核。调用multiprocessing.cpu_count获取虚拟核心数。

如果只有 1 个线程中的 p 受 CPU 限制,则可以通过乘以 p 来调整该数字。例如,如果您的一半进程受 CPU 限制 (p = 0.5),并且您有两个 CPU,每个 CPU 有 4 个内核和 2 个超线程,那么您应该启动 0.5 * 2 * 4 * 2 = 8 个进程。

这里的关键是了解您使用的是什么机器,从中,您可以选择一个接近最佳数量的线程/进程来拆分代码的执行。我说几乎是最优的,因为每次运行脚本时它都会有所不同,所以从数学的角度预测这个最优数字是很困难的。

对于您的具体情况,如果您的机器有 4 个核心,我建议您最多只创建 4 个线程,然后将它们拆分:

  • 1 到主线程。
  • 3 用于文件读取和处理。

【讨论】:

    【解决方案2】:

    使用多个进程来加速 IO 性能可能不是一个好主意,请检查下面的 thissample code 看看是否有帮助

    【讨论】:

      【解决方案3】:

      一个想法可以是让一个线程只读取文件(如果我理解得很好,只有一个文件)并将独立部分(例如行)推送到带有消息的队列中。
      消息可以由 4 个线程处理。这样,您可以优化处理器之间的负载。

      【讨论】:

        【解决方案4】:

        我想我会安排它与你正在做的相反。也就是说,我将创建一个特定大小的线程池来负责产生结果。提交到此池的任务将作为参数传递给处理器池,工作线程可以使用该处理器池来提交受 CPU 限制的工作部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将任何 CPU 密集型工作移交给处理器池。

        处理器池的大小应该就是您环境中的处理器数量。很难给线程池一个精确的大小;这取决于在收益递减定律发挥作用之前它可以处理多少并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用​​此值。下面的代码概述了这些想法。您从线程池中获得的 I/O 操作的重叠比您仅使用小型处理器池所能达到的效果要大:

        from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
        from functools import partial
        import os
        
        def cpu_bound_function(arg1, arg2):
            ...
            return some_result
        
        
        
        def io_bound_function(process_pool_executor, file_name):
            with open(file_name, 'r') as f:
                # Do disk related operations:
                . . . # code omitted
                # Now we have to do a CPU-intensive operation:
                future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
                result = future.result() # get result
                return result
            
        file_list = [file_1, file_2, file_n]
        N_FILES = len(file_list)
        MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
        N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
        N_PROCESSES = os.cpu_count() # use the number of processors you have
        
        with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
            with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
                results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)
        

        重要提示

        另一种更简单的方法是只使用一个处理器池,其大小大于您拥有的 CPU 处理器数量,例如 25 个。工作进程将同时执行 I/O和 CPU 操作。即使您的进程多于 CPU,许多进程仍将处于等待 I/O 完成的等待状态,从而允许 CPU 密集型工作运行。

        这种方法的缺点是创建 N 个进程的开销远大于创建 N 个线程 + 少量进程的开销。但是,随着提交到池的任务的运行时间变得越来越大,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,那么这可能是一个相当高效的简化。

        更新:两种方法的基准

        我针对这两种处理 24 个大小约为 10,000KB 的文件的方法做了一些基准测试(实际上,这只是 3 个不同的文件,每个文件处理了 8 次,因此可能已经做了一些缓存):

        方法一(线程池+处理器池)

        from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
        from functools import partial
        import os
        from math import sqrt
        import timeit
        
        
        def cpu_bound_function(b):
            sum = 0.0
            for x in b:
                sum += sqrt(float(x))
            return sum
        
        def io_bound_function(process_pool_executor, file_name):
            with open(file_name, 'rb') as f:
                b = f.read()
                future = process_pool_executor.submit(cpu_bound_function, b)
                result = future.result() # get result
                return result
        
        def main():
            file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
            N_FILES = len(file_list)
            MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
            N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
            N_PROCESSES = os.cpu_count() # use the number of processors you have
        
            with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
                with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
                    results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
                    print(results)
        
        if __name__ == '__main__':
            print(timeit.timeit(stmt='main()', number=1, globals=globals()))
        

        方法 2(仅限处理器池)

        from concurrent.futures import ProcessPoolExecutor
        from math import sqrt
        import timeit
        
        
        def cpu_bound_function(b):
            sum = 0.0
            for x in b:
                sum += sqrt(float(x))
            return sum
        
        def io_bound_function(file_name):
            with open(file_name, 'rb') as f:
                b = f.read()
                result = cpu_bound_function(b)
                return result
        
        def main():
            file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
            N_FILES = len(file_list)
            MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
            N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required
        
            with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
                results = list(process_pool_executor.map(io_bound_function, file_list))
                print(results)
        
        if __name__ == '__main__':
            print(timeit.timeit(stmt='main()', number=1, globals=globals()))
        

        结果:

        (我有 8 个核心)

        线程池 + 处理器池:13.5 秒 仅处理器池:13.3 秒

        结论:我会首先尝试更简单的方法,即对所有内容都使用处理器池。现在棘手的一点是确定要创建的最大进程数,这是您最初问题的一部分,当它所做的只是 CPU 密集型计算时,它有一个简单的答案。如果您正在阅读的文件数量不是太多,那么这一点是没有意义的;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望在池中拥有数百个进程(您可以创建多少个进程也有上限,并且还有那些令人讨厌的内存限制)。我无法给你一个确切的数字。如果您确实有大量文件,请从较小的池大小开始并不断增加,直到您没有进一步的好处(当然,您可能不希望处理超过这些测试的最大数量的文件,否则您将永远运行,只是为实际运行确定一个合适的池大小)。

        【讨论】:

          【解决方案5】:

          在 I/O 密集型进程(如您所描述的那样)上,您不一定需要多线程或多处理:您还可以使用操作系统中更高级的 I/O 原语。

          例如,在 Linux 上,您可以向内核提交读取请求以及适当大小的可变缓冲区,并在缓冲区填满时收到通知。这可以使用AIO API 来完成,我为此编写了一个纯python 绑定:python-libaio(pypi 上的libaio),或者使用最近的io_uring API,似乎有一个@ 987654325@(pypy 上的liburing)(我既没有使用过 io_uring 也没有使用过这个 python 绑定)。

          这消除了您级别的并行处理的复杂性,可能会减少操作系统/用户空间上下文切换的数量(进一步减少 CPU 时间),并让操作系统了解您正在尝试执行的操作,从而为其提供更有效地调度 IO 的机会(在虚拟化环境中,如果它减少了数据副本的数量,我不会感到惊讶,尽管我自己没有尝试过)。

          当然,缺点是您的程序将更紧密地绑定到您正在执行它的操作系统,需要更多的努力才能让它在另一个操作系统上运行。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2013-04-03
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2012-03-17
            相关资源
            最近更新 更多