【问题标题】:How to optimise the re-ordering of an array?如何优化数组的重新排序?
【发布时间】:2019-04-18 16:21:26
【问题描述】:

我想优化一些包含大约 400 万个无符号短裤的数据数组的重新排序。目的是通过使应该彼此相似的值彼此接近来处理数据流。伪代码是这样的:

  for( i=0; i<n; i++)
    dest[i] = src[ idx[i] ] ;

为了优化 idx[i] 的特定列表的代码,我尝试编译一个 400 万行的 c 函数,其中填充了 idx 值:

void reorder( unsigned short * restrict i, unsigned short * restrict o) {
  o[0]=i[2075723];
  o[1]=i[2075724];
  o[2]=i[2075722];
  ...
  o[4194301]=i[4192257];
  o[4194302]=i[4192256];
  o[4194303]=i[4190208];
 }

我曾希望让 GCC 创建一个巧妙的 pshufw/pblend/unpack 指令流……但它在用完大量内存(7 GB)后挂起。我试图制作基于副本的版本以避免就地进行交换的复杂性。

有没有人能提出好的方法来为这个问题生成优化的代码?到目前为止我试过了:

  • 有序读取,随机写入:60 毫秒(openmp 没有帮助)
  • 有序写入,随机读取:20 ms (openmp -> 4 ms)

我希望最终能更接近内存带宽(大约 0.4 毫秒)。考虑到缓存大小并进行阻塞的方案应该会有所帮助,但我不知道从哪里开始设计一个来做它。我也想知道是否有一种利用 SIMD 指令的简单方法?

用转置制作一个玩具示例我什至无法让 gcc 输出 SIMD 版本,请参阅:

https://godbolt.org/z/bzGWad

这对编译器来说是个难题还是我遗漏了一些简单的问题?

编辑 21/11/2018 添加了一个完整但最小的问题示例

这是我试图优化的问题的完整示例。实际上排序是一个更复杂的函数,但重点只是根据像素与图像中心的距离对数据像素进行排序,就像展开螺旋一样。

#include <omp.h>
#include <vector>
#include <cmath>
#include <iostream>
#include <iomanip>
#include <algorithm>

#define N 2048

// Sorting on output, one core
void reorder_simple( const std::vector<size_t> &indices,
             const unsigned short input[],
             unsigned short output[]){
  for( int i=0; i<N*N; i++)
    output[i] = input[ indices[i] ];
}
// Sorting on output write, many cores
void reorder_omp( const std::vector<size_t> &indices,
          const unsigned short input[],
          unsigned short output[]){
#pragma omp parallel for
  for( int i=0; i<N*N; i++)
    output[i] = input[ indices[i] ];
}
// Benchmark for memory throughput, one core
void copy_simple(  const std::vector<size_t> &indices,
           const unsigned short input[],
           unsigned short output[]){
  for( int i=0; i<N*N; i++)
    output[i] = input[i];
}
// Benchmark for memory throughput, many cores
void copy_omp (  const std::vector<size_t> &indices,
         const unsigned short input[],
         unsigned short output[]){
#pragma omp parallel for
  for( int i=0; i<N*N; i++)
    output[i] = input[i];
}

// Macro to avoid retyping
#define bench(func)                                          \
  func( indices, input, output);                             \
  start = omp_get_wtime();                                   \
  for( size_t i=0; i<100; i++)                               \
      func( indices, input, output );                        \
  end =  omp_get_wtime();                                    \
  std:: cout << std::setw(15) << #func <<                    \
     ", Time taken: "  << (end-start)/100 << " /s\n";

int main()
{
  std::vector<float> sort_order(N*N);
  std::vector<size_t> indices(N*N);
  float radius, azimuth, ci, cj;
  double start, end;
  unsigned short *input, *output;

  ci = N*0.496;  // changes according to calibration
  cj = N*0.4985;  // reality is more complicated (tilts etc)
  for( size_t i=0; i<N; i++){
    for( size_t j=0; j<N; j++){
      radius  = sqrt( (i-ci)*(i-ci) + (j-cj)*(j-cj) );
      azimuth = atan2( i-ci, j-cj ); // from -pi to pi
      sort_order[i*N+j] = round( radius ) + azimuth/2/M_PI;
      indices[i*N+j] = i*N+j;
    }
  }
  // Find the order to sort data onto a spiral 
  std::sort( indices.begin(), indices.end(),
         [&sort_order](int i, int j){
           return sort_order[i] < sort_order[j]; });
  // Invent some test data
  input = new unsigned short [N*N];
  output = new unsigned short [N*N];
  for( size_t i=0 ; i<N*N; i++){
    input[i] = i;
    output[i]= 0;
  }
  // some timing:
  bench(reorder_simple);
  bench(reorder_omp)   ;
  bench(copy_simple)   ;
  bench(copy_omp)      ;
}


   % g++ reorder.cpp -o reorder -std=c++11 -O3 -march=native -fopenmp -Wall
   % ./reorder
     reorder_simple, Time taken: 0.0179023 /s
        reorder_omp, Time taken: 0.00349932 /s
        copy_simple, Time taken: 0.00140805 /s
           copy_omp, Time taken: 0.000250205 /s

