【问题标题】:Optimizing variable-length encoding优化可变长度编码
【发布时间】:2011-05-02 14:51:45
【问题描述】:

我有一个案例需要压缩很多通常很小的值。因此我用可变长度字节编码(ULEB128,具体来说)压缩它们:

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size = 0;
  while (n  > 127)
  {
    ++size;
    *data++ = (n & 127)|128;
    n >>= 7;
  }
  *data++ = n;
  return ++size;
}

有没有更有效的方法来做到这一点(也许使用 SSE)?

编辑:压缩后,结果存储到data,占用size字节。然后,在下一个 unsigned int 上调用压缩函数。

【问题讨论】:

  • 技术上这称为 ULEB128 编码。
  • 我认为您应该添加一些关于 n 变量类型的内容,因为它的确切范围限制可能会影响使用 SSE 和类似扩展的可能性。
  • 您可能想查看其他编码方法。在 protobuf 的开发过程中,Google 执行了各种基准测试,他们发现前导位指标很慢,因为您对每个数据字节都有一个测试(基本上是不可预测的)。发现将结构分成两部分:首先指示每个“包”的长度,然后推挤包,速度更快。
  • 您可以尝试将循环转换为 bitscan(n)/7 上的开关。此外,不要进行 1 字节存储,而是使用临时变量,然后将其 memcpy()
  • @Alexandre:我正在阅读 Jeffrey Dean,他是 Google 的开发人员之一 (research.google.com/people/jeff/index.html)。我再也找不到 protobuf 的二进制格式规范了。我记得他们使用了 4 种不同的压缩技术,具体取决于消息的部分。 (我认为有些人使用 ULEB128,尽管他们称之为 varint)。

标签: c++ c assembly sse


【解决方案1】:

您要做的第一件事是针对您当前的代码测试任何可能的解决方案。

我想你可能想尝试摆脱数据依赖,让处理器同时做更多的工作。

什么是数据依赖关系? 当数据流经您的函数时,n 的当前值取决于 n 的先前值,而这取决于之前的值...是一长串的数据依赖关系。在下面的代码中,n 永远不会被修改,因此处理器可以“跳过”并同时执行几项不同的操作,而无需等待新的n 被计算出来。

// NOTE: This code is actually incorrect, as caf noted.
// The byte order is reversed.
size_t
compress_unsigned_int(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

我通过在 0..UINT_MAX 的紧密循环中执行它来测试性能。在我的系统上,执行时间是:

(Lower is better)
Original: 100%
caf's unrolled version: 79%
My version: 57%

一些小的调整可能会产生更好的结果,但我怀疑除非你去组装,否则你会得到更多的改进。如果您的整数倾向于在特定范围内,那么您可以使用分析让编译器将正确的分支预测添加到每个分支。这可能会使您的速度提高几个百分点。 (编辑:我从重新排序分支中获得了 8%,但这是一种不正当的优化,因为它依赖于每个数字 0...UINT_MAX 以相同的频率出现的事实。我不推荐这样做。 )

SSE 无济于事。 SSE 旨在同时处理具有相同宽度的多条数据,众所周知,要让 SIMD 加速具有可变长度编码的任何内容是非常困难的。 (这不一定是不可能的,但它可能是不可能的,而且你必须非常聪明才能弄清楚。)

【讨论】:

  • 是的,这实际上只是进一步展开了展开过程。您可以根据输入的频率分布对分支重新排序 - 我首先测试了&lt; 0x80,因为 OP 说这些值“通常很小”
  • 我很确定你可以使用一些模板 voodoo 来做同样的事情(编译时固定大小的循环展开)。
  • 顺便说一句,这是不正确的 - 它应该首先输出最低有效位。
  • @Alexandre:很有趣。我用于测试的系统有一个 AMD Phenom II,我的版本在使用 GCC 和 clang 时都明显更快……而且 GCC 版本都比 clang 版本快。 (GCC 生成的代码执行时间是 clang 的 86%。)
  • @valdo:我不只是在做循环展开,我在做数据依赖消除。如果您有把握,请向我们展示代码。
【解决方案2】:

如果您的 unsigned int 值限制在特定范围内 - 例如 32 位 - 您可以展开循环:

size_t
compress_unsigned_int(unsigned int n, char* data)
{
  size_t size;

  if (n < 0x00000080U) {
    size = 1;
    goto b1;
  }
  if (n < 0x00004000U) {
    size = 2;
    goto b2;
  }
  if (n < 0x00200000U) {
    size = 3;
    goto b3;
  }
  if (n < 0x10000000U) {
    size = 4;
    goto b4;
  }
  size = 5;

  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b4:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b3:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b2:
  *data++ = (n & 0x7f) | 0x80;
  n >>= 7;
b1:
  *data = n;
  return size;
}

【讨论】:

  • 一个好的优化编译器(gcc、msvc、intel's)也应该为你做这件事。
  • @Blindy:在我的测试中,gcc 4.4.5 没有展开这个特定的例子。
  • 两个注意事项:1)您可能应该尝试将这些单字节存储合并为多字节存储(例如 5->1+4、4->4、3->1+2、2 ->2, 1->1),这也将迫使你在跳转后摆脱对 n 的数据依赖 - 2) 用 bitscan(n)/7 上的直通开关替换 ifs 和跳转跨度>
  • @CAFxX:你有解释或测量来支持这一点吗?编译器使用与 if 语句相同的分支操作来实现 case 语句,并按照它们认为合适的方式重新排列代码块,因此您不太可能看到“if”和“switch”之间的一致差异。此外,未对齐的多字节存储在各种架构上都非常慢,并且会在 SPARC 或(某些?)ARM 目标上使您的程序崩溃。
  • @Dietrich:编译器不一定将switch 语句实现为单独的if 语句。您可能会看到对于小型交换机,但对于具有大量分支的交换机,我已经看到 gcc 执行二叉树搜索,然后执行少量显式测试。
【解决方案3】:

您可能会在 google 协议缓冲区中找到快速实现:

http://code.google.com/p/protobuf/

查看 CodedOutputStream::WriteVarintXXX 方法。

第一种方法可以改写为:

char *start = data;
while (n>=0x80)
{
    *data++=(n|0x80);
    n>>=7;
}
*data++=n;
return data-start;

根据我的测试,谷歌缓冲区实现是最好的,然后是其他实现。然而我的测试是相当人为的,最好在你的应用程序中测试每种方法并选择最好的。提出的优化在特定数值上效果更好。

这是我的测试应用程序的代码。 (请注意,我已从 compress_unsigned_int_google_buf 中删除了代码。您可能会在 google 缓冲区协议的以下文件中找到实现:coded_stream.cc 方法 CodedOutputStream::WriteVarint32FallbackToArrayInline)

size_t compress_unsigned_int(unsigned int n, char* data)
{
    size_t size = 0;
    while (n  > 127)
    {
        ++size;
        *data++ = (n & 127)|128;
        n >>= 7;
    }
    *data++ = n;
    return ++size;
}

size_t compress_unsigned_int_improved(unsigned int n, char* data)
{
    size_t size;

    if (n < 0x00000080U) {
        size = 1;
        goto b1;
    }
    if (n < 0x00004000U) {
        size = 2;
        goto b2;
    }
    if (n < 0x00200000U) {
        size = 3;
        goto b3;
    }
    if (n < 0x10000000U) {
        size = 4;
        goto b4;
    }
    size = 5;

    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b4:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b3:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b2:
    *data++ = (n & 0x7f) | 0x80;
    n >>= 7;
b1:
    *data = n;
    return size;
}

size_t compress_unsigned_int_more_improved(unsigned int n, char *data)
{
    if (n < (1U << 14)) {
        if (n < (1U << 7)) {
            data[0] = n;
            return 1;
        } else {
            data[0] = (n >> 7) | 0x80;
            data[1] = n & 0x7f;
            return 2;
        }
    } else if (n < (1U << 28)) {
        if (n < (1U << 21)) {
            data[0] = (n >> 14) | 0x80;
            data[1] = ((n >> 7) & 0x7f) | 0x80;
            data[2] = n & 0x7f;
            return 3;
        } else {
            data[0] = (n >> 21) | 0x80;
            data[1] = ((n >> 14) & 0x7f) | 0x80;
            data[2] = ((n >> 7) & 0x7f) | 0x80;
            data[3] = n & 0x7f;
            return 4;
        }
    } else {
        data[0] = (n >> 28) | 0x80;
        data[1] = ((n >> 21) & 0x7f) | 0x80;
        data[2] = ((n >> 14) & 0x7f) | 0x80;
        data[3] = ((n >> 7) & 0x7f) | 0x80;
        data[4] = n & 0x7f;
        return 5;
    }
}

size_t compress_unsigned_int_simple(unsigned int n, char *data)
{
    char *start = data;
    while (n>=0x80)
    {
        *data++=(n|0x80);
        n>>=7;
    }
    *data++=n;
    return data-start;
}

inline size_t compress_unsigned_int_google_buf(unsigned int value, unsigned char* target) {

          // This implementation might be found in google protocol buffers

}



#include <iostream>
#include <Windows.h>
using namespace std;

int _tmain(int argc, _TCHAR* argv[])
{
    char data[20];
    unsigned char udata[20];
    size_t size = 0;
    __int64 timer;

    cout << "Plain copy: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        memcpy(data,&i,sizeof(i));
        size += sizeof(i);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Original: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size << endl;

    cout << "Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "More Improved: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_more_improved(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Simple: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_simple(i,data);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    cout << "Google Buffers: ";

    timer = GetTickCount64();

    size = 0;

    for (int i=0; i<536870900; i++)
    {
        size += compress_unsigned_int_google_buf(i,udata);
    }

    cout << GetTickCount64() - timer << " Size: " << size <<  endl;

    return 0;
}

在我的带有 Visual C++ 编译器的机器上,我得到了以下结果:

普通副本:358 毫秒

原始:2497 毫秒

改进:2215 毫秒

更多改进:2231 毫秒

简单:2059 毫秒

谷歌缓冲区:968 毫秒

【讨论】:

    【解决方案4】:

    经过更多的浏览,我发现了另一个在Sqlite3中常用的实现(代码版本3070900):

    inline int sqlite3PutVarint(unsigned char *p, unsigned __int64 v){
      int i, j, n;
      unsigned char buf[10];
      if( v & (((unsigned __int64)0xff000000)<<32) ){
        p[8] = (unsigned char)v;
        v >>= 8;
        for(i=7; i>=0; i--){
          p[i] = (unsigned char)((v & 0x7f) | 0x80);
          v >>= 7;
        }
        return 9;
      }    
      n = 0;
      do{
        buf[n++] = (unsigned char)((v & 0x7f) | 0x80);
        v >>= 7;
      }while( v!=0 );
      buf[0] &= 0x7f;
      for(i=0, j=n-1; j>=0; j--, i++){
        p[i] = buf[j];
      }
      return n;
    }
    

    还有针对 32 位 int 的略微优化版本:

    int sqlite3PutVarint32(unsigned char *p, unsigned int v){
    
      if( (v & ~0x7f)==0 ){
        p[0] = v;
        return 1;
      }
    
      if( (v & ~0x3fff)==0 ){
        p[0] = (unsigned char)((v>>7) | 0x80);
        p[1] = (unsigned char)(v & 0x7f);
        return 2;
      }
      return sqlite3PutVarint(p, v);
    }
    

    令人失望的是,Sqlite 实现在我的测试中表现最差。因此,如果您要使用 Sqlite,请考虑将默认实现替换为优化的实现。

    与此同时,我正在考虑进一步可能的优化。

    【讨论】:

      【解决方案5】:

      您可以通过将
      size_t size=0;...++size;...;return size++; 替换为
      char* base=data;...;return data-base; 来节省一些操作

      【讨论】:

      • 编译器通常会在启用优化时自动执行此操作。
      【解决方案6】:

      这是我在 x86 汇编语言(32 位)中的优化。您可以使用 NASM 和链接进行编译。我不知道它是快还是慢,我只是玩编码:)

      global compress_unsigned_int
      
      ;   bit fields:
      ;   31                              0
      ;    eeeedddddddcccccccbbbbbbbaaaaaaa
      
      
      compress_unsigned_int:
          mov     eax, [esp+4]    ; n
          mov     ecx, [esp+8]    ; data
      
          cmp     eax, 00001111111111111111111111111111b
          jbe     out4b
      
          shld    edx, eax, 11
          shl     eax, 10
          shld    edx, eax, 8
          shl     eax, 7
          shld    edx, eax, 8
          shl     eax, 7
          shld    edx, eax, 8
          or      edx, 10000000100000001000000010000000b
      
          mov     [ecx], edx
          mov     eax, [esp+4]
          shr     eax, 28
          mov     [ecx+4], al
      
          mov     eax, 5
          jmp     exit
      
      out4b:
          cmp     eax, 00000000000111111111111111111111b
          jbe     out3b
      
          shld    edx, eax, 11
          shl     eax, 10
          shld    edx, eax, 8
          shl     eax, 7
          shld    edx, eax, 8
          shl     eax, 7
          shld    edx, eax, 8
          or      edx, 00000000100000001000000010000000b
      
          mov     [ecx], edx
      
          mov     eax, 4
          jmp     exit
      
      out3b:
          cmp     eax, 00000000000000000011111111111111b
          jbe     out2b
      
          shld    edx, eax, 25
          shl     eax, 24
          shld    edx, eax, 8
      
          mov     eax, edx
      
          or      edx, 00000000000000001000000010000000b
      
          mov     [ecx], dx
          shr     eax, 15
          mov     [ecx+2], al
      
          mov     eax, 3
          jmp     exit
      
      out2b:
          cmp     eax, 00000000000000000000000001111111b
          jbe     out1b
      
          shld    edx, eax, 25
          shl     eax, 24
          shld    edx, eax, 8
          or      edx, 00000000000000000000000010000000b
      
          mov     [ecx], dx
      
          mov     eax, 2
          jmp     exit
      
      out1b:
          mov     [ecx], al
      
          mov     eax, 1
      
      exit:
          ret
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-02-05
        • 2011-04-03
        • 2017-04-18
        • 2014-11-01
        • 2016-04-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多