【问题标题】:Radix Sort C code to sort uint64_t looking only at 32 MSB bits?基数排序 C 代码对 uint64_t 进行排序,仅查看 32 个 MSB 位?
【发布时间】:2018-04-15 06:36:48
【问题描述】:

我使用 Louis Ricci 提供的 uint64_t Radix 排序(2015 年 8 月 24 日 18:00 回答)Radix_Sort_Uint64。快得惊人。

我有一个包含 2 个 uint32_t 项的数据结构,并且想要对一个仅查看前 32 位或后 32 位的大数组(20+ 百万)进行排序,但我希望排序例程将整个 64 位包移动为一个单位。

是否有 C 语言 uint64 基数排序基于整个 64 位量子的子集进行排序,就好像数据被 0X1111111100000000 屏蔽一样?

【问题讨论】:

  • 使用类似的位移和掩码技术来调整您找到的答案中的代码应该相当容易,除了您只需要 4 次排序通道而不是 8 次(加上初始通道到 pre -计算计数和偏移量),并且只使用 4 个字节的值而不是 8 个。
  • 我查看了代码,并不清楚要子集哪些变量:o* t* 计数或大小。一个简单的解决方案是编写一个 qsort 函数将 uint64_t 指针转换为 uint32_t 并比较它们。但是,我的数据上的 qsort 被证明比基数排序慢得多。这似乎是一个相当通用的工具;仅查看前 32 位对 64 位块进行排序。
  • 我认为你需要 c1c4counts[256 * 4]o1o4t1t4。将 t4 设置为 t1 为您排序所依据的值的字节。例如。对于最高有效 32 位,t4 = (array[x] >> 32) & 0xff;t3 = (array[x] >> 40) & 0xff;t2 = (array[x] >> 48) & 0xff;t1 = (array[x] >> 56) & 0xff;
  • 速度差异的主要原因可能是因为您找到的排序代码使用内存空间换取额外的速度,使用与原始数据大小相同的临时缓冲区。这允许它用简单的赋值操作替换交换操作,在每次传递时在临时缓冲区和原始数组之间来回复制项目。
  • 在 Gcc 上,使用以下选项,我看到 qsort 在正双数据上比 Radix 花费约 80% 的时间。而且,我已经检查了我的数据的排序协议。
    gcc -Ofast -ffast-math -march=native -m64 -fopenmp -funroll-loops -flto // Time=1.543 sec = 24.289%, Event RADIX_SORT_64 , hits=3, 0.514 sec each
    // Time =2.746 秒 = 43.212%,事件 QSORT_DOUBLE_ARRAY ,点击数 = 3,每个 0.915 秒
    // 时间 = 1.522 秒 = 24.120%,事件 RADIX_SORT_64 ,点击数 = 3,每个 0.507 秒
    // 时间 = 2.743 秒 = 43.454%,事件 QSORT_DOUBLE_ARRAY,点击次数=3,每次 0.914 秒

标签: c performance sorting


【解决方案1】:

示例 C 代码。它使用的局部变量比原始帖子中链接的示例少,允许 C 编译器为这些变量使用寄存器。该程序在我的系统(Intel 3770K 3.5ghz cpu,Windows 7 Pro 64 位)上按高 32 位对 2000 万个(20*1024*1024 = 20971520)64 位无符号整数进行排序只需不到 0.5 秒。

/*  radix sort via upper 32 bits of 64 bit unsigned integers */

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

typedef unsigned long long uint64_t;

