这是 C++17 中的 AVX2 版本,未经测试。
#include <assert.h>
#include <immintrin.h>
#include <stdint.h>
#include <string.h>

#include <array>
// Four independent vector accumulators (16 scalar lanes total); the main loop
// spreads work across them so the FP-add dependency chains can overlap in flight.
using Accumulators = std::array<__m256d, 4>;
// Conditionally accumulate 4 lanes, with source data from a register.
// `i` selects which register of the 4-register batch this call handles.
template<int i>
inline void conditionalSum4( Accumulators& acc, __m256d vals, __m256i broadcasted )
{
    // Every 64-bit lane of `broadcasted` holds an identical copy of the mask word.
    // Register #i of the batch is controlled by mask bits [ i*4 .. i*4+3 ];
    // align those bits to the bottom of each lane (no-op for the first register).
    if constexpr( i > 0 )
        broadcasted = _mm256_srli_epi64( broadcasted, i * 4 );
    // Lane N of the vector is kept when bit N of the (shifted) mask is set
    const __m256i laneBits = _mm256_setr_epi64x( 1, 2, 4, 8 );
    const __m256i selected = _mm256_and_si256( broadcasted, laneBits );
    // Expand each matching bit into an all-ones qword; non-matching lanes become zero
    const __m256d laneMask = _mm256_castsi256_pd( _mm256_cmpeq_epi64( selected, laneBits ) );
    // Bitwise AND zeroes the rejected lanes: an all-zero bit pattern is double 0.0.
    // (If the mask instead marked values to ignore, _mm256_andnot_pd would apply.)
    acc[ i ] = _mm256_add_pd( acc[ i ], _mm256_and_pd( vals, laneMask ) );
}
// Conditionally accumulate 4 lanes, with source data from memory.
// Register #i of the batch covers source elements [ i*4 .. i*4+3 ].
template<int i>
inline void conditionalSum4( Accumulators& acc, const double* source, __m256i broadcasted )
{
    const __m256d loaded = _mm256_loadu_pd( source + i * 4 );
    conditionalSum4<i>( acc, loaded, broadcasted );
}
// Conditionally accumulate lanes from memory, for the last potentially incomplete vector.
// `count` is the total remainder length; this register consumes elements [ i*4 .. count ).
template<int i>
inline void conditionalSumPartial( Accumulators& acc, const double* source, __m256i broadcasted, size_t count )
{
    constexpr int offset = i * 4;
    // Scalars left for this register; the caller guarantees 1 to 4
    const int64_t remaining = (int64_t)count - offset;
    if( remaining < 1 || remaining > 4 )
    {
        assert( false );
        return;
    }
    // Build a load mask: all-ones qword for lane j when j < remaining.
    // vmaskmovpd neither faults on, nor loads, masked-out lanes — they read as 0.0,
    // which is exactly the zero-filled tail the partial sum needs.
    const __m256i laneIndex = _mm256_setr_epi64x( 0, 1, 2, 3 );
    const __m256i loadMask = _mm256_cmpgt_epi64( _mm256_set1_epi64x( remaining ), laneIndex );
    const __m256d vals = _mm256_maskload_pd( source + offset, loadMask );
    conditionalSum4<i>( acc, vals, broadcasted );
}
// The main function implementing the algorithm: sum of source[j] for every j < count
// whose mask bit is set.
// maskBytes argument is densely packed mask values with 1 bit per double, the size must be ( ( count + 7 ) / 8 )
// Each byte of the mask packs 8 boolean values, the first value of the byte is in the least significant bit.
double conditionalSum( const double* source, const uint8_t* maskBytes, size_t count )
{
    // Zero-initialize all 4 accumulators
    std::array<__m256d, 4> acc;
    acc[ 0 ] = acc[ 1 ] = acc[ 2 ] = acc[ 3 ] = _mm256_setzero_pd();
    // The main loop consumes 16 scalars, and 16 bits of the mask, per iteration
    for( ; count >= 16; source += 16, maskBytes += 2, count -= 16 )
    {
        // Broadcast 16 bits of the mask from memory into an AVX register.
        // memcpy avoids the strict-aliasing UB of casting uint8_t* to short*;
        // compilers collapse this fixed-size copy into a single 16-bit load.
        short maskWord;
        memcpy( &maskWord, maskBytes, sizeof( maskWord ) );
        const __m256i broadcasted = _mm256_set1_epi16( maskWord );
        conditionalSum4<0>( acc, source, broadcasted );
        conditionalSum4<1>( acc, source, broadcasted );
        conditionalSum4<2>( acc, source, broadcasted );
        conditionalSum4<3>( acc, source, broadcasted );
    }
    // Now the hard part, dealing with the remainder.
    // The switch argument is the count of vectors in the remainder, including incomplete ones.
    switch( ( count + 3 ) / 4 )
    {
    case 0:
        // Best case performance wise, the count of values was a multiple of 16
        break;
    case 1:
    {
        // 1-4 values left; note we load a single byte of the mask instead of 2. Same for case 2.
        const __m256i broadcasted = _mm256_set1_epi8( (char)*maskBytes );
        conditionalSumPartial<0>( acc, source, broadcasted, count );
        // BUGFIX: this break was missing; control fell through into case 2, which
        // re-accumulated the first vector a second time and read past the buffer.
        break;
    }
    case 2:
    {
        // 5-8 values left: one complete vector plus the partial one
        const __m256i broadcasted = _mm256_set1_epi8( (char)*maskBytes );
        conditionalSum4<0>( acc, source, broadcasted );
        conditionalSumPartial<1>( acc, source, broadcasted, count );
        break;
    }
    case 3:
    {
        // 9-12 values left, needs 2 mask bytes (loaded via memcpy, see the main loop)
        short maskWord;
        memcpy( &maskWord, maskBytes, sizeof( maskWord ) );
        const __m256i broadcasted = _mm256_set1_epi16( maskWord );
        conditionalSum4<0>( acc, source, broadcasted );
        conditionalSum4<1>( acc, source, broadcasted );
        conditionalSumPartial<2>( acc, source, broadcasted, count );
        break;
    }
    case 4:
    {
        // 13-16 values left
        short maskWord;
        memcpy( &maskWord, maskBytes, sizeof( maskWord ) );
        const __m256i broadcasted = _mm256_set1_epi16( maskWord );
        conditionalSum4<0>( acc, source, broadcasted );
        conditionalSum4<1>( acc, source, broadcasted );
        conditionalSum4<2>( acc, source, broadcasted );
        conditionalSumPartial<3>( acc, source, broadcasted, count );
        break;
    }
    }
    // At last, reduce the 16 accumulated lanes into a single scalar sum
    const __m256d r01 = _mm256_add_pd( acc[ 0 ], acc[ 1 ] );
    const __m256d r23 = _mm256_add_pd( acc[ 2 ], acc[ 3 ] );
    const __m256d sum4 = _mm256_add_pd( r01, r23 );
    const __m128d sum2 = _mm_add_pd( _mm256_castpd256_pd128( sum4 ), _mm256_extractf128_pd( sum4, 1 ) );
    const __m128d sum1 = _mm_add_sd( sum2, _mm_shuffle_pd( sum2, sum2, 0b11 ) );
    return _mm_cvtsd_f64( sum1 );
}
几个有趣的点。
我将循环展开为每次迭代处理 16 个值,并使用 4 个独立的累加器:独立的依赖链可以在流水线中并行执行,从而提高吞吐量。这也减少了循环退出检查的频率,即更多的指令用于计算有用的东西;并降低了广播掩码值的频率——把数据从标量单元移动到向量单元有一定延迟。请注意,我每 16 个元素仅从掩码加载一次,并通过位移重用该向量。这样还能提高精度:当您将小浮点值添加到大浮点值时,精度会丢失,16 个独立的标量累加槽会有所帮助。
正确处理这些余数,无需将数据从寄存器移动到内存再返回,这很复杂,需要部分加载等。
如果你将整数值从模板参数移到普通整数参数中,代码可能会停止编译,编译器会说类似“预期的常量表达式”。原因是,许多 SIMD 指令,包括_mm256_srli_epi64,将常量编码为指令的一部分,因此编译器需要知道这些值。另一件事,数组索引需要是 constexpr ,否则编译器会将数组从 4 个寄存器驱逐到 RAM 中,以便在您访问值时能够进行指针数学运算。累加器需要一直留在寄存器中,否则性能会大幅下降,即使 L1D 缓存比寄存器慢得多。
以下是 gcc 的输出。生成的汇编代码看起来是合理的:编译器已成功内联所有内容,并且主循环中唯一的内存访问是对源值的加载。主循环位于 .L3 标签下方。