用c#實現Protocol Buffers的變長字節整形編碼
摘要
int在.net里固定占4個字節,如果我們存儲和傳輸大量的int數據,并且大部分數的值比較小,我們就會浪費很多的網絡流量和磁盤存儲。Protocol Buffers對整數的編碼是讓值小的數占少量幾個的字節,值大的數占多個字節。
編碼算法
首先看如下鏈接,了解Protocol Buffers對整形的編碼算法。http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/encoding.html
它舉了個對300的編碼,編碼后是兩個字節:
它這個例子是每個字節左邊是高位,可以看到每個字節的最高位是一個標識位,從左到右第一個字節是10101100,最高位是1,說明后面還有字節需要解碼,然后第二個字節是00000010,最高位是0,后面沒字節了。所以這兩個字節就需要解碼成一個整數,再往下看。

→ 010 1100 000 0010 //每個字節去掉最高位
→ 000 0010 010 1100 //字節序轉換,兩個字節互換位置
→ 000 0010 ++ 010 1100 //兩個字節進行鏈接操作(不是相加)
→ 100101100 //合并后的結果,高位位0的部分截取掉
→ 256 + 32 + 8 + 4 = 300 //每個值為1的bit位乘以以2為底的冪,得出編碼后的值
這個例子是以一個正數為例的,負數也是一樣的,不過以補碼表示負數的話,有個符號位,而且高位都是1,編碼比較復雜,它沒有給舉例。不過后面咱們用c#實現的變字節編碼是支持對負數進行編碼的,只是對負數的編碼沒有壓縮的效果。
c#的的二進制基本操作
我們先來熟悉下c#的二進制的位操作,位了查看方便,我們先寫一個按二進制打印字節數組的輔助函數:

{
BitArray array = new BitArray(bytes);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < array.Length; i++)
{
sb.Append(array[i] ? '1' : '0');
if ((i + 1) % 8 == 0)
sb.Append(' ');
}
return sb.ToString();
}
來測試查看幾個常見的位操作,具體情況詳見注釋:

{
//16進制換算成二進制,每個16進制位轉換成4個二進制位
//高位的3是0011,低位的2是0010,因為.net下是小字節序
//所以0x32的二進制表示是01001100
byte[] temp = new byte[] { 0x32 };
Console.Write("0x32的原始二進制表示:");
Console.WriteLine(ByteToString(temp));
//左移運算符 (<<) 將第一個操作數向左移動第二個操作數指定的位數。
//第一個操作數的高序位被放棄,低序空位用 0 填充。移位操作從不導致溢出。
//01001100左移3位應該是00001001
temp[0] <<= 3;
Console.Write("左移3位表示:");
Console.WriteLine(ByteToString(temp));
//00001001右移2位應該是01001000
temp[0] >>= 3;
Console.Write("右移2位表示:");
Console.WriteLine(ByteToString(temp));
//判斷高位第4位是否為1,0x80二進制表示是00000001
//右移3位是00001000,高位(從右向左以1開始數)第4位是1
//如果要檢查的數高位第4位是1的話,那么兩個數按位與后
//得出的數就只有這位是1,也就是還是00001000
int checkInt = 0x80 >> 3;
Console.WriteLine("高位第4位是:{0}", (temp[0] & checkInt) == checkInt);
//把高位第4位設置為0,0x80右移3位是00001000
//取反后是11110111,和原始字節01001000按位與后
//就是01000000
checkInt = ~(0x80 >> 3);
temp[0] &= (byte)checkInt;
Console.Write("把高位第4位設置為0:");
Console.WriteLine(ByteToString(temp));
//把高位第3位設置為1,0x80右移2位是00000100
//和原始字節01000000按位或后就是01000100
checkInt = 0x80 >> 2;
temp[0] |= (byte)checkInt;
Console.Write("把高位第3位設置為1:");
Console.WriteLine(ByteToString(temp));
}
輸出結果:
左移3位表示:00001001
右移2位表示:01001000
高位第4位是:True
把高位第4位設置為0:01000000
把高位第3位設置為1:01000100
實現編碼及單元測試
響應刀總號召,先寫單元測試,我們設計了GetBytesFromInt和GetIntFromBytes分別用來對int數據進行編碼和解碼,然后單元測試里對各種整數進行編碼,然后解碼查看是否還是原始數據。

{
UnitTest(300);
UnitTest(-1);
UnitTest(int.MaxValue);
UnitTest(255);
UnitTest(1024);
UnitTest(12234234);
UnitTest(-12234234);
}
private static void UnitTest(int input)
{
byte[] bytes = GetBytesFromInt(input);
if (_debug)
Console.WriteLine(ByteToString(bytes));
int result = GetIntFromBytes(bytes);
System.Diagnostics.Debug.Assert(input == result);
if (_debug)
Console.WriteLine(result);
}
代碼實現,詳細算法見注釋:

{
//1、得到int的原始字節,.net的整數的二進制表示高位在右邊
byte[] bytes = BitConverter.GetBytes(input);
BitArray array = new BitArray(bytes);
if (_debug)
Console.WriteLine(ByteToString(bytes));
//2、得到多字節編碼的字節數,并初始化輸出數組
//原始二進制的每7位要編碼到一個字節里,根據這個信息
//計算出需要多少個字節編碼,因為高位在右邊,我們就
//從右往左數,找到第一個為1的二進制位,從這位往
//左數就都是有效數據位了。
int firstOneIndex = 0;
for (firstOneIndex = array.Length - 1; firstOneIndex >= 0; firstOneIndex--)
{
if (array[firstOneIndex])
break;
}
int resultBytesLength = (int)Math.Ceiling((double)(firstOneIndex + 1) / 7);
byte[] resultBytes = new byte[resultBytesLength];
//3、設置多字節編碼中的每一位,
//每個字節的低7位表示數據位,最高位如果是1表示后面還有數據需要解碼
for (int i = 0; i <= firstOneIndex; i++)
{
//3.1、計算出本次要編碼的目標字節
int byteindex = i / 7;
byte b = resultBytes[byteindex];
//3.2、設置本次編碼字節的每一位
int checkInt = i % 7;
b |= array[i] ? (byte)(0x01 << checkInt) : (byte)0;
//3.3、設置最高位,如果后面還有字節編碼則為1,否則為0
if (byteindex < resultBytes.Length - 1)
b |= 0x80;
resultBytes[byteindex] = b;
}
return resultBytes;
}
static int GetIntFromBytes(byte[] input)
{
//1、初始化目標整數的字節數組,
//因為編碼數據里有冗余數據,所以要多出一個字節來
byte[] result = new byte[5];
for (int i = 0; i < input.Length; i++)
{
//2、先把多字節編碼的每個字節拷貝到目標整數字節中
//并完成移位,移位是因為多字節編碼每字節是7個有效
//數據位,而.NET的整數的二進制表示是每個字節8個有效
//數據位,這樣目標字節填充幾次后,高位就會空出多個
//位的無效數據
result[i] = input[i];
result[i] >>= i;
//3、從下一個字節的最低位拷貝N位到本次編碼的最高位
//因為本字節右移了i位,所以要從下一個字節拷貝i位數據
//到本字節
for (int j = 0; j <= i; j++)
{
//3.1、得到下一個字節要拷貝的位
int checkInt = 0x80 >> (7 - j);
if (i + 1 < input.Length)
{
//3.2、如果要拷貝的位是1,則設置本字節相應的位為1
if ((input[i + 1] & checkInt) == checkInt)
{
checkInt = 0x80 >> (i - j);
result[i] |= (byte)checkInt;
}
else //3.3、如果要拷貝的位是0,則設置本字節相應的位為1
{
checkInt = ~(0x80 >> (i - j));
result[i] &= (byte)checkInt;
}
}
}
}
if (_debug)
Console.WriteLine(ByteToString(result));
return BitConverter.ToInt32(result, 0);
}
單元測試均通過。
存儲測試
我們假設要編碼的數據都是幾萬,幾千的值,只有很少是大于ushort.max的,隨機生成一些整數,用變長字節編碼看看是否能起到壓縮的效果。

{
Random rnd = new Random();
const int MAX = 1000;
int sum = 0;
for (int i = 0; i < MAX; i++)
{
sum += GetBytesFromInt(rnd.Next(0, ushort.MaxValue)).Length;
}
Console.WriteLine(".net編碼{0}個int需要{1}字節", MAX, MAX * 4);
Console.WriteLine("Protocol Buffer編碼{0}個int需要{1}字節", MAX, sum);
Console.WriteLine("節省了{0:p}的存儲空間", 1 - (double)sum / (MAX * 4));
}
測試輸出:
Protocol Buffer編碼1000個int需要2787字節
節省了30.33%的存儲空間
當然這個測試數據并不具有代表性,如果要編碼的整數大多都是負數,或者很大的正數,那么這種編碼不僅不會比原始的int編碼小,反而會比.net對整形的定長編碼要大。
小節
計算機組成原理講的知識離我們并不遙遠,沒準什么時候有個需求來了,你就得用到這些知識。