【问题标题】:Assignment of an element to an array does not work (Parallel Mergesort with OpenMP)将元素分配给数组不起作用(使用 OpenMP 进行并行合并排序)
【发布时间】:2021-10-01 09:14:53
【问题描述】:

我想并行化归并排序。我的想法是:取第一个子数组中间的值,然后在另一个子数组上执行二分查找以找到它的上界,该上界的索引作为拆分第二个数组的分界点。然后,将第一个数组的前半部分与第二个数组的前半部分顺序合并,并将第一个数组的后半部分与第二个数组的后半部分顺序合并。我想使用 OpenMP 的任务(task),让一个线程执行第一次合并,另一个线程执行第二次合并。

在我的代码中,赋值会产生垃圾值。也就是说,执行 out[a]=in[b]; 之后,当我用 printf("%d%",out[a]); 检查时,得到的值 c 并不是 in[b]。当我打印整个数组时,out[i] 的值有时又与上述打印语句显示的值不同,即 out[a]=d,而 d 既不是 c 也不是 in[b]。

我已经准备了一些打印语句的代码和一些输出,显示值in[b]in[a]

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/time.h>

#include <iostream>
#include <algorithm>

#include <cstdlib>
#include <cstdio>

#include <cmath>
#include <ctime>
#include <cstring>
#include <omp.h>
// Constants.h
#if !defined(MYLIB_CONSTANTS_H)
#define MYLIB_CONSTANTS_H 1

const int CUTOFF =11;

#endif



/**
 * Debug-instrumented binary search over the window in[projection .. projection+n).
 *
 * Prints the window and every search step to stdout. For a window sorted in
 * ascending order the branches below appear intended to yield the number of
 * window elements that are <= value -- NOTE(review): confirm with the author.
 *
 * @param in          base array
 * @param n           number of elements in the window
 * @param value       value whose rank is searched
 * @param projection  offset of the window's first element inside `in`
 * @return an offset relative to in+projection (NOT an index into `in`); the
 *         "index:" field of the trace shows the same value plus projection
 */
int binarysearchfinduperrank(int *in,int n,int value, int projection){

    // Work on the window as if it started at index 0.
    int* array= in+projection;
    int L=0;
    int R=n;

      // Trace: dump the whole window.
      printf("\nUpperBinarysearchrankfinder [");
      for(int i=0;i<n; i++){
          printf("%d,",array[i]);

     }
     printf("]\n");
    // Narrow [L, R) until the two bounds are adjacent.
    while(R-L>1){
        int middle = (R+L)/2;
         printf("L:%d,middle:%d,R:%d,value:%d\n",L,middle,R,value);
        if(array[middle]==value){
            // Exact hit: walk right past the run of equal elements.
            // NOTE(review): array[middle] is evaluated before middle<n is
            // checked, so array[n] is read when the run reaches the window end.
            while(array[middle]==value&&middle<n){
                middle=middle+1;

            }
             printf("L:%d,R:%d,result:%d,index:%d\n",L,R,middle,middle+projection);
            return middle;

        }
        if(array[middle]<value){
            L=middle;

        }
        if(array[middle]>value){
            R=middle;

        }
    }

      printf("L:%d,R:%d,value:%d\n",L,R,value);

     // Single-element window: the loop above never ran.
     if(n==1){
         if(array[0]> value){
              printf(" result:0\n,index:%d\n",0+projection);
             return 0;

        }
        else{
             printf(" result:1,index:%d\n",1+projection);
            return 1;

        }

    }

    // The remaining checks are order-dependent: the first one that matches wins.
    // NOTE(review): for n==0 these checks read array[0] / array[-1].
    if(L==0&&array[L]>value){
         printf("L:%d,R:%d,result:%d,index:%d\n",L,R,0,projection);
        return 0;

    }
    if(R==n && array[R-1]<= value){
          printf("L:%d,R:%d,result:%d,index:%d\n",L,R,n,n+projection);
        return n;

    }
    if(R==n&& array[R-1]>value){
          printf("L:%d,R:%d,result:%d,index:%d\n",L,R,R-1,((R-1)+projection));
        return R-1;

    }
    // R < n from here on, because both R==n cases returned above.
    if(array[R]<=value){
          printf("L:%d,R:%d,result:%d,index:%d\n",L,R,R+1,(R+1+projection));
        return R+1;

    }
    if(array[L]<=value){
         printf("L:%d,R:%d,result:%d,index:%d\n",L,R,R,R+projection);
        return R;

    }

     printf("L:%d,R:%d,result:%d,index:%d\n",L,R,L,L+projection);
    return L;


}

// Debug helper: writes the first n ints of `array` to stdout as "[a,b,c,]",
// preceded by a newline and followed by one. Nothing is printed between the
// brackets when n <= 0.
void printarray(int *array,int n){
    printf("\n[");
    int pos = 0;
    while (pos < n) {
        printf("%d,", array[pos]);
        ++pos;
    }
    printf("]\n");
}


/**
 * Sequentially merges two runs of `in` into `out`, tracing every step to stdout.
 *
 * Reads in[begin1 .. end1) and in[begin2 .. end2) and writes
 * (end1-begin1)+(end2-begin2) elements to `out`, starting at out[outBegin].
 * When both heads compare equal the element of the first run is taken first.
 *
 * @param out       destination array
 * @param in        source array holding both runs
 * @param begin1    first index of run 1 (inclusive)
 * @param end1      end index of run 1 (exclusive)
 * @param begin2    first index of run 2 (inclusive)
 * @param end2      end index of run 2 (exclusive)
 * @param outBegin  index in `out` where the merged output starts
 */
void MsMergeSequential(int *out, int *in, int begin1, int end1, int begin2, int end2, int outBegin) {
    int left = begin1;
    int right = begin2;
    int idx = outBegin;
    // Trace: dump both input runs.
    printf("Inputarray1:");
    printarray(in+begin1,(end1-begin1));

    printf("Inputarray2:");
    printarray(in+begin2,(end2-begin2));


    // Main merge: copy the smaller head until one run is exhausted.
    while (left < end1 && right < end2) {
        if (in[left] <= in[right]) {
            out[idx] = in[left];
            printf("! %d \n",in[left]);
            printf(" %d \n",out[idx]);
            left++;
        } else {

            out[idx] = in[right];
            // NOTE(review): this trace line prints in[left] although in[right]
            // was just stored, so the "!!" value and the out[idx] value printed
            // on the next line legitimately differ.
            printf("!! %d \n",in[left]);
            printf(" %d \n",out[idx]);
            right++;
        }
        idx++;
        }

    // Drain what is left of run 1.
    while (left < end1) {

        out[idx] = in[left];
        printf("!!! %d \n",in[left]);

        printf(" %d \n",out[idx]);
        left++, idx++;
    }
    // Drain what is left of run 2.
    while (right < end2) {
        out[idx] = in[right];
        printf(" !!!!%d \n",in[right]);
        printf(" %d \n",out[idx]);
        right++, idx++;
    }

    // NOTE(review): the dump below starts at out+begin1, while the elements
    // above were written starting at out+outBegin; whenever outBegin != begin1
    // it shows a different region than the one just written.
    printf("Outputarray:\n[");
    printarray(out+begin1,(end1-begin1)+(end2-begin2));
    printf("]\n");
}


// Strict "less than" predicate handed to std::sort; true when i orders before j.
bool myfunc (long i , long j){
    return j > i;
}

/**
 * Recursive merge sort step over [begin, end) that ping-pongs between
 * `array` and `tmp`.
 *
 * Ranges shorter than CUTOFF are sorted directly with std::sort. Larger ranges
 * are split at `half`; both halves are sorted recursively with the `inplace`
 * flag flipped, then merged as two independent sub-merges that are split at
 * (halffirst, halfsecond). With inplace == true the merge reads `tmp` and
 * writes `array`; otherwise it reads `array` and writes `tmp`.
 *
 * @param array    data array
 * @param tmp      second buffer, indexed like `array`
 * @param inplace  true: merged result goes to `array`; false: to `tmp`
 * @param begin    first index of the range (inclusive)
 * @param end      end index of the range (exclusive)
 */
void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
  if( end <= begin + CUTOFF -1){

    // Leaf: only `array` is sorted here; nothing is written to `tmp`.
    // NOTE(review): a parent running with inplace == true merges out of
    // `tmp`, which for this range still holds whatever malloc returned.
    std::sort(array+begin,array + end, myfunc);
  }
  else  if (begin < (end - 1)) {
           long half =(begin+end) / 2;
           // Midpoint of the left half; the element there is the split pivot.
           int halffirst= (half+begin)/2;


            // The task pragma binds to the next statement only: the first
            // recursive call may run as a task, the second runs on the
            // encountering thread, and the taskgroup waits for both.
            #pragma omp taskgroup
         {
           #pragma omp task shared(array) untied if(end-begin >= (1<<15))

             MsSequential(array, tmp, !inplace, begin, half);

             MsSequential(array, tmp, !inplace, half, end);
              }
        if (inplace){
            printf("Originalinputarray-tmp:");
            printarray(tmp+begin,(end-begin));
            // NOTE(review): binarysearchfinduperrank returns an offset relative
            // to tmp+half; the else-branch below adds only `half` to it, while
            // this line additionally adds `begin`, so halfsecond is shifted
            // whenever begin != 0.
            int halfsecond=binarysearchfinduperrank(tmp,(end-half),tmp[halffirst],half)+half+begin;
            printf("%d,%d\n",halffirst,halfsecond);
            printf("Halffirst:%d,Halfsecond:%d\n,",tmp[halffirst],tmp[halfsecond]);
            // Two sub-merges from tmp into array; the first may run as a task.
            #pragma omp taskgroup
         {
           #pragma omp task shared(tmp) untied if(end-begin >= (1<<15))

             MsMergeSequential(array, tmp, begin, halffirst, half, halfsecond, begin);


            MsMergeSequential(array, tmp, halffirst, half,halfsecond, end, begin+(halffirst-begin)+(halfsecond-half));
              }



        }
        else {
            int halfsecond=binarysearchfinduperrank(array,(end-half),array[halffirst],half)+half;
            printf("%d,%d\n",halffirst,halfsecond);
            printf("Originalinputarray-arr:");
            printarray(array+begin,(end-begin));
            printf("Halffirst:%d,Halfsecond:%d\n,",array[halffirst],array[halfsecond]);


             // Two sub-merges from array into tmp; the first may run as a task.
             #pragma omp taskgroup
         {
           #pragma omp task shared(array) untied if(end-begin >= (1<<15))

             MsMergeSequential(tmp, array, begin, halffirst, half,halfsecond, begin);

            // NOTE(review): the output offset here is begin+halffirst+..., while
            // the inplace branch uses begin+(halffirst-begin)+...; the two
            // differ by `begin` whenever begin != 0.
            MsMergeSequential(tmp, array, halffirst, half,halfsecond, end, begin+halffirst+(halfsecond-half));
              }




        }

    } else if (!inplace) {

        // Range of at most one element whose result is expected in tmp.
        tmp[begin] = array[begin];
    }
}

// Sort entry point: starts the recursion over the whole range [0, size) with
// inplace == true, i.e. the final merge writes into `array`.
void MsParallel(int *array, int *tmp, const size_t size) {
    const long firstIndex = 0;
    const long endIndex = size;
    MsSequential(array, tmp, true, firstIndex, endIndex);
}




// Verifies `data` against a reference: sorts `ref` in place with std::sort and
// compares it element by element with `data`. On the first mismatch the
// offending index is printed and false is returned; true if all `size`
// elements agree. Note that `ref` is modified.
bool isSorted(int ref[], int data[], const size_t size){
    std::sort(ref, ref + size);

    // Advance over the common prefix of the two arrays.
    size_t idx = 0;
    while (idx < size && ref[idx] == data[idx]) {
        ++idx;
    }
    if (idx == size) {
        return true;
    }
    printf("\nFalscher Index:%d\n",idx);
    return false;
}

/**
  * @brief program entry point
  *
  * Expects exactly one argument, the array size. Fills `data` with
  * pseudo-random ints, copies it to `ref`, sorts `data` through MsParallel
  * inside an OpenMP parallel/single region, prints the elapsed time and
  * verifies the result with isSorted.
  *
  * @return EXIT_FAILURE when the argument count is wrong, EXIT_SUCCESS
  *         otherwise (including when verification prints FAILED)
  */
int main(int argc, char* argv[]) {
    // variables to measure the elapsed time
    struct timeval t1, t2;
    double etime;

    // expect one command line argument: array size
    if (argc != 2) {
        printf("Usage: MergeSort.exe <array size> \n");
        printf("\n");
        return EXIT_FAILURE;
    }
    else {
        // NOTE(review): strtol's result is not validated; a negative or
        // non-numeric argument is converted to size_t as-is.
        const size_t stSize = strtol(argv[1], NULL, 10);
        // data: array to sort, tmp: merge buffer, ref: copy used for verification.
        // The malloc results are not checked.
        int *data = (int*) malloc(stSize * sizeof(int));
        int *tmp = (int*) malloc(stSize * sizeof(int));
        int *ref = (int*) malloc(stSize * sizeof(int));
        printf("Initialization...\n");

        // Fixed seed.
        srand(95);

        // NOTE(review): the `parallel for` directive is followed by a `{` block
        // rather than directly by the for statement, and rand() is called from
        // inside the parallel loop -- verify both against the OpenMP spec and
        // your C library's thread-safety guarantees for rand().
        #pragma omp parallel for num_threads(100) schedule(static)
        {
        for (size_t idx = 0; idx < stSize; ++idx){
            data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
        }
        }
        std::copy(data, data + stSize, ref);
        // Integer division: dSize is truncated to whole MiB before the
        // conversion to double (the sample run prints 0.000000 for 40 elements).
        double dSize = (stSize * sizeof(int)) / 1024 / 1024;
        // NOTE(review): %u is used for a size_t argument.
        printf("Sorting %u elements of type int (%f MiB)...\n", stSize, dSize);
        gettimeofday(&t1, NULL);
        // Mergesort starts: one thread of the team enters MsParallel.
        #pragma omp parallel num_threads(80)
        {
        #pragma omp single
        {
        MsParallel(data, tmp, stSize);
        }
        }



        gettimeofday(&t2, NULL);
        // Elapsed time in milliseconds (the microsecond part is divided with
        // integer arithmetic), then converted to seconds.
        etime = (t2.tv_sec - t1.tv_sec) * 1000 + (t2.tv_usec - t1.tv_usec) / 1000;
        etime = etime / 1000;
        printf("done, took %f sec. Verification...", etime);
        if (isSorted(ref, data, stSize)) {
            printf(" successful.\n");
        }
        else {
            printf(" FAILED.\n");
        }
        free(data);
        //delete[] data;
        free(tmp);
        //delete[] tmp;
        free(ref);
        //delete[] ref;
    }
    return EXIT_SUCCESS;
}

输出:(!标记用于定位代码工具中赋值的位置)

Initialization...
Sorting 40 elements of type int (0.000000 MiB)...

UpperBinarysearchrankfinder [0,8,8,15,26,29,30,38,39,39,]
L:0,middle:5,R:10,value:15
L:0,middle:2,R:5,value:15
L:2,middle:3,R:5,value:15
L:2,R:5,result:4,index:14
5,14
Originalinputarray-arr:
[0,4,6,7,11,15,17,19,29,33,0,8,8,15,26,29,30,38,39,39,]
Halffirst:15,Halfsecond:26
,Inputarray1:
[0,4,6,7,11,]
Inputarray2:
[0,8,8,15,]
! 0
 0
!! 4
 0
! 4
 4
! 6
 6
! 7
 7
!! 11
 8
!! 11
 8
! 11
 11
 !!!!15
 15
Outputarray:
[
[0,0,4,6,7,8,8,11,15,]
]
Inputarray1:
[15,17,19,29,33,]
Inputarray2:
[26,29,30,38,39,39,]
! 15
 15
! 17
 17
! 19
 19
!! 29
 26
! 29
 29
!! 33
 29
!! 33
 30
! 33
 33
 !!!!38
 38
 !!!!39
 39
 !!!!39
 39
Outputarray:
[
[8,8,11,15,15,17,19,26,29,29,30,]
]

UpperBinarysearchrankfinder [1,5,7,11,16,26,27,30,32,36,]
L:0,middle:5,R:10,value:26
L:0,R:10,result:6,index:36
25,36
Originalinputarray-arr:
[3,4,5,7,10,26,29,33,34,35,1,5,7,11,16,26,27,30,32,36,]
Halffirst:26,Halfsecond:27
,Inputarray1:
[3,4,5,7,10,]
Inputarray2:
[1,5,7,11,16,26,]
!! 3
 1
! 3
 3
! 4
 4
! 5
 5
!! 7
 5
! 7
 7
!! 10
 7
! 10
 10
 !!!!11
 11
 !!!!16
 16
 !!!!26
 26
Outputarray:
[
[1,3,4,5,5,7,7,10,11,16,26,]
]
Inputarray1:
[26,29,33,34,35,]
Inputarray2:
[27,30,32,36,]
! 26
 26
!! 29
 27
! 29
 29
!! 33
 30
!! 33
 32
! 33
 33
! 34
 34
! 35
 35
 !!!!36
 36
Outputarray:
[
[7,7,10,11,16,26,-1751738988,-1684366952,-6382180,]
]
Originalinputarray-tmp:
[0,0,4,6,7,8,8,11,15,15,17,19,26,29,29,30,33,38,39,39,1,3,4,5,5,7,7,10,11,16,26,-1751738988,-1684366952,-6382180,-1549622880,-1482250844,-1414878808,-1347506772,-1280134736,-1212762700,]

UpperBinarysearchrankfinder [1,3,4,5,5,7,7,10,11,16,26,-1751738988,-1684366952,-6382180,-1549622880,-1482250844,-1414878808,-1347506772,-1280134736,-1212762700,]
L:0,middle:10,R:20,value:17
L:0,middle:5,R:10,value:17
L:5,middle:7,R:10,value:17
L:7,middle:8,R:10,value:17
L:8,middle:9,R:10,value:17
L:9,R:10,value:17
L:9,R:10,result:10,index:30
10,30
Halffirst:17,Halfsecond:26
,Inputarray1:
[0,0,4,6,7,8,8,11,15,15,]
Inputarray2:
[1,3,4,5,5,7,7,10,11,16,]
! 0
 0
! 0
 0
!! 4
 1
!! 4
 3
! 4
 4
!! 6
 4
!! 6
 5
!! 6
 5
! 6
 6
! 7
 7
!! 8
 7
!! 8
 7
! 8
 8
! 8
 8
!! 11
 10
! 11
 11
!! 15
 11
! 15
 15
! 15
 15
 !!!!16
 16
Outputarray:
[
[0,0,1,3,4,4,5,5,6,7,7,7,8,8,10,11,11,15,15,16,]
]
Inputarray1:
[17,19,26,29,29,30,33,38,39,39,]
Inputarray2:
[26,-1751738988,-1684366952,-6382180,-1549622880,-1482250844,-1414878808,-1347506772,-1280134736,-1212762700,]
! 17
 17
! 19
 19
! 26
 26
!! 29
 26
!! 29
 -1751738988
!! 29
 -1684366952
!! 29
 -6382180
!! 29
 -1549622880
!! 29
 -1482250844
!! 29
 -1414878808
!! 29
 -1347506772
!! 29
 -1280134736
!! 29
 -1212762700
!!! 29
 29
!!! 29
 29
!!! 30
 30
!!! 33
 33
!!! 38
 38
!!! 39
 39
!!! 39
 39
Outputarray:
[
[7,7,8,8,10,11,11,15,15,16,17,19,26,26,-1751738988,-1684366952,-6382180,-1549622880,-1482250844,-1414878808,]
]
done, took 0.046000 sec. Verification...
Falscher Index:1
 FAILED.

【问题讨论】:

  • 据我所知,您永远不会为 tmp 数组的任何元素分配任何值,但您肯定正在阅读它们。通过访问未初始化的对象,您的程序会表现出未定义的行为。

标签: c++ arrays algorithm sorting mergesort


【解决方案1】:

如果你想并行化它,你应该使用线程并行进行排序部分。我在您的代码中看不到任何并行化。

正常的合并排序是:

mergesort(arr, l, r):
1. middle = (l + r)/2
2. mergesort(arr, l, middle)
3. mergesort(arr, middle + 1, r)
4. merge(arr, l, middle, r) //merges the subarray

如果你想并行化,你可以创建一个线程来做 2 和另一个线程来做 3,然后加入它们,一旦它们完成,你就可以进行合并。

mergesort(arr, l, r):
1. middle = (l + r)/2
2. create thread to execute mergesort(arr, l, middle)
3. create thread to execute mergesort(arr, middle + 1, r)
4. wait until both threads are done
5. merge(arr, l, middle, r) //merges the subarray

时间复杂度为 T(n) = T(n/2) + O(n),如果你求解,则为 n + n/2 + n/4 + ... 1 = O(n)。

【讨论】:

  • 如果系统可以并行运行n个线程,则时间复杂度会降低到线性,这是极不可能的。
  • 确实如此。理论上,理论和实践是相同的,但实际上它们不是:)
  • 对于归并排序,通常初始调用是 mergesort(arr, 0, n),其中 n 是 arr[] 中的元素数;在这种情况下,两个递归调用是 mergesort(arr, l, middle) 和 mergesort(arr, middle, r)(注意是 middle,而不是 middle + 1)。
【解决方案2】:

我创建了一个自上而下归并排序的简化顺序示例,合并时使用二分查找。我用一个修改过的版本验证了该代码是稳定排序。MergeSortAtoA 和 MergeSortAtoB 是相互调用的相互递归函数,用来代替传递就地(inplace)标志。其要点是函数 BinSrchLo、BinSrchHi 和 Merge。BinSrchxx(a, ll, rr, v) 返回将 v 插入 a[ll,rr) 时应放置的索引(就好像 a[] 扩展了 1 个元素,为插入的元素腾出空间)。例如,如果 v < a[ll],则返回 ll;如果 v > a[rr-1],则返回 rr。请注意,Merge 在两个 for 循环中都执行赋值 b[i+j-rr] = a[i]。请对照这个可运行的示例检查您的代码与它的差异。

void MergeSortAtoA(int a[], int b[], size_t ll, size_t ee);
void MergeSortAtoB(int a[], int b[], size_t ll, size_t ee);
void Merge(int a[], int b[], size_t ll, size_t rr, size_t ee);

/* Entry point: sorts a[0..n) so that the result ends up in a[]; b[] is the
   second buffer the recursion merges through. */
void MergeSort(int a[], int b[], size_t n)
{
    const size_t first = 0;
    MergeSortAtoA(a, b, first, n);
}

/* Sorts a[ll..ee) with the result left in a[]: both halves are first sorted
   into b[] and then merged from b[] back into a[]. */
void MergeSortAtoA(int a[], int b[], size_t ll, size_t ee)
{
    if (ee - ll <= 1)
        return;                         /* 0 or 1 element: nothing to do */

    const size_t mid = (ll + ee) / 2;   /* midpoint, start of right half */
    MergeSortAtoB(a, b, ll, mid);
    MergeSortAtoB(a, b, mid, ee);
    Merge(b, a, ll, mid, ee);           /* merge b to a */
}

/* Sorts a[ll..ee) with the result left in b[]: both halves are first sorted
   within a[] and then merged from a[] into b[]. A single element is simply
   copied across; an empty range does nothing. */
void MergeSortAtoB(int a[], int b[], size_t ll, size_t ee)
{
    const size_t count = ee - ll;

    if (count == 1) {
        b[ll] = a[ll];
        return;
    }
    if (count > 1) {
        const size_t mid = (ll + ee) >> 1;  /* midpoint, start of right half */
        MergeSortAtoA(a, b, ll, mid);
        MergeSortAtoA(a, b, mid, ee);
        Merge(a, b, ll, mid, ee);           /* merge a to b */
    }
}

/* Lower-bound search in the sorted range a[ll..rr): returns the first index
   whose element is not less than v, or rr if every element is less than v. */
size_t BinSrchLo(int a[], size_t ll, size_t rr, int v)
{
    size_t lo = ll;
    size_t hi = rr;

    while (lo < hi) {
        const size_t mid = (lo + hi) / 2;
        if (v <= a[mid])
            hi = mid;           /* a[mid] is a candidate, keep it in range */
        else
            lo = mid + 1;       /* a[mid] < v: answer lies to the right */
    }
    return lo;
}

/* Upper-bound search in the sorted range a[ll..rr): returns the first index
   whose element is greater than v, or rr if no element is greater than v. */
size_t BinSrchHi(int a[], size_t ll, size_t rr, int v)
{
    size_t lo = ll;
    size_t hi = rr;

    while (lo < hi) {
        const size_t mid = (lo + hi) / 2;
        if (v < a[mid])
            hi = mid;           /* a[mid] > v: answer is mid or further left */
        else
            lo = mid + 1;       /* a[mid] <= v: skip past it */
    }
    return lo;
}

/* Merges the sorted runs a[ll..rr) and a[rr..ee) into b[ll..ee).
 *
 * Every element's final position is computed independently with a binary
 * search in the opposite run, so no iteration depends on another one:
 *   - a left-run element a[i] lands after the j-rr right-run elements that are
 *     strictly smaller than it (BinSrchLo), i.e. at b[i + j - rr];
 *   - a right-run element a[i] lands after the j-ll left-run elements that are
 *     smaller than or equal to it (BinSrchHi), which is again b[i + j - rr].
 * Using the lower bound for the left run and the upper bound for the right run
 * places equal keys from the left run first, which keeps the merge stable.
 *
 * The element type is int, matching the Merge prototype, the callers and
 * BinSrchLo/BinSrchHi.
 */
void Merge(int a[], int b[], size_t ll, size_t rr, size_t ee)
{
    for(size_t i = ll; i < rr; i++){
        size_t j = BinSrchLo(a, rr, ee, a[i]);
        b[i+j-rr] = a[i];
    }
    for(size_t i = rr; i < ee; i++){
        size_t j = BinSrchHi(a, ll, rr, a[i]);
        b[i+j-rr] = a[i];
    }
}

【讨论】:

    【解决方案3】:

    代码中有一些错误,因此变量 halffirst 没有分配给它应该分配的索引。底部的叶子数组也充满了垃圾值。固定代码在这里。大小为 10^7 的数组的运行时间为 0.2 秒,大小为 10^8 的数组的运行时间为 2.4 秒,假设计算机有 256 个线程

    #include <stdio.h>
    #include <stdlib.h>
    #include <errno.h>
    #include <sys/time.h>
    
    #include <iostream>
    #include <algorithm>
    
    #include <cstdlib>
    #include <cstdio>
    
    #include <cmath>
    #include <ctime>
    #include <cstring>
    #include <omp.h>
    // Constants.h
    #if !defined(MYLIB_CONSTANTS_H)
    #define MYLIB_CONSTANTS_H 1
    
    const int CUTOFF =11;
    
    #endif
    
    
    
    /**
     * Upper rank of `value` in the ascending-sorted window
     * in[projection .. projection+n).
     *
     * @param in          base array
     * @param n           number of elements in the window
     * @param value       value whose rank is searched
     * @param projection  offset of the window's first element inside `in`
     * @return number of window elements that are <= value, i.e. the offset
     *         (relative to in+projection, in the range [0, n]) of the first
     *         element greater than value; 0 for an empty window
     */
    int binarysearchfinduperrank(int *in,int n,int value, int projection){
        // An empty (or negative-length) window has no element <= value.
        if (n <= 0) {
            return 0;
        }

        const int *windowBegin = in + projection;
        const int *windowEnd = windowBegin + n;

        // std::upper_bound never dereferences windowEnd, so a run of elements
        // equal to `value` at the end of the window is handled without reading
        // past it.
        const int *firstGreater = std::upper_bound(windowBegin, windowEnd, value);
        return static_cast<int>(firstGreater - windowBegin);
    }
    
    // Debug helper: writes the first n ints of `array` to stdout as "[a,b,c,]",
    // preceded by a newline and followed by one. Nothing is printed between
    // the brackets when n <= 0.
    void printarray(int *array,int n){
        printf("\n[");
        for (int remaining = n; remaining > 0; --remaining) {
            printf("%d,", *array);
            ++array;
        }
        printf("]\n");
    }
    
    
    /**
     * Sequentially merges two sorted runs of `in` into `out`.
     *
     * Reads in[begin1 .. end1) and in[begin2 .. end2) and writes the merged
     * sequence to `out`, starting at out[outBegin]. When both heads are equal
     * the element of the first run is emitted first. A run whose end is not
     * greater than its begin is treated as empty.
     *
     * @param out       destination array
     * @param in        source array holding both runs
     * @param begin1    first index of run 1 (inclusive)
     * @param end1      end index of run 1 (exclusive)
     * @param begin2    first index of run 2 (inclusive)
     * @param end2      end index of run 2 (exclusive)
     * @param outBegin  index in `out` where the merged output starts
     */
    void MsMergeSequential(int *out, int *in, int begin1, int end1, int begin2, int end2, int outBegin) {
        int lhs = begin1;
        int rhs = begin2;
        int *dst = out + outBegin;

        // Emit the smaller head each round; the second run only wins when its
        // head is strictly smaller.
        while (lhs < end1 && rhs < end2) {
            if (in[rhs] < in[lhs]) {
                *dst = in[rhs];
                ++rhs;
            } else {
                *dst = in[lhs];
                ++lhs;
            }
            ++dst;
        }

        // At most one of the two runs still has elements; copy them over.
        for (; lhs < end1; ++lhs, ++dst) {
            *dst = in[lhs];
        }
        for (; rhs < end2; ++rhs, ++dst) {
            *dst = in[rhs];
        }
    }
    
    
    // Strict "less than" predicate handed to std::sort; true when i orders before j.
    bool myfunc (long i , long j){
        const bool ordersBefore = (i < j);
        return ordersBefore;
    }
    
    /**
     * Recursive merge sort step over [begin, end) that ping-pongs between
     * `array` and `tmp`.
     *
     * Ranges shorter than CUTOFF are sorted with std::sort and mirrored into
     * `tmp`, so a leaf is valid in both buffers. Larger ranges are split at
     * `half`; both halves are sorted recursively with `inplace` flipped and are
     * then merged by two independent sub-merges: the pivot is the element at
     * `halffirst` (midpoint of the left half) and `halfsecond` is its upper
     * rank in the right half, so the pair (left[begin..halffirst),
     * right[half..halfsecond)) can be merged independently of the pair
     * (left[halffirst..half), right[halfsecond..end)).
     *
     * @param array    data array
     * @param tmp      second buffer, indexed like `array`
     * @param inplace  true: merge reads `tmp` and writes `array`;
     *                 false: merge reads `array` and writes `tmp`
     * @param begin    first index of the range (inclusive)
     * @param end      end index of the range (exclusive)
     */
    void MsSequential(int *array, int *tmp, bool inplace, long begin, long end) {
        // Leaf: sort directly and publish the result in both buffers.
        if (end - begin < CUTOFF) {
            std::sort(array + begin, array + end, myfunc);
            std::copy(array + begin, array + end, tmp + begin);
            return;
        }

        // Range too short to split; only needs to be present in tmp.
        if (end - 1 <= begin) {
            if (!inplace) {
                tmp[begin] = array[begin];
            }
            return;
        }

        const long half = (begin + end) / 2;
        const int halffirst = (half + begin) / 2;

        // The task pragma binds to the next statement only: the left half may
        // run as a task, the right half runs on the encountering thread, and
        // the taskgroup waits for both.
        #pragma omp taskgroup
        {
            #pragma omp task shared(array) untied if(end-begin >= (1<<15))
            MsSequential(array, tmp, !inplace, begin, half);

            MsSequential(array, tmp, !inplace, half, end);
        }

        // The children left their result in the buffer opposite to our target.
        int *src = inplace ? tmp : array;
        int *dst = inplace ? array : tmp;

        const int halfsecond = binarysearchfinduperrank(src, (end - half), src[halffirst], half) + half;

        // Two disjoint sub-merges; the second one starts right after the
        // (halffirst-begin)+(halfsecond-half) elements written by the first.
        #pragma omp taskgroup
        {
            #pragma omp task shared(src, dst) untied if(end-begin >= (1<<15))
            MsMergeSequential(dst, src, begin, halffirst, half, halfsecond, begin);

            MsMergeSequential(dst, src, halffirst, half, halfsecond, end, begin + (halffirst - begin) + (halfsecond - half));
        }
    }
    
    // Sort entry point: starts the recursion over the whole range [0, size)
    // with inplace == true, i.e. the final merge writes into `array`.
    void MsParallel(int *array, int *tmp, const size_t size) {
        const long firstIndex = 0;
        const long endIndex = size;
        MsSequential(array, tmp, true, firstIndex, endIndex);
    }
    
    
    
    
    // Verifies `data` against a reference: sorts `ref` in place with std::sort
    // and reports whether the first `size` elements of both arrays are equal.
    // Note that `ref` is modified.
    bool isSorted(int ref[], int data[], const size_t size){
        int *const refEnd = ref + size;
        std::sort(ref, refEnd);
        return std::equal(ref, refEnd, data);
    }
    
    /**
      * @brief program entry point
      *
      * Usage: MergeSort.exe <array size>
      *
      * Fills `data` with pseudo-random ints (fixed seed), keeps an unsorted
      * copy in `ref`, sorts `data` through MsParallel inside an OpenMP
      * parallel/single region, prints the elapsed wall-clock time and verifies
      * the result with isSorted.
      *
      * @return EXIT_FAILURE on a wrong argument count, an invalid size or an
      *         allocation failure; EXIT_SUCCESS otherwise (including when the
      *         verification prints FAILED)
      */
    int main(int argc, char* argv[]) {
        // variables to measure the elapsed time
        struct timeval t1, t2;
        double etime;

        // expect one command line argument: array size
        if (argc != 2) {
            fprintf(stderr, "Usage: MergeSort.exe <array size> \n");
            return EXIT_FAILURE;
        }

        // strtol yields a signed long: reject garbage and non-positive values
        // before converting, otherwise "-1" would turn into a huge size_t.
        char *parseEnd = NULL;
        errno = 0;
        const long requested = strtol(argv[1], &parseEnd, 10);
        if (errno != 0 || parseEnd == argv[1] || *parseEnd != '\0' || requested <= 0) {
            fprintf(stderr, "Invalid array size: %s\n", argv[1]);
            return EXIT_FAILURE;
        }
        const size_t stSize = static_cast<size_t>(requested);

        // Guard the byte-count multiplication used for the allocations below.
        if (stSize > static_cast<size_t>(-1) / sizeof(int)) {
            fprintf(stderr, "Array size too large: %s\n", argv[1]);
            return EXIT_FAILURE;
        }

        // data: array to sort, tmp: merge buffer, ref: copy used for verification
        int *data = static_cast<int*>(malloc(stSize * sizeof(int)));
        int *tmp = static_cast<int*>(malloc(stSize * sizeof(int)));
        int *ref = static_cast<int*>(malloc(stSize * sizeof(int)));
        if (data == NULL || tmp == NULL || ref == NULL) {
            fprintf(stderr, "Out of memory for %zu elements\n", stSize);
            free(data);
            free(tmp);
            free(ref);
            return EXIT_FAILURE;
        }
        printf("Initialization...\n");

        // Deliberately serial: rand() keeps hidden global state and is not
        // required to be thread-safe, and a serial fill keeps the input
        // reproducible for the fixed seed. (An OpenMP `parallel for` would
        // also have to be followed directly by the for statement, not by a
        // brace block.)
        srand(95);
        for (size_t idx = 0; idx < stSize; ++idx){
            data[idx] = (int) (stSize * (double(rand()) / RAND_MAX));
        }
        std::copy(data, data + stSize, ref);

        // Floating-point division so small arrays do not report 0 MiB.
        const double dSize = (stSize * sizeof(int)) / 1024.0 / 1024.0;
        printf("Sorting %zu elements of type int (%f MiB)...\n", stSize, dSize);

        gettimeofday(&t1, NULL);
        // Mergesort starts: one thread of the team enters MsParallel and
        // spawns the tasks, the remaining threads execute them.
        #pragma omp parallel num_threads(80)
        {
            #pragma omp single
            {
                MsParallel(data, tmp, stSize);
            }
        }
        gettimeofday(&t2, NULL);

        // Compute in double so the microsecond part is not truncated.
        etime = (t2.tv_sec - t1.tv_sec) + (t2.tv_usec - t1.tv_usec) / 1e6;
        printf("done, took %f sec. Verification...", etime);
        if (isSorted(ref, data, stSize)) {
            printf(" successful.\n");
        }
        else {
            printf(" FAILED.\n");
        }

        free(data);
        free(tmp);
        free(ref);
        return EXIT_SUCCESS;
    }
    

    【讨论】:

    • 在笔记本电脑 4 核 Intel Core i7-10510U 上使用 8 个线程,排序 10^8 个伪随机整数需要 1.9 秒。这没有二进制搜索。将 8 个部分排序后,然后使用 4 个线程将 8 个运行合并为 4 个运行,2 个线程将 4 个运行合并为 2 个运行,1 个线程将 2 个运行合并为 1 个运行。由于高速缓存和内存与 4 核上的 8 线程与普通 L3 缓存和主内存冲突,8 线程版本仅比 4 线程版本快约 23.5%。
    猜你喜欢
    • 2022-01-10
    • 1970-01-01
    • 2015-05-30
    • 2011-01-02
    • 2012-11-28
    • 1970-01-01
    • 1970-01-01
    • 2013-08-30
    • 1970-01-01
    相关资源
    最近更新 更多