我想让reorder_omp 函数的速度更接近copy_omp 函数的速度。检测器可以以每秒 500 帧的速度运行,因此 3.5 毫秒与 0.25 毫秒相比是糟糕的。

再次编辑:21/11/2018 编写无法编译的函数的代码

  //top of file
  #include <fstream>  
  ...
  //just before the end: 
  std::ofstream out;
  out.open("cfunc.c");
  out << "void cfunc( unsigned short * restrict input,\n" <<
         "            unsigned short * restrict output){ \n"; 
  for(int i=0;i<N;i++)
    for(int j=0;j<N;j++)
      out << "output[" << i*N+j << "] = input[" << indices[i*N+j] << "];\n";
  out << "}\n";
  out.close();

在另一台机器上测试这个我从 gcc (7.3.0) 和 clang (6.0.0) 得到编译器错误。它使用 tcc (0.9.27) 编译和运行,但完成速度比索引循环慢。

【问题讨论】:

  • 那么idx[] 是编译时常量吗?它有很多地方性吗?就像来自附近源元素的目标元素组一样?但是它没有模式,所以除了使用收集指令或标量之外,您不能制作循环?这是x86吗?您正在调整哪些微架构?
  • 模式是什么?显示的两个三元组具有output + (0, 1, 2) = input + (1, 2, 0)output + (0, 1, 2) = input + (1, 0, 2) 模式。将相同的代码写出 40 次,更不用说 4,000,000 次了,应该会让你不寒而栗(你应该写一个程序来写程序,至少!)。我感觉这里有一个XY Problem
  • 我不确定 gcc 是否知道如何从这样的标量负载/存储中构建洗牌。即使您使用short *__restrict dst 告诉它 src 和 dst 不会重叠以使其成为可能。 clang 可能会做一些事情,但我对编译时内存使用量很大并不感到惊讶。你的想法很好;某种缓存阻塞应该有助于将读取和写入分组到由几个(少于 8 个)缓存行组成的小集合中,最好也具有相对于 4k 页面的更粗略的局部性。
  • 顺便说一句:您的无符号短裤宽度是否超​​过 16 位?
  • idx 是一个编译时间常数。现在是 x86,但也许我们应该改用 gpu。 Idx 每天都会发生变化,但对于数据块(数百万张图像)保持不变。数据为 16 位。有图案,目标输出大致是探测器原始矩形图像的螺旋形也许我应该把这个信息放在上面?

标签: c++ arrays gcc optimization compiler-optimization


【解决方案1】:

(评论部分太短)

我会测试以下想法:

  1. 维护反向索引表,让朴素算法变成:

     for (i = 0; i<n; i++) {
       dest[index[i]] = src[i];
     }
    
  2. 而不是使用朴素的算法:

    2.1 创建对(value, destindex)的临时数组

    struct pair {
      int value;
      int destindex;
    };
    for (i = 0; i < n; i++) {
      pairs[i] = {.value=src[i], .destindex=index[i]};
    }
    

    2.2 使用合并或快速排序按.destindex 字段对数组进行排序

    2.3 将值从对数组中复制到dest 数组中

此算法中没有随机访问,因此没有随机访问页面错误。但是,由于大量的线性通道,我不确定它是否会比朴素算法更好。

【讨论】:

  • OP 已经考虑了您的 1. 想法,并发现“有序读取,随机写入”比“有序写入,随机读取”慢 3 倍。排序可以并行化并在更好的局部性下完成,但是是的,这可能不是一场胜利。
  • 我尝试了类似的方法,创建小块输出(64 个值)并根据最近读取的内容对它们进行排序,从而将运行时间提高了约 20%。它有所帮助,但可能排序对管道仍然不利,并且仍然没有矢量化。
猜你喜欢
  • 2018-09-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-08-17
  • 1970-01-01
  • 2014-01-04
  • 1970-01-01
相关资源
最近更新 更多