void RadixSort(uint64_t * pData, uint64_t * pTemp, size_t count)
{
    size_t mIndex[4][256] = { 0 };      /* index matrix */
    size_t * pmIndex;                   /* ptr to row of matrix */
    size_t i, j, m, n;
    uint64_t u;

    for (i = 0; i < count; i++) {       /* generate histograms */
        u = pData[i];
        mIndex[3][(u >> 32) & 0xff]++;
        mIndex[2][(u >> 40) & 0xff]++;
        mIndex[1][(u >> 48) & 0xff]++;
        mIndex[0][(u >> 56) & 0xff]++;
    }

    for (j = 0; j < 4; j++) {           /* convert to indices */
        pmIndex = mIndex[j];
        n = 0;
        for (i = 0; i < 256; i++) {
            m = pmIndex[i];
            pmIndex[i] = n;
            n += m;
        }
    }

    for (i = 0; i < count; i++) {       /* radix sort */
        u = pData[i];
        pTemp[mIndex[3][(u >> 32) & 0xff]++] = u;
    }
    for (i = 0; i < count; i++) {
        u = pTemp[i];
        pData[mIndex[2][(u >> 40) & 0xff]++] = u;
    }
    for (i = 0; i < count; i++) {
        u = pData[i];
        pTemp[mIndex[1][(u >> 48) & 0xff]++] = u;
    }
    for (i = 0; i < count; i++) {
        u = pTemp[i];
        pData[mIndex[0][(u >> 56) & 0xff]++] = u;
    }
}

#define COUNT (20*1024*1024)            /* number of elements */

static clock_t dwTimeStart;             /* clock values */
static clock_t dwTimeStop;

int main( )
{
uint64_t * pData;
uint64_t * pTemp;
uint64_t r;
size_t i;

    /* allocate memory */
    pData  = (uint64_t *)malloc(COUNT*sizeof(uint64_t));
    if(pData == NULL){
        return 0;
    }
    pTemp  = (uint64_t *)malloc(COUNT*sizeof(uint64_t));
    if(pTemp == NULL){
        free(pData);
        return 0;
    }

    for(i = 0; i < COUNT; i++){         /* generate test data */
        r  = (((uint64_t)((rand()>>4) & 0xff))<< 0);
        r |= (((uint64_t)((rand()>>4) & 0xff))<< 8);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<16);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<24);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<32);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<40);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<48);
        r |= (((uint64_t)((rand()>>4) & 0xff))<<56);
        pData[i] = r;
    }

    dwTimeStart = clock();
    RadixSort(pData, pTemp, COUNT);     /* sort array */
    dwTimeStop = clock();
    printf("Number of ticks %d\n", dwTimeStop-dwTimeStart);
    for(i = 1; i < COUNT; i++){         /* check sort */
        if((pData[i-1]>>32) > (pData[i]>>32)){
            break;
        }
    }
    if(i != COUNT)
        printf("sort error\n");
    free(pData);
    return(0);
}

【讨论】:

    【解决方案2】:
    // =============================================================================
    // Create 2 identical uint64[]s, mirror the first 32 bits in the last, sort one
    // with qsort and the other with RadixSort11110000() and compare the results
    int test_uint64_32_sort(EV_TIME_STR *tsa)  {
    uint64_t *la1, *la2;    
    int ii, lcount=3615232, zmem=0, debug=0, ucount=1, ri, er12=0, ep=63;
    time_t tt;  // This Time! 
    float rssec=2.0f;  // RadixSort_SEConds elapsed time from Radixsort()
        time_event(E_GENERATE_RANDOM_ARRAY, tsa, E_TIME_EVENT, 1, 0); 
        srand((unsigned) time(&tt));
        la1=(uint64_t *)alloc_check((lcount)*8, zmem=0, "LLong_ara_1", debug);
        la2=(uint64_t *)alloc_check((lcount)*8, zmem=0, "LLong_ara_2", debug);
    
        for(ii=0; ii < lcount; ii++)  {  // Look only SHORT range
            ri = rand();  // Random int
            la1[ii] = ri << 32 + (0XFFFFFFFF - ri);  // Reflect val in lower 32
        }
    
        // Make identical copies in the other []
        time_event(E_MEMCPY_DOUBLE_ARRAY, tsa, E_TIME_EVENT, 1, 0); 
        memcpy((void *) la2, (void *) la1, lcount * sizeof(uint64_t));
    
        time_event(E_QSORT_UINT64_ARRAY, tsa, E_TIME_EVENT, 1, 0); 
        qsort(la1, lcount, sizeof(uint64_t), comp_uint6411110000);
    
        time_event(E_RADIX_SORT64_11110000, tsa, E_TIME_EVENT, 1, 0); 
        radixSort11110000(la2, lcount, &rssec, &ucount);
    
        time_event(E_SPLIT_RGBJ_MEM,   tsa, E_TIME_EVENT, 1, 0); 
        for(ii=er12=0; ii < lcount; ii++)  { 
            if(la1[ii] != la2[ii])  {  
                er12++;  
                if((--ep) > 0)  {
                    printf("II %d) Er%d, l1=0X%016llX, l2=0X%016llX\n", 
                        ii, ep, la1[ii], la2[ii]);  FF_SO;  }
            }  // Count Error Mismatches
            if(!(ii%100000))  {  
                printf("II %d) l1=0X%016llX, l2=0X%016llX\n", 
                    ii, la1[ii], la2[ii]);  FF_SO;  }
        }
        printf("T63S: Er1/2 = %d \n", er12);  FF_SO;
        if(ucount)  {
            printf("T63S: RadixSort time = %.3f ms, unique=%d = %.3f%%\n",
                rssec*1.0E3f, ucount, (100.0f * ucount / lcount));  FF_SO;
        }
        free_bb(la1);  free_bb(la2); 
    }
    
    
    
    // =============================================================================
    // Based on original code from https://ideone.com/JHI0d9
    // and suggestions from  Ian Abbott
    // https://stackoverflow.com/questions/47080353/radix-sort-c-code-to-sort-uint64-t-looking-only-at-32-msb-bits
    // Hacked code may be unsuitable for any use and should not be used by anyone
    // Sort uint64[] by looking ONLY at the 
    uint64_t *radixSort11110000(uint64_t *arrayA, uint32_t size)  {
    register uint64_t *array=arrayA;  // Slam arg into Register!
    register uint64_t std;  // STanDard to compare others for uniqueness
    register int dist=0;  // Distinct, unique values found if *UCount arg is TRUE
    register int ii;  // Loop control
    int64_t rtime, mtns;  // Time in NanoSeconds!!!
        rscounts4_t counts;
        memset(&counts, 0, 256 * 4 * sizeof(uint32_t));
        uint64_t * cpy = (uint64_t *)malloc(size * sizeof(uint64_t));
        uint32_t o4=0, o3=0, o2=0, o1=0;
        uint32_t t4, t3, t2, t1;
        register uint32_t x;
        // calculate counts
        for(x = 0; x < size; x++) {
            t4 = (array[x] >> 32) & 0xff;
            t3 = (array[x] >> 40) & 0xff;
            t2 = (array[x] >> 48) & 0xff;
            t1 = (array[x] >> 56) & 0xff;
            counts.c4[t4]++;
            counts.c3[t3]++;
            counts.c2[t2]++;
            counts.c1[t1]++;
        }
        // convert counts to offsets
        for(x = 0; x < 256; x++) {
            t4 = o4 + counts.c4[x];
            t3 = o3 + counts.c3[x];
            t2 = o2 + counts.c2[x];
            t1 = o1 + counts.c1[x];
            counts.c4[x] = o4;
            counts.c3[x] = o3;
            counts.c2[x] = o2;
            counts.c1[x] = o1;
            o4 = t4; 
            o3 = t3; 
            o2 = t2; 
            o1 = t1;
        }
        // radix
        for(x = 0; x < size; x++) {
            t4 = (array[x] >> 32) & 0xff;
            cpy[counts.c4[t4]] = array[x];
            counts.c4[t4]++;  }
        for(x = 0; x < size; x++) {
            t3 = (cpy[x] >> 40) & 0xff;
            array[counts.c3[t3]] = cpy[x];
            counts.c3[t3]++;  }
        for(x = 0; x < size; x++) {
            t2 = (array[x] >> 48) & 0xff;
            cpy[counts.c2[t2]] = array[x];
            counts.c2[t2]++;  }
        for(x = 0; x < size; x++) {
            t1 = (cpy[x] >> 56) & 0xff;
            array[counts.c1[t1]] = cpy[x];
            counts.c1[t1]++;  }
        free(cpy);
        return array;
    }  // End radixSort_11110000().
    
    
    // #############################################################################
    // From: http://ideone.com/JHI0d9
    // RadixSort---
    typedef union {
        struct {
            uint32_t c4[256];
            uint32_t c3[256];
            uint32_t c2[256];
            uint32_t c1[256];
        };
        uint32_t counts[256 * 4];
    }  rscounts4_t;
    
    
    // =============================================================================
    // Compare only the MSB 4 bytes of a uint64 by masking each with 
    // 0XFFFFFFFF00000000 before comparison
    int comp_uint6411110000(const void *a, const void *b)  {
        return (
          ( 
            ( ( *( (uint64_t *)a ) ) & 0XFFFFFFFF00000000ULL ) > 
            ( ( *( (uint64_t *)b ) ) & 0XFFFFFFFF00000000ULL ) 
          )
         - 
          (  
            ( ( *( (uint64_t *)a ) ) & 0XFFFFFFFF00000000ULL ) < 
            ( ( *( (uint64_t *)b ) ) & 0XFFFFFFFF00000000ULL )
          )
        );
    }  // End Comp_Uint64_11110000().
    
    
    
    // Both sorted arrays were identical. 
    // T63S: Er1/2 = 0
    // TE: Top 90% events in desc order (7/312):
    //  Time=  0.157 sec = 71.282%, QSORT_UINT64_ARRAY   , hits=1
    //  Time=  0.029 sec = 13.119%, RADIX_SORT64_11110000, hits=1
    //  Time=  0.026 sec = 11.872%, GENERATE_RANDOM_ARRAY, hits=1
    
    // mult 71.282  /13.119  -> 5.433493
    // The RadixSort is over 5x faster that the qsort
    
    Perhaps the QSort comparitor could be handled more efficiently
    

    【讨论】:

      【解决方案3】:

      来自 RGCLGR ["RADIX_SORT_UINT64_32R()"] 的寄存器优化代码明显快于仅对 32 个最高有效位进行排序的原始代码:

        Time= 1.851 sec = 15.648%, RADIX_SORT_FFFF0000  , hits=4, 0.463 sec each  << OLD
        Time= 1.552 sec = 13.120%, RADIX_SORT_UINT64_32R, hits=4, 0.388 sec each  << NEW
      

      对于一个包含 3620 万个值的数组,平均运行 4 次运行平均需要 83.8% 的时间,其中 100% 是不同的。 每次运行后,每次排序的结果都会移位&gt;&gt; 32,进行比较并且是相同的。

      在所有 64 位上与 QSort 和以前版本的 Radix_Sort() 相比:

      Time=  6.125 sec = 62.359%, QSORT_UINT64_ARRAY   , hits=1
      Time=  0.832 sec =  8.468%, RADIX_SORT_64        , hits=1  << OLD
      Time=  0.770 sec =  7.842%, RADIX_SORT_UINT64_REG, hits=1  << NEW
      

      QSort 几乎是这个新版本的 8 倍。 原来的 'RADIX_SORT_64()' 函数慢了 8%。

      将代码从 RGCLGR 推广到 48 位和 64 位,并比较使用 RGCLGR 方法对 64、48 和 32 位上的 uint64_t[32M] 进行排序:

      Time=  3.130 sec = 20.342%, RADIX_SORT_UINT64_REG, hits=4, 0.782 sec each
      Time=  2.336 sec = 15.180%, RADIX_SORT_UINT64_48R, hits=4, 0.584 sec each
      Time=  1.540 sec = 10.007%, RADIX_SORT_UINT64_32R, hits=4, 0.385 sec each
      

      请注意,考虑排序的 64、48 和 32 位的 20%、15% 和 10% 编程时间之间的预期线性度。

      【讨论】:

        猜你喜欢
        • 2019-01-07
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多