【问题标题】:C - Fastest way to sort a large 2D integer arrayC - 对大型二维整数数组进行排序的最快方法
【发布时间】:2017-09-14 18:06:48
【问题描述】:

我有一个二维数组,其中每行包含 6 个整数,这些整数已经按升序排序。示例:

1  2  3  4  5  6
6  8  9 10 13 15
1  4  5  6  7  9
1  4  5  6  7  8
3 18 19 20 25 34

预期输出:

1  2  3  4  5  6
1  4  5  6  7  8
1  4  5  6  7  9
3 18 19 20 25 34
6  8  9 10 13 15

实际数据包含 8m 到 33m 条这样的记录。我正在尝试确定对该数组进行排序的最快方法。我目前有一些使用 qsort 的工作代码:

qsort 调用:

qsort(allRecords, lineCount, sizeof(int*), cmpfunc);

cmpfunc:

int cmpfunc (const void * a, const void * b)
{
  const int *rowA = *(const int **)a;
  const int *rowB = *(const int **)b;

  if (rowA[0] > rowB[0]) return 1;
  if (rowA[0] < rowB[0]) return -1;

  if (rowA[1] > rowB[1]) return 1;
  if (rowA[1] < rowB[1]) return -1;

  if (rowA[2] > rowB[2]) return 1;
  if (rowA[2] < rowB[2]) return -1;

  if (rowA[3] > rowB[3]) return 1;
  if (rowA[3] < rowB[3]) return -1;

  if (rowA[4] > rowB[4]) return 1;
  if (rowA[4] < rowB[4]) return -1;

  if (rowA[5] > rowB[5]) return 1;
  if (rowA[5] < rowB[5]) return -1;

  return 0;
}

对于 3300 万条记录的样本,大约需要 35.6 秒 (gcc -O1),这非常快,但我想知道是否有更快的方法来处理每行中预先排序的值。

这自然适用于最常见的第一个数字是 1 的数据,因此在 33m 记录文件中,可能有 12m 记录以 1 开头,然后 8m 记录以 2 开头,5m 记录以 3 开头,等等...我不确定这是否适合一种特定类型的排序而不是另一种(例如堆排序)。

我的理解是 qsort 有相当多的开销,因为它必须调用函数的所有时间,所以我希望有更快的性能。

我通常不编写 C 代码,因此我非常愿意接受建议和批评,因为我是从教程和其他 StackOverflow 问题/答案中拼凑起来的。

编辑: 根据要求,我的初始化代码:

// Empty record
int recArray[6] = {0,0,0,0,0,0};

// Initialize allRecords
int** allRecords;
allRecords = (int**) malloc(lineCount*sizeof(int*));
for(i=0; i < lineCount; i++)
{
    allRecords[i] = (int*) malloc(6*sizeof(int));
}

// Zero-out all records
for(i=0; i < lineCount; i++)
{
  memcpy(allRecords[i], recArray, 6 * sizeof(int));
}

我仍在学习做事的正确方法,所以如果我做错了,我不会感到惊讶。我们将不胜感激。

其他人询问了值的范围 - 我不确定该范围将来是否会改变,但目前的值在 1 到 99 之间。

另外,对于分析 - 我构建了一个小函数,它使用 gettimeofday() 来提取秒/微秒,然后比较前后。我愿意接受更好的方法。输出如下:

// <-- Here I capture the gettimeofday() structure output
Sorting...
Sorted.
Time Taken: 35.628882s // <-- Capture it again, show the difference

编辑: 根据@doynax - 我现在将每行的 6 个值“打包”到一个 unsigned long long int 中:

// Initialize allRecords
unsigned long long int* allRecords;
allRecords = (unsigned long long int*) malloc(lineCount*sizeof(unsigned long long int));
for(i=0; i < lineCount; i++)
{
    allRecords[i] = 0;
}

...

// "Pack" current value (n0) into an unsigned long long int
if(recPos == 0) { lineSum += n0 * UINT64_C(1); }
else if(recPos == 1) { lineSum += n0 * UINT64_C(100); }
else if(recPos == 2) { lineSum += n0 * UINT64_C(10000); }
else if(recPos == 3) { lineSum += n0 * UINT64_C(1000000); }
else if(recPos == 4) { lineSum += n0 * UINT64_C(100000000); }
else if(recPos == 5) { lineSum += n0 * UINT64_C(10000000000); }
...
allRecords[linecount] = lineSum;
lineSum = 0;

我也可以稍后将其中一个 unsigned long long int 值成功地“解包”回原来的 6 个整数。

但是,当我尝试排序时:

qsort(allRecords, lineCount, sizeof(unsigned long long int), cmpfunc);

...

int cmpfunc (const void * a, const void * b)
{
  if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
  if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
  return 0;
}

...结果未按预期排序。如果我在排序前后显示第一行和最后一行:

printf("[%i] = %llu = %i,%i,%i,%i,%i,%i\n", j, lineSum, recArray[0]...recArray[5]);

输出是:

First and last 5 rows before sorting:
[#] = PACKED INT64 = UNPACKED
[0] = 462220191706 = 6,17,19,20,22,46
[1] = 494140341005 = 5,10,34,40,41,49
[2] = 575337201905 = 5,19,20,37,53,57
[3] = 504236262316 = 16,23,26,36,42,50
[4] = 534730201912 = 12,19,20,30,47,53
[46] = 595648302516 = 16,25,30,48,56,59
[47] = 453635251108 = 8,11,25,35,36,45
[48] = 403221161202 = 2,12,16,21,32,40
[49] = 443736310604 = 4,6,31,36,37,44
[50] = 575248312821 = 21,28,31,48,52,57

First and last 5 rows after sorting:
[0] = 403221161202 = 2,12,16,21,32,40
[1] = 413218141002 = 2,10,14,18,32,41
[2] = 443736310604 = 4,6,31,36,37,44
[3] = 444127211604 = 4,16,21,27,41,44
[4] = 453028070302 = 2,3,7,28,30,45
[46] = 585043260907 = 7,9,26,43,50,58
[47] = 593524170902 = 2,9,17,24,35,59
[48] = 595248392711 = 11,27,39,48,52,59
[49] = 595251272612 = 12,26,27,51,52,59
[50] = 595648302516 = 16,25,30,48,56,59

我猜我以某种方式比较了错误的值(例如指针值而不是实际值),但我不太确定正确的语法是什么。

从好的方面来说,这种方式的速度非常快。 :)

对 33m 64 位整数进行排序大约需要 4-5 秒(至少在当前的错误形式中)。

【问题讨论】:

  • 您应该提供数组的声明,因为您提供的qsort() 调用和比较函数对于真正二维数组的排序是完全错误的。不过,您可以使用它们对指针数组进行排序。目前尚不清楚您实际拥有哪个(如果有的话)。 (虽然如果它真的有效,那么我猜你的一定是一个指针数组。)
  • 你有更多关于值范围的信息吗?例如。对只有值 1..10 的数组进行排序可能与所有值都在整数范围内一致时非常不同。
  • @jhilgeman:不,您从原始数组中预先计算了这些新的宽整数键,并且只对新消息进行排序以代替旧消息。最后,您可以通过连续除以 100 重新创建数据(其中余数产生旧列值)。这个想法是单个 64 位整数比多个更小的整数更快、更容易比较。
  • 您以相反的顺序打包。对于每个数字 10 位,我会使用类似: (long long)a[5]|((long long)a[4]
  • 正如@AntonínLejsek 指出的那样,您的数据排列得很漂亮,但在最后一个数字上,而不是第一个数字上。您需要重新订购包装。您应该能够使用循环而不是 N 路 if 语句来驱动打包。当然,您将对解包进行补充更改。

标签: c arrays performance sorting


【解决方案1】:

我无法抗拒一个好的优化挑战。

初步想法

我看到问题的第一个想法是使用radix sort,特别是带有内部计数排序的基数排序。 (很高兴看到一些 cmets 也同意我的看法!)我在其他项目中使用了带有内部计数排序的基数排序,虽然快速排序在小型数据集上的表现优于它,但对于大型数据集而言,它让快速排序望尘莫及数据集。

我选择了基数排序,因为像快速排序这样的比较排序有一个众所周知的 O(n lg n) 的性能限制。因为这里的 n 相当大,所以“lg n”部分也相当大——对于 3300 万条记录,您可以合理地预计它将花费“一些恒定的时间”倍lg(33000000) 的 33000000 倍,lg(33000000) 约为 25。基数排序和计数排序等排序是“非比较”排序,因此它们可以在 O(n lg n) 边界通过对数据有特殊的了解——在这种情况下,它们可以走得更快,因为我们知道数据是小整数。

我使用与其他人相同的 mash-the-values-together 代码,将每组 int 值组合成一个 BigInt,但我使用的是位移和掩码而不是乘法和模数,因为位操作总体上会更快。

我的实现(见下文)执行一些恒定时间设置计算,然后它准确地迭代数据 7 次以对其进行排序——“7 次”不仅在算法上击败了“25 次”,而且我的实现时要小心避免不必要地频繁接触内存,以便它与 CPU 缓存一样发挥作用。

为了合理的源数据,我编写了一个小程序,它生成了 3300 万条记录,这些记录看起来有点像您的原始数据。每行都按排序顺序排列,并略微偏向较小的值。 (您可以在下面找到我的sixsample.c 示例数据集程序。)

性能比较

使用与您的 qsort() 实现非常接近的东西,我在 3 次运行中得到了这些数字,它们与您得到的数字相差不远(显然是不同的 CPU):

qsort:

Got 33000000 lines. Sorting.
Sorted lines in 5 seconds and 288636 microseconds.
Sorted lines in 5 seconds and 415553 microseconds.
Sorted lines in 5 seconds and 242454 microseconds.

使用我的 radix-sort-with-counting-sort 实现,我在 3 次运行中得到了这些数字:

Radix sort with internal counting sort, and cache-optimized:

Got 33000000 lines. Sorting.
Sorted lines in 0 seconds and 749285 microseconds.
Sorted lines in 0 seconds and 833474 microseconds.
Sorted lines in 0 seconds and 761943 microseconds.

5.318 秒与 0.781 秒对相同的 3300 万条记录进行排序。速度提高了 6.8 倍!

为了确定,我diffed 每次运行的输出以保证结果是相同的——你真的可以在一秒钟内对数千万条记录进行排序!

深入探索

radix sortcounting sort 的基本原理在维基百科和我最喜欢的一本书Introduction to Algorithms 中都有很好的描述,所以我不会在这里重复这些基本原理。相反,我将描述一下我的实现与教科书形式的不同之处在于运行得更快。

这些算法的正常实现是这样的(在 Python 式的伪代码中):

def radix-sort(records):
    temp = []
    for column = 0 to columns:
         counting-sort(records, temp, column)
         copy 'temp' back into 'records'

def counting-sort(records, dest, column):

    # Count up how many of each value there is at this column.
    counts = []
    foreach record in records:
        counts[record[column]] += 1

    transform 'counts' into offsets

    copy from 'records' to 'dest' using the calculated offsets

这当然有效,但也有一些丑陋之处。 “计算值”步骤涉及计算整个数据集中每一列的值。 “将临时复制回记录”步骤涉及复制每一列的整个数据集。这些是“按排序顺序从记录复制到目的地”所需步骤的补充。我们对每一列的数据旋转 3 次;最好少这样做!

我的实现将这些混合在一起:不是单独计算每列的计数,而是将它们一起计算为数据的第一次传递。然后将计数批量转换为偏移量;最后,数据被整理到位。此外,我不是在每个阶段从temp 复制到records,而是简单地翻页:数据每隔一次迭代就会交替“来自”哪里。我的伪代码更像这样:

def radix-sort(records):

    # Count up how many of each value there is in every column,
    # reading each record only once.
    counts = [,]
    foreach record in records:
        foreach column in columns:
            counts[column, record[column]] += 1

    transform 'counts' for each column into offsets for each column

    temp = []
    for column = 0 to columns step 2:
         sort-by-counts(records, temp, column)
         sort-by-counts(temp, records, column+1)

def sort-by-counts(records, dest, counts, column):
    copy from 'records' to 'dest' using the calculated offsets

这避免了不必要的数据旋转:一次设置,六次排序,全部按顺序进行。你不能让 CPU 缓存更快乐。

C 代码

这是我对sixsort.c 的完整实现。它还包括您的 qsort 解决方案,已注释掉,因此您可以更轻松地比较两者。它包含大量文档,以及所有 I/O 和指标代码,因此有点长,但它的完整性应该可以帮助您更好地理解解决方案:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/time.h>

/* Configuration. */
#define NUM_COLUMNS 6       /* How many columns to read/sort */
#define MAX_VALUE 100       /* Maximum value allowed in each column, plus one */
#define BITSHIFT 7      /* How far to shift bits when combining values */

/* Note: BITSHIFT should be as small as possible, but MAX_VALUE must be <= 2**BITSHIFT. */

/* Highest value representable under the given BITSHIFT. */
#define BITMASK ((1 << BITSHIFT) - 1)

/* The type we're going to pack the integers into.  The highest bits will get the
   leftmost number, while the lowest bits will get the rightmost number. */
typedef unsigned long long BigInt;

/*-------------------------------------------------------------------------------------------------
** Radix Sorting.
*/

/* Move a set of items from src to dest, using the provided counts to decide where each
   item belongs.  src and dest must not be the same arrays. */
static void SortByCounts(BigInt *src, BigInt *dest, size_t *counts, size_t totalCount, int bitshift)
{
    BigInt *temp, *end;

    for (temp = src, end = src + totalCount; temp < end; temp++) {
        BigInt value = *temp;
        int number = (int)((value >> bitshift) & BITMASK);
        int offset = counts[number]++;
        dest[offset] = value;
    }
}

/* Use a radix sort with an internal counting sort to sort the given array of values.
   This iterates over the source data exactly 7 times, which for large
   'count' (i.e., count > ~2000) is typically much more efficient than O(n lg n)
   algorithms like quicksort or mergesort or heapsort.  (That said, the recursive O(n lg n)
   algorithms typically have better memory locality, so which solution wins overall
   may vary depending on your CPU and memory system.) */
static void RadixSortWithInternalCountingSort(BigInt *values, size_t count)
{
    size_t i, j;
    BigInt *temp, *end;
    size_t counts[NUM_COLUMNS][MAX_VALUE];
    size_t oldCount;

    /* Reset the counts of each value in each column to zero, quickly.
       This takes MAX_VALUE * NUM_COLUMNS time (i.e., constant time). */
    memset(counts, 0, sizeof(counts));

    /* Count up how many there are of each value in this column.  This iterates over
       the whole dataset exactly once, processing each set of values in full, so it
       takes COUNT * NUM_COLUMNS operations, or theta(n). */
    for (temp = values, end = values + count; temp < end; temp++) {
        BigInt value = *temp;
        for (i = 0; i < NUM_COLUMNS; i++) {
            counts[i][(int)((value >> (i * BITSHIFT)) & BITMASK)]++;
        }
    }

    /* Transform the counts into offsets.  This only transforms the counts array,
       so it takes MAX_VALUE * NUM_COLUMNS operations (i.e., constant time). */
    size_t totals[NUM_COLUMNS];
    for (i = 0; i < NUM_COLUMNS; i++) {
        totals[i] = 0;
    }
    for (i = 0; i < MAX_VALUE; i++) {
        for (j = 0; j < NUM_COLUMNS; j++) {
            oldCount = counts[j][i];
            counts[j][i] = totals[j];
            totals[j] += oldCount;
        }
    }

    temp = malloc(sizeof(BigInt) * count);

    /* Now perform the actual sorting, using the counts to tell us how to move
       the items.  Each call below iterates over the whole dataset exactly once,
       so this takes COUNT * NUM_COLUMNS operations, or theta(n). */
    for (i = 0; i < NUM_COLUMNS; i += 2) {
        SortByCounts(values, temp, counts[i  ], count,  i    * BITSHIFT);
        SortByCounts(temp, values, counts[i+1], count, (i+1) * BITSHIFT);
    }

    free(temp);
}

/*-------------------------------------------------------------------------------------------------
** Built-in Quicksorting.
*/

static int BigIntCompare(const void *a, const void *b)
{
    BigInt av = *(BigInt *)a;
    BigInt bv = *(BigInt *)b;

    if (av < bv) return -1;
    if (av > bv) return +1;
    return 0;
}

static void BuiltInQuicksort(BigInt *values, size_t count)
{
    qsort(values, count, sizeof(BigInt), BigIntCompare);
}

/*-------------------------------------------------------------------------------------------------
** File reading.
*/

/* Read a single integer from the given string, skipping initial whitespace, and
   store it in 'returnValue'.  Returns a pointer to the end of the integer text, or
   NULL if no value can be read. */
static char *ReadInt(char *src, int *returnValue)
{
    char ch;
    int value;

    /* Skip whitespace. */
    while ((ch = *src) <= 32 && ch != '\r' && ch != '\n') src++;

    /* Do we have a valid digit? */
    if ((ch = *src++) < '0' || ch > '9')
        return NULL;

    /* Collect digits into a number. */
    value = 0;
    do {
        value *= 10;
        value += ch - '0';
    } while ((ch = *src++) >= '0' && ch <= '9');
    src--;

    /* Return what we did. */
    *returnValue = value;
    return src;
}

/* Read a single line of values from the input into 'line', and return the number of
   values that were read on this line. */
static int ReadLine(FILE *fp, BigInt *line)
{
    int numValues;
    char buffer[1024];
    char *src;
    int value;
    BigInt result = 0;

    if (fgets(buffer, 1024, fp) == NULL) return 0;
    buffer[1023] = '\0';

    numValues = 0;
    src = buffer;
    while ((src = ReadInt(src, &value)) != NULL) {
        result |= ((BigInt)value << ((NUM_COLUMNS - ++numValues) * BITSHIFT));
    }

    *line = result;
    return numValues;
}

/* Read from file 'fp', which should consist of a sequence of lines with
   six decimal integers on each, and write the parsed, packed integer values
   to a newly-allocated 'values' array.  Returns the number of lines read. */
static size_t ReadInputFile(FILE *fp, BigInt **values)
{
    BigInt line;
    BigInt *dest;
    size_t count, max;

    count = 0;
    dest = malloc(sizeof(BigInt) * (max = 256));

    while (ReadLine(fp, &line)) {
        if (count >= max) {
            size_t newmax = max * 2;
            BigInt *temp = malloc(sizeof(BigInt) * newmax);
            memcpy(temp, dest, sizeof(BigInt) * count);
            free(dest);
            max = newmax;
            dest = temp;
        }
        dest[count++] = line;
    }

    *values = dest;
    return count;
}

/*-------------------------------------------------------------------------------------------------
** File writing.
*/

/* Given a number from 0 to 999 (inclusive), write its string representation to 'dest'
   as fast as possible.  Returns 'dest' incremented by the number of digits written. */
static char *WriteNumber(char *dest, unsigned int number)
{
    if (number >= 100) {
        dest += 3;
        dest[-1] = '0' + number % 10, number /= 10;
        dest[-2] = '0' + number % 10, number /= 10;
        dest[-3] = '0' + number % 10;
    }
    else if (number >= 10) {
        dest += 2;
        dest[-1] = '0' + number % 10, number /= 10;
        dest[-2] = '0' + number % 10;
    }
    else {
        dest += 1;
        dest[-1] = '0' + number;
    }

    return dest;
}

/* Write a single "value" (one line of content) to the output file. */
static void WriteOutputLine(FILE *fp, BigInt value)
{
    char buffer[1024];
    char *dest = buffer;
    int i;

    for (i = 0; i < NUM_COLUMNS; i++) {
        if (i > 0)
            *dest++ = ' ';
        int number = (value >> (BITSHIFT * (NUM_COLUMNS - i - 1))) & BITMASK;
        dest = WriteNumber(dest, (unsigned int)number);
    }
    *dest++ = '\n';
    *dest = '\0';

    fwrite(buffer, 1, dest - buffer, fp);
}

/* Write the entire output file as a sequence of values in columns. */
static void WriteOutputFile(FILE *fp, BigInt *values, size_t count)
{
    while (count-- > 0) {
        WriteOutputLine(fp, *values++);
    }
}

/*-------------------------------------------------------------------------------------------------
** Timeval support.
*/

int timeval_subtract(struct timeval *result, struct timeval *x, struct timeval *y)  
{  
    /* Perform the carry for the later subtraction by updating y. */  
    if (x->tv_usec < y->tv_usec) {  
        int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;  
        y->tv_usec -= 1000000 * nsec;  
        y->tv_sec += nsec;  
    }  
    if (x->tv_usec - y->tv_usec > 1000000) {  
        int nsec = (y->tv_usec - x->tv_usec) / 1000000;  
        y->tv_usec += 1000000 * nsec;  
        y->tv_sec -= nsec;  
    }  

    /* Compute the time remaining to wait. tv_usec is certainly positive. */  
    result->tv_sec = x->tv_sec - y->tv_sec;  
    result->tv_usec = x->tv_usec - y->tv_usec;  

    /* Return 1 if result is negative. */  
    return x->tv_sec < y->tv_sec;  
}

/*-------------------------------------------------------------------------------------------------
** Main.
*/

int main(int argc, char **argv)
{
    BigInt *values;
    size_t count;
    FILE *fp;
    struct timeval startTime, endTime, deltaTime;

    if (argc != 3) {
        fprintf(stderr, "Usage: sixsort input.txt output.txt\n");
        return -1;
    }

    printf("Reading %s...\n", argv[1]);

    if ((fp = fopen(argv[1], "r")) == NULL) {
        fprintf(stderr, "Unable to open \"%s\" for reading.\n", argv[1]);
        return -1;
    }
    count = ReadInputFile(fp, &values);
    fclose(fp);

    printf("Got %d lines. Sorting.\n", (int)count);

    gettimeofday(&startTime, NULL);
    RadixSortWithInternalCountingSort(values, count);
    /*BuiltInQuicksort(values, count);*/
    gettimeofday(&endTime, NULL);
    timeval_subtract(&deltaTime, &endTime, &startTime);

    printf("Sorted lines in %d seconds and %d microseconds.\n",
        (int)deltaTime.tv_sec, (int)deltaTime.tv_usec);

    printf("Writing %d lines to %s.\n", (int)count, argv[2]);

    if ((fp = fopen(argv[2], "w")) == NULL) {
        fprintf(stderr, "Unable to open \"%s\" for writing.\n", argv[2]);
        return -1;
    }
    WriteOutputFile(fp, values, count);
    fclose(fp);

    free(values);
    return 0;
}

另外,作为参考,这是我的蹩脚的小样本数据集生产者sixsample.c,使用可预测的 LCG 随机数生成器,因此它总是生成相同的样本数据:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <math.h>

#define NUM_COLUMNS 6
#define MAX_VALUE 35 

unsigned int RandSeed = 0;

/* Generate a random number from 0 to 65535, inclusive, using a simple (fast) LCG.
   The numbers below are the same as used in the ISO/IEC 9899 proposal for C99/C11. */
static int Rand()
{
    RandSeed = RandSeed * 1103515245 + 12345;
    return (int)(RandSeed >> 16);
}

static int CompareInts(const void *a, const void *b)
{
    return *(int *)a - *(int *)b;
}

static void PrintSortedRandomLine()
{
    int values[NUM_COLUMNS];
    int i;

    for (i = 0; i < NUM_COLUMNS; i++) {
        values[i] = 1 + (int)(pow((double)Rand() / 65536.0, 2.0) * MAX_VALUE);
    }
    qsort(values, NUM_COLUMNS, sizeof(int), CompareInts);

    /* Lame. */
    printf("%d %d %d %d %d %d\n",
        values[0], values[1], values[2], values[3], values[4], values[5]);
}

int main(int argc, char **argv)
{
    unsigned int numLines;
    unsigned int i;

    if (argc < 2 || argc > 3) {
        fprintf(stderr, "Usage: sixsample numLines [randSeed]\n");
        return -1;
    }

    numLines = atoi(argv[1]);
    if (argc > 2)
        RandSeed = atoi(argv[2]);

    for (i = 0; i < numLines; i++) {
        PrintSortedRandomLine();
    }

    return 0;
}

最后,这是一个 Makefile,它可以像我在 Ubuntu 16 上使用 gcc 5.4.0 一样构建它们:

all: sixsort sixsample

sixsort: sixsort.c
    gcc -O6 -Wall -o sixsort sixsort.c

sixsample: sixsample.c
    gcc -O -Wall -o sixsample sixsample.c

clean:
    rm -f sixsort sixsample

进一步可能的优化

我考虑通过在纯汇编中编写它们来进一步优化其中的某些部分,但是 gcc 在内联函数、展开循环以及通常使用我本来会考虑自己做的大多数优化方面做得非常出色在组装中,因此很难以这种方式改进它。核心操作不太适合 SIMD 指令,因此 gcc 的版本可能会尽可能快。

如果我将它编译为 64 位进程,这可能会运行得更快,因为 CPU 可以将每个完整的记录保存在一个寄存器中,但我的测试 Ubuntu VM 是 32 位的,所以 c'est la竞争。

鉴于这些值本身都相当小,您可以通过组合代码点来进一步改进我的解决方案:因此,您可以除执行六次计数排序,而是对每一个七位列进行一个子排序,而不是执行以其他方式增加相同的 42 位:您可以对每个 14 位列执行三个子排序。这将需要在内存中存储 3*2^14 (=3*16384=49152) 计数和偏移量,而不是 6*2^7 (=6*128=768),但由于内存相当便宜,而且数据仍然会适合 CPU 缓存,可能值得一试。而且由于原始数据是两位十进制整数,您甚至可以将其从中间切开,并对两个 6 位十进制数字列中的每一个值执行一对子排序(需要 2*10^6=2000000 个计数和偏移量,这仍然可能适合您的 CPU 缓存,具体取决于您的 CPU)。

你也可以并行化它!我的实现很难点亮一个 CPU,但其余部分闲置。因此,另一种提高速度的技术可能是将源数据分成块,每个 CPU 内核一个,独立地对每个块进行排序,然后在最后执行类似合并排序的“合并”操作以将结果组合在一起。如果您发现自己经常运行此计算,则可能需要研究并行化工作。

结论和其他想法

虽然我自己没有测试过,但比较手写快速排序的性能可能是公平的,因为qsort() 在比较值时包含函数调用开销。我怀疑手写的解决方案会超过 qsort() 的 5.3 秒,但它可能不会与基数排序竞争,这都是由于快速排序的非线性增长及其不太有利的 CPU 缓存特性(它会读取每个值平均超过 7 次,因此从 RAM 中提取数据可能会花费更多)。

此外,排序的成本现在与将文本文件读入内存并将结果再次写回的 I/O 成本相形见绌。即使使用我的自定义解析代码,读取 3300 万条记录仍然需要几秒钟,对它们进行排序只需几分之一秒,然后再将它们写回几秒钟。如果您的数据已经在 RAM 中,这可能无关紧要,但如果它在某个磁盘上,您也需要开始寻找优化方法

但由于问题的目标是优化排序本身,我认为仅在四分之三秒内对 3300 万条记录进行排序并不算太糟糕。

【讨论】:

  • 这肯定值得'A'来表现努力和好奇心:)
  • @David,我只希望一年半前@jhilgeman 第一次问这个问题时我偶然发现了这个问题;据我所知,他已经很久不需要答案了:)
  • 对于那些毫无疑问会在未来找到该帖子的人来说仍然是一个非常有用的补充。 Introduction to Algorithms - 3rd Edition 可在各个地方在线获取。我也很喜欢那个。
  • 自从第一版是我 90 年代中期的大学教科书以来,我的办公桌上就放着算法简介。无论是平装本、精装本、PDF、Kindle、holo-tome、direct mental booklink,还是他们接下来推出的任何花哨的新格式,我都无法高度推荐它。
  • 您可以轻松地将计数部分并行化,这样可以在不合并的情况下加快速度。
【解决方案2】:

Jonathan Leffler 关于重新排序包装的评论是正确的,我在查看您的代码时也有同样的想法。以下是我的方法:

#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include <string.h> // for memcpy

#define ROW_LENGTH      6
#define ROW_COUNT       15

// 1, 2, 3, 4, 5, 6, 6, 8, 9, 10, 13, 15, 1, 4, 5, 6, 7, 9, 1, 4, 5, 6, 7, 8, 3, 18, 19, 20, 25, 34

/*
    1  2  3  4  5  6
    6  8  9 10 13 15
    1  4  5  6  7  9
    1  4  5  6  7  8
    3 18 19 20 25 34
*/

// Insertion sorting taken from https://stackoverflow.com/a/2789530/2694511  with modification

static __inline__ int sortUlliArray(unsigned long long int *d, int length){
        int i, j;
        for (i = 1; i < length; i++) {
                unsigned long long int tmp = d[i];
                for (j = i; j >= 1 && tmp < d[j-1]; j--)
                        d[j] = d[j-1];
                d[j] = tmp;
        }

        return i; // just to shutup compiler
}


int cmpfunc (const void * a, const void * b)
{
  if (*(unsigned long long int*)a > *(unsigned long long int*)b) return 1;
  if (*(unsigned long long int*)a < *(unsigned long long int*)b) return -1;
  return 0;
}

int main(){
    int array[ROW_COUNT][ROW_LENGTH],
        decodedResultsArray[ROW_COUNT][ROW_LENGTH];

    const int rawData[] = {     1, 2, 3, 4, 5, 6,
                                6, 8, 9, 10, 13, 15,
                                1, 4, 5, 6, 7, 9,
                                1, 4, 5, 6, 7, 8,
                                3, 18, 19, 20, 25, 34,
                                6,17,19,20,22,46,
                                5,10,34,40,41,49,
                                5,19,20,37,53,57,
                                16,23,26,36,42,50,
                                12,19,20,30,47,53,
                                16,25,30,48,56,59,
                                8,11,25,35,36,45,
                                2,12,16,21,32,40,
                                4,6,31,36,37,44,
                                21,28,31,48,52,57
                    };

    memcpy(array, rawData, sizeof(rawData)/sizeof(*rawData)); // copy elements into array memory

    // Sort
    // precompute keys
    unsigned long long int *rowSums = calloc(ROW_COUNT, sizeof(unsigned long long int));
    unsigned long long int *sortedSums = rowSums ? calloc(ROW_COUNT, sizeof(unsigned long long int)) : NULL; // if rowSums is null, don't bother trying to allocate.
    if(!rowSums || !sortedSums){
        free(rowSums);
        free(sortedSums);
        fprintf(stderr, "Failed to allocate memory!\n");
        fflush(stderr); // should be unnecessary, but better to make sure it gets printed
        exit(100);
    }

    int i=0, j=0, k=0;
    for(; i < ROW_COUNT; i++){
        rowSums[i] = 0; // this should be handled by calloc, but adding this for debug
        for(j=0; j < ROW_LENGTH; j++){
            unsigned long long int iScalar=1;
            for(k=ROW_LENGTH-1; k > j; --k)
                iScalar *= 100; // probably not the most efficient way to compute this, but this is meant more as an example/proof of concept

            unsigned long long int iHere = array[i][j];
            rowSums[i] += (iHere * iScalar);

            // printf("DEBUG ITERATION REPORT\n\t\tRow #%d\n\t\tColumn #%d\n\t\tiScalar: %llu\n\t\tiHere: %llu\n\t\tCurrent Sum for Row: %llu\n\n", i, j, iScalar, iHere, rowSums[i]);
            fflush(stdout);
        }
    }

    memcpy(sortedSums, rowSums, sizeof(unsigned long long int)*ROW_COUNT);

    // Some debugging output:
    /*

    printf("Uncopied Sums:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, rowSums[i]);

    printf("Memcopyed sort array:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);

    */

    clock_t begin = clock();

    //qsort(sortedSums, ROW_COUNT, sizeof(unsigned long long int), cmpfunc);
    sortUlliArray(sortedSums, ROW_COUNT);

    clock_t end = clock();
    double time_spent = (double)(end - begin) / CLOCKS_PER_SEC;

    printf("Time for sort: %lf\n", time_spent);
    printf("Before sort array:\n");
    for(i=0; i<ROW_COUNT; i++){
        for(j=0; j < ROW_LENGTH; j++){
            printf("Unsorted[%d][%d] = %d\n", i, j, array[i][j]);
        }
    }

    printf("Values of sorted computed keys:\n");
    for(i=0; i < ROW_COUNT; i++)
        printf("SortedRowSums[%d] = %llu\n", i, sortedSums[i]);

    // Unpack:
    for(i=0; i < ROW_COUNT; i++){
        for(j=0; j < ROW_LENGTH; j++){
            unsigned long long int iScalar=1;
            for(k=ROW_LENGTH-1; k > j; --k)
                iScalar *= 100;

            unsigned long long int removalAmount = sortedSums[i]/iScalar;

            decodedResultsArray[i][j] = removalAmount;
            sortedSums[i] -= (removalAmount*iScalar);
            // DEBUG:
            // printf("Decoded Result for decoded[%d][%d] = %d\n", i, j, decodedResultsArray[i][j]);
        }
    }

    printf("\nFinal Output:\n");
    for(i=0; i < ROW_COUNT; i++){
        printf("Row #%d: %d", i, decodedResultsArray[i][0]);
        for(j=1; j < ROW_LENGTH; j++){
            printf(", %d", decodedResultsArray[i][j]);
        }
        puts("");
    }
    fflush(stdout);

    free(rowSums);
    free(sortedSums);

    return 1;
}

请注意,这并非全部针对最大效率进行了优化,并且充斥着调试输出语句,但尽管如此,它是关于打包如何工作的概念证明。此外,考虑到您必须处理的行数,您可能会更好地使用qsort(),但我使用sortUlliArray(...)(这是this StackOverflow answer 的插入排序功能的修改版本)。您必须对其进行测试,看看哪种方法最适合您的情况。

总而言之,在 15 个硬编码行上运行此代码的最终输出是:

Row #0: 1, 2, 3, 4, 5, 6
Row #1: 1, 4, 5, 6, 7, 8
Row #2: 1, 4, 5, 6, 7, 9
Row #3: 2, 12, 16, 21, 32, 40
Row #4: 3, 18, 19, 20, 25, 34
Row #5: 4, 6, 31, 36, 37, 44
Row #6: 5, 10, 34, 40, 41, 49
Row #7: 5, 19, 20, 37, 53, 57
Row #8: 6, 8, 9, 10, 13, 15
Row #9: 6, 17, 19, 20, 22, 46
Row #10: 8, 11, 25, 35, 36, 45
Row #11: 12, 19, 20, 30, 47, 53
Row #12: 16, 23, 26, 36, 42, 50
Row #13: 16, 25, 30, 48, 56, 59
Row #14: 21, 28, 31, 48, 52, 57

因此,这似乎确实可以处理数字非常相似的情况,这是一个可归因于数字打包顺序的问题。

无论如何,上面的代码应该可以工作,但它只是一个示例,所以我将把它留给你应用必要的优化。

代码在配备 1.6 GHz Intel Core i5 和 4 GB 1600 MHz DDR3 的 64 位 MacBook Air 上进行了测试。所以,一个相当弱的 CPU 和缓慢的内存,但它能够在 0.004 毫秒内完成 15 行的排序,在我看来,速度相当快。 (这只是对上述测试用例的排序函数速度的衡量,而不是预打包或拆包速度的衡量标准,因为它们可以使用一些优化。)

主要归功于 Doynax 和 Jonathan Leffler。

【讨论】:

  • 这可能是一个愚蠢的问题,但我能问一下为什么你的 sortUlliArray 函数返回一个值来“关闭编译器”而不是仅仅使用 void 作为返回类型吗?
  • 很简单,因为 the referenced answer 中的原始函数就是这样做的。如果我没记错的话,大多数编译器都需要返回类型来执行内联。因此,我保留了返回类型并抑制编译器警告,我只是返回了i。不一定是处理编译器警告的最佳实践,但它完成了工作。用宏定义替换sortUlliArray(...) 可能会更好。
  • 虽然插入排序对于小型数据集来说非常快,但它的 O(n^2) 增长非常糟糕:对于像 @jhilgeman 的 33,000,000 条记录这样的大型数据集,代码运行在大约 1 的基线每次操作微秒(这似乎与您在 MacBook Air 上测量的值有关),我有理由预计您的代码需要大约 35 年才能完成。使用算法上更高效的方法(例如快速排序)会好得多,或者更好的是,例如我在 my answer 中写的基数和计数排序。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-02-24
  • 1970-01-01
  • 2021-02-03
  • 2019-06-08
  • 1970-01-01
  • 2011-12-17
  • 2013-08-17
相关资源
最近更新 更多