【问题标题】:How to parallelize small pure function?如何并行化小的纯函数?
【发布时间】:2010-10-09 11:06:33
【问题描述】:

我有一个 D2 程序,在其当前形式下,它是单线程的,并且对于该程序的外循环的每次迭代,在内循环中调用相同的纯函数大约 10 到 100 次。调用之间没有数据依赖关系,即没有调用使用任何其他调用的结果。总的来说,这个函数被调用了数百万次,是我程序的主要瓶颈。参数几乎每次都是唯一的,因此缓存无济于事。

乍一看,这似乎是并行化的完美候选者。唯一的问题是该函数每次调用只需要大约 3 微秒,远低于创建新线程的延迟,并且远不高于将作业添加到任务池的开销(意思是,获取互斥体,分配内存到保存有关任务的信息,处理可能的任务池队列争用等)。有没有什么好的方法可以利用这种细粒度的并行性?

【问题讨论】:

  • 3 微秒和 100 次调用?所以这总共需要 0.0003 秒来执行?瓶颈在哪里?
  • 这是外循环的一次迭代。外部循环执行数百万次,未来可能执行数十亿次。
  • 这是我最近问的一个类似问题:stackoverflow.com/questions/564577/…
  • 那么你不应该在外循环中并行化迭代组吗?单个内部迭代似乎不值得并行化。
  • 但是外部循环确实相互依赖,所以我不能。

标签: performance multithreading optimization parallel-processing d


【解决方案1】:

如果创建多个线程,它们有自己的队列可以工作呢?因为队列没有重叠,所以您不必创建锁。

【讨论】:

  • 主线程仍然需要将任务添加到单独的队列中,所以你仍然需要一个锁。
  • 可以使用无锁的单链表实现(例如微软的Interlocked*SList)。
  • 机会很高,他不需要将每个元素都推入队列,但可以说:线程 1 进行前 100000 次计算,线程 2 进行 100001-200000 等等。
【解决方案2】:

根据程序的结构,您总是可以将一组调用组合成一个任务。如果每个任务执行 50 次函数调用,那么任务管理的开销就不再是一个很大的因素了。

【讨论】:

    【解决方案3】:

    不要启动每个线程来运行单个任务然后立即将其关闭。

    在您的程序开始时,为每个核心创建一个线程,等待来自队列(管道或您自己创建的某种机制)的数据。如果你能想出一种机制,让所有线程都在同一个队列上等待,那就更好了,但是队列的get方法必须同步......

    每当您需要计算一个包含数百或数千个进程的块时,将整个块放入下一个空队列。

    实际上,您最终会得到一个或多个线程为队列提供数据,一堆线程处理来自队列的数据,以及一个或多个读取和处理结果的线程。

    您可能需要在您正在处理的“项目”中放入足够的数据,以便在完成后能够告诉您如何处理它们。它们几乎肯定应该是一个对象,您可能希望它们包含状态信息。

    您可能不希望处理的线程数超过核心数。

    编辑:还要查看一些并发库,例如ThreadPoolExecutor。很容易忘记并发库(就像我刚才所做的那样),这可能正是您正在寻找的(因此强调)

    【讨论】:

      【解决方案4】:

      这听起来像是 SIMD 指令可以提供帮助的东西。如果您有一个自动矢量化编译器,您应该能够重写该函数以同时对 4 个值进行操作,并且编译器可以将其压缩为适当的 SSE 指令。这可以帮助减少函数调用开销。如果您的编译器不擅长自动矢量化代码,那么您可以使用 SSE 内在函数来几乎降低到汇编级别来对函数体进行编程。

      【讨论】:

      • 使用当前的编译器,自己编写 SIMD 代码作为内在函数确实要好得多,而不是将其留给矢量化器。是的,理论上现代编译器应该能够自行正确地对代码进行矢量化处理,但实际上它们不能
      【解决方案5】:

      如上所述,不要每次进入该函数时都启动一个线程,并且“作业”粒度大于内部函数的一次操作,以便很好地摊销创建作业的开销。将您的原始例程描述为:

      void OuterFunction( Thingy inputData[N] )
      {
        for ( int i = 0 ; i < N ; ++i )
          InnerFunction( inputData[i] );
      }
      

      我们将通过(假设存在作业队列系统)解决您的问题:

      void JobFunc( Thingy inputData[], int start, int stop )
      {
        for ( int i = start ; i < stop ; ++i )
          InnerFunction( inputData[i] );  
      }
      void OuterFunction( Thingy inputData[N], int numCores )
      {
         int perCore = N / numCores; // assuming N%numCores=0 
                                     // (omitting edge case for clarity)
         for ( int c = 0 ; c < numCores ; ++c )
           QueueJob( JobFunc, inputData, c * perCore, (c + 1) * perCore );
      }
      

      只要您的输入数据是完全独立的,正如您在原始问题中所说,您无需锁定它;仅当线程之间存在依赖关系时才需要同步,而这里没有。

      此外,在这一性能级别上,微优化开始变得相关:最重要的是,缓存位置。预取可以让您走得更远。

      然后考虑 SIMD 的可能性,您可以对其进行矢量化以通过单个寄存器同时运行四个输入点。使用四个核心和 4 宽 SIMD,您可以理论上获得 16 倍的加速,但这假设 InnerFunction 所做的工作主要是一个固定的数学函数,因为分支往往会抹杀 SSE/VMX 的性能提升。

      【讨论】:

        【解决方案6】:

        多么有趣的问题……正如您所指出的,您将无法承担与传统锁定相关的工作队列的开销。如果可以的话,我鼓励您尝试使用现有的基于细粒度任务的编程环境之一......我在三个工作桶中考虑了这一点:

        问题的第一部分是确保安全性、正确性和可并行性,听起来你已经涵盖了这些,因为你的函数是纯函数。

        我认为下一个最具挑战性的部分是描述并发性,特别是你提到这个函数被多次调用。您可以将其流水线化并将调度功能与其工作分开吗?如果你不能通过管道处理它,它看起来像一个并行循环、一个树遍历还是比这更非结构化。具体来说,obeying Amdahl 如果你不能重叠工作并确保它有多个实例或其他东西同时运行,那么即使你是纯粹的,你实际上也是连续的。无论使用何种库,您可以采取任何措施将工作重构为管道、递归树遍历(或并行循环),或者如果您必须进行更多非结构化工作以及任务之间的显式依赖关系,都将有所帮助。

        我考虑的最后一个方面是确保在您的平台上高效执行,这涉及减少代码和调度代码中的开销和争用,并确保任何串行代码绝对尽可能高效。如果您不能使用现有库之一并且必须构建自己的库,我鼓励您查看work-stealing queue 和自导调度算法,正如您所指出的那样,您将无法从中受益使用传统锁,因为它们的成本超过了您的功能成本,您很可能需要查看无锁技术来降低调度成本并将任务删除到您使用的任何队列中。您还需要非常注意调度算法和函数内部的共享和争用,因为在这种粒度级别上,除了通常的分支预测错误和指令吞吐量问题之外,您还需要查看在shared state and contention even on reads because they can be sources of contention too

        很抱歉,如果这不是非常具体,但我希望它有用。

        【讨论】:

          【解决方案7】:

          您也许可以使用比较和交换将循环翻转过来,以获得原子无锁增量:

          void OuterFunction()
          {
            for(int i = 0; i < N; i++)
              InnerFunction(i);
          }
          

          前往:

          void OuterFunction()
          {
             int i = 0, j = 0;
          
             void Go()
             {
                int k;
                while((k = atomicInc(*i)) < N)
                {
                   InnerFunction(k);
          
                   atomicInc(*j);
                }
             }
          
             for(int t = 0; t < ThreadCount - 1; t++) Thread.Start(&Go);
          
             Go(); // join in
          
             while(j < N) Wait(); // let everyone else catch up.
          }
          

          编辑:我的线程生锈了,因此无法编译,因为名称都错误

          【讨论】:

            【解决方案8】:

            调用之间没有数据依赖关系,即没有调用使用任何其他调用的结果。

            这将有助于并行化,但绝对可以肯定该函数根本没有副作用。如果函数正在更新数据结构,它是线程安全的吗?如果它在做 IO,如果你设法并行执行函数,那么 IO 最终会成为瓶颈吗?

            如果这些问题的答案是“是”,那么前面的建议很好,只需尝试通过将函数的执行分配给每个线程尽可能多的次数来最大化应用程序的粒度。

            不过,您可能不会从大规模并行性中获得任何好处,但也许可以获得一些更适度的加速......

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 2017-12-24
              • 2020-12-31
              • 2019-05-07
              • 2013-02-12
              • 2023-03-26
              • 1970-01-01
              • 2014-12-31
              • 1970-01-01
              相关资源
              最近更新 更多