【问题标题】:Parallel Merge-Sort in OpenMPOpenMP 中的并行合并排序
【发布时间】:2012-11-28 11:11:29
【问题描述】:

我在this 论文中看到了一种并行合并排序算法。这是代码:

void mergesort_parallel_omp (int a[], int size, int temp[], int threads) 
{  
    if ( threads == 1)       { mergesort_serial(a, size, temp); }
    else if (threads > 1) 
    {
         #pragma omp parallel sections
         {
             #pragma omp section
             mergesort_parallel_omp(a, size/2, temp, threads/2);
             #pragma omp section
             mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2, threads - threads/2);
         }
         merge(a, size, temp); 
    } // threads > 1
}

我在多核上运行它。发生的情况是,在树的叶子上,2 个线程并行运行。在他们完成工作后,其他 2 个线程开始,依此类推。即使我们有所有叶节点的空闲核心。

我认为原因是这个 OpenMP 代码不会在并行区域内创建并行区域。我说的对吗?

【问题讨论】:

    标签: c++ c multithreading parallel-processing openmp


    【解决方案1】:

    我认为原因是 OpenMP 无法创建并行区域 平行区域内

    你可以有一个平行区域的平行区域。

    OpenMP 并行区域可以相互嵌套。如果嵌套 并行性被禁用,然后由一个线程创建的新团队 在平行区域内遇到平行构造包括 仅限遇到的线程。如果启用了嵌套并行, 那么新团队可能包含多个线程 (source)。

    为了正确运行您的代码,您需要调用omp_set_nested(1)omp_set_num_threads(2)

    嵌套并行可以通过设置 OMP_NESTED 环境变量或调用 omp_set_nested() 函数


    为了获得更好的性能而不是部分,您可以使用 OpenMP 任务(详细信息和示例可以在 here 找到),如下所示:

    void merge(int * X, int n, int * tmp) {
       ...
    } 
    
    void mergeSort(int *X, int n, int *tmp)
    {  
       if (n < 2) return;
       
       #pragma omp task shared(X) if (n > TASK_SIZE)
       mergeSort(X, n/2, tmp);
       
       #pragma omp task shared(X) if (n > TASK_SIZE)
       mergeSort(X+(n/2), n-(n/2), tmp + n/2);
       
       #pragma omp taskwait
       mergeSortAux(X, n, tmp);
    }
    
    
    
    int main()
    {
       ...
       #pragma omp parallel
       {
          #pragma omp single
          mergesort(data, n, tmp);
       }
    } 
    

    合并算法的顺序代码来自 Dr. Johnnie W. Baker webpage.。但是,我在此答案中提供的代码有一些更正和性能改进。

    完整的运行示例:

    #include <assert.h>
    #include <string.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <omp.h>
    
    #define TASK_SIZE 100
    
    unsigned int rand_interval(unsigned int min, unsigned int max)
    {
        // https://stackoverflow.com/questions/2509679/
        int r;
        const unsigned int range = 1 + max - min;
        const unsigned int buckets = RAND_MAX / range;
        const unsigned int limit = buckets * range;
    
        do
        {
            r = rand();
        } 
        while (r >= limit);
    
        return min + (r / buckets);
    }
    
    void fillupRandomly (int *m, int size, unsigned int min, unsigned int max){
        for (int i = 0; i < size; i++)
        m[i] = rand_interval(min, max);
    } 
    
    void mergeSortAux(int *X, int n, int *tmp) {
       int i = 0;
       int j = n/2;
       int ti = 0;
    
       while (i<n/2 && j<n) {
          if (X[i] < X[j]) {
             tmp[ti] = X[i];
             ti++; i++;
          } else {
             tmp[ti] = X[j];
             ti++; j++;
          }
       }
       while (i<n/2) { /* finish up lower half */
          tmp[ti] = X[i];
          ti++; i++;
       }
       while (j<n) { /* finish up upper half */
          tmp[ti] = X[j];
          ti++; j++;
       }
       memcpy(X, tmp, n*sizeof(int));
    } 
    
    void mergeSort(int *X, int n, int *tmp)
    {
       if (n < 2) return;
    
       #pragma omp task shared(X) if (n > TASK_SIZE)
       mergeSort(X, n/2, tmp);
    
       #pragma omp task shared(X) if (n > TASK_SIZE)
       mergeSort(X+(n/2), n-(n/2), tmp + n/2);
    
       #pragma omp taskwait
       mergeSortAux(X, n, tmp);
    }
    
    void init(int *a, int size){
       for(int i = 0; i < size; i++)
           a[i] = 0;
    }
    
    void printArray(int *a, int size){
       for(int i = 0; i < size; i++)
           printf("%d ", a[i]);
       printf("\n");
    }
    
    int isSorted(int *a, int size){
       for(int i = 0; i < size - 1; i++)
          if(a[i] > a[i + 1])
            return 0;
       return 1;
    }
    
    int main(int argc, char *argv[]) {
            srand(123456);
            int N  = (argc > 1) ? atoi(argv[1]) : 10;
            int print = (argc > 2) ? atoi(argv[2]) : 0;
            int numThreads = (argc > 3) ? atoi(argv[3]) : 2;
            int *X = malloc(N * sizeof(int));
            int *tmp = malloc(N * sizeof(int));
    
            omp_set_dynamic(0);              /** Explicitly disable dynamic teams **/
            omp_set_num_threads(numThreads); /** Use N threads for all parallel regions **/
    
             // Dealing with fail memory allocation
            if(!X || !tmp)
            { 
               if(X) free(X);
               if(tmp) free(tmp);
               return (EXIT_FAILURE);
            }
    
            fillupRandomly (X, N, 0, 5);
    
            double begin = omp_get_wtime();
            #pragma omp parallel
            {
                #pragma omp single
                mergeSort(X, N, tmp);
            }   
            double end = omp_get_wtime();
            printf("Time: %f (s) \n",end-begin);
        
            assert(1 == isSorted(X, N));
    
            if(print){
               printArray(X, N);
            }
    
            free(X);
            free(tmp);
            return (EXIT_SUCCESS);
    }
    

    4 核机器中的 had-doc 基准测试产生以下结果:

    100000000 elements 
    1 thread : Time: 11.052081 (s)
    2 threads: Time: 5.907508  (s)
    4 threads: Time: 4.984839  (s)
    
    A overall Speed up of 2.21x
    

    GitHub 将提供未来的改进。


    可以在here 找到并行版本的高级 C++ 版本。最终算法如下所示:

    void mergeSortRecursive(vector<double>& v, unsigned long left, unsigned long right) {
       if (left < right) {
          if (right-left >= 32) {
             unsigned long mid = (left+right)/2; 
             #pragma omp taskgroup
             {
                #pragma omp task shared(v) untied if(right-left >= (1<<14))
                mergeSortRecursive(v, left, mid);
                #pragma omp task shared(v) untied if(right-left >= (1<<14))
                mergeSortRecursive(v, mid+1, right);
                #pragma omp taskyield
             }
             inplace_merge(v.begin()+left, v.begin()+mid+1, v.begin()+right+1);
          }else{
             sort(v.begin()+left, v.begin()+right+1);
         }
        }
      }
    }
    
    
    void mergeSort(vector<double>& v) { 
         #pragma omp parallel
         #pragma omp single
         mergeSortRecursive(v, 0, v.size()-1); 
    }
    

    据报道,对于 48 个线程,6.61x 的加速。

    【讨论】:

      【解决方案2】:

      这个问题的现代答案是使用任务而不是部分。任务是在 OpenMP 3.0 (2009) 中添加的,并且比嵌套并行和部分工作得更好/更容易,因为嵌套并行会导致超额订阅(比可用 CPU 更多的活动线程),从而导致性能显着下降。对于任务,您有一组与 CPU 数量相匹配的线程,它们将处理这些任务。因此,您不需要使用 threads 参数进行手动处理。一个简单的解决方案如下所示:

      // span parallel region outside once outside
      void mergesort_omp(...) {
          #pragma omp parallel
          #pragma omp single
          mergesort_parallel_omp(...)
      }
      
      
      void mergesort_parallel_omp (int a[], int size, int temp[]) 
      {  
          #pragma omp task
          mergesort_parallel_omp(a, size/2, temp);
      
          mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2);
      
          #pragma omp taskwait
          merge(a, size, temp); 
      }
      

      但是,为太小的工作块创建任务仍然可能存在问题,因此根据工作粒度限制并行度很有用,例如像这样:

      void mergesort_parallel_omp (int a[], int size, int temp[]) 
      {  
          if (size < size_threshold) {
              mergesort_serial(a, size, temp);
              return;
          }
          #pragma omp task
          mergesort_parallel_omp(a, size/2, temp);
      
          mergesort_parallel_omp(a + size/2, size - size/2, temp + size/2);
      
          #pragma omp taskwait
          merge(a, size, temp); 
      }
      

      【讨论】:

        猜你喜欢
        • 2022-01-10
        • 2013-04-07
        • 1970-01-01
        • 2011-12-21
        • 1970-01-01
        • 1970-01-01
        • 2011-01-02
        • 2016-04-17
        • 1970-01-01
        相关资源
        最近更新 更多