使用ProtoBuffer 自带压缩功能

使用ProtoBuffer 自带压缩功能

起因和本文想要说明的内容

消息中心数据存储时候,是使用key-value的形式来存储邮件,但是protobuf在默认的序列化/反序列化的时候是不进行压缩的。
如果想使用 ProtoBuffer的压缩功能,还需要多做几步,构造一个InputStream/OutputStream传入传出内容。ProtoBufffer可选的 zlib压缩 gzip 压缩,压缩效率极高。对于当前项目来说,压缩之后的数据大小为原来大小的: 15~40%(消息体越大,压缩率越大)(目前正常的邮件信息都超过1k),实用性很强。
网上关于这个部分内容较少,有不少关于这方面的提问,但是回答都语焉不详,官方并没有提供这方面的例子,对于初学者来说想要用好并不容易。
另外,protobuf的这一个InputStream/OutputStream的设计也有非常好的参考价值,大家在写自己的IO函数实现的时候也可以参考这个设计。
因此,写一个走读笔记,供参考。

目标:

  1. 如何使用Protobuffer的压缩功能;
  2. ProtoBuffer io 模块设计方法理解。

Protobuf序列化函数列表

要使用ProtoBuf的压缩功能,则主要用到的bool SerializeToZeroCopyStream(io::ZeroCopyOutputStream* output) const; 函数。
需要实现一个派生之后的ZeroCopyOutputStream函数体。

// Write a protocol buffer of this message to the given output.  Returns
// false on a write error.  If the message is missing required fields,
// this may GOOGLE_CHECK-fail.
bool SerializeToCodedStream(io::CodedOutputStream* output) const;
// Like SerializeToCodedStream(), but allows missing required fields.
bool SerializePartialToCodedStream(io::CodedOutputStream* output) const;
// Write the message to the given zero-copy output stream.  All required
// fields must be set.
bool SerializeToZeroCopyStream(io::ZeroCopyOutputStream* output) const;
// Like SerializeToZeroCopyStream(), but allows missing required fields.
bool SerializePartialToZeroCopyStream(io::ZeroCopyOutputStream* output) const;
// Serialize the message and store it in the given string.  All required
// fields must be set.
bool SerializeToString(string* output) const;
// Like SerializeToString(), but allows missing required fields.
bool SerializePartialToString(string* output) const;
// Serialize the message and store it in the given byte array.  All required
// fields must be set.
bool SerializeToArray(void* data, int size) const;
// Like SerializeToArray(), but allows missing required fields.
bool SerializePartialToArray(void* data, int size) const;

// Make a string encoding the message. Is equivalent to calling
// SerializeToString() on a string and using that.  Returns the empty
// string if SerializeToString() would have returned an error.
// Note: If you intend to generate many such strings, you may
// reduce heap fragmentation by instead re-using the same string
// object with calls to SerializeToString().
string SerializeAsString() const;
// Like SerializeAsString(), but allows missing required fields.
string SerializePartialAsString() const;

// Like SerializeToString(), but appends to the data to the string's existing
// contents.  All required fields must be set.
bool AppendToString(string* output) const;
// Like AppendToString(), but allows missing required fields.
bool AppendPartialToString(string* output) const;

如何使用ProtoBuffer压缩功能

  1. 构造一个ZeroCopyOutputStream的派生函数,负责提供压缩解压的内存空间;
  2. 正确调用GzipInputStream,获取结果。

定义ZeroCopyOutputStream派生函数

class CPBufferInputStream: public google::protobuf::io::ZeroCopyInputStream
{
    public:
        CPBufferInputStream() {}
        virtual ~CPBufferInputStream();
        virtual bool Next(const void** data, int* size)
        {
            *data=m_szBuffer+m_idx;
            *size=m_size-m_idx;
            m_idx+=m_size;
            return true;
        }
        virtual void BackUp(int count)
        {
            m_idx-=count;
        }
        virtual bool Skip(int count)
        {
            if(m_idx+count==m_size)
            {
                m_idx+=count;
                return false;
            }
            else if(m_idx+count>m_size)
            {
                return false;
            }

            m_idx+=count;
            return true;

        }
        virtual int64 ByteCount()
        {
            return m_idx;
        }

    private:
        ::google::protobuf::int64 m_idx;
        ::google::protobuf::int64 m_size;
        char m_szBuffer[PROTOBUFFER_MAX_BUFFER_SIZE];
};

class CPBufferOutputStream: public google::protobuf::io::ZeroCopyOutputStream
{
    public:
        CPBufferOutputStream(){}
        ~CPBufferOutputStream(){}

        void Reset()
        {
            m_idx=0;
        }

        std::string ToString()
        {
            return std::string(m_szBuffer,m_idx);
        }

        virtual bool Next(void** data, int* size)
        {
            *data=m_szBuffer+m_idx;
            *size=sizeof(m_szBuffer)-m_idx;
            m_idx+=*size;
            return true;
        }
        virtual void BackUp(int count)
        {
            m_idx-=count;
        }

        virtual ::google::protobuf::int64 ByteCount() const
        {
            return m_idx;
        }

    private:
        ::google::protobuf::int64 m_idx;
        char m_szBuffer[PROTOBUFFER_MAX_BUFFER_SIZE];
};

使用并获取结果

CPBufferInputStream          m_pbinputstream;
CPBufferOutputStream         m_pboutputstream;

//将m_msg PB结构序列化、压缩并将结果传回 str_output 
bool Serialize(std::string* str_output) const
{
    CPBufferOutputStream   stream_output;
    ::google::protobuf::io::GzipOutputStream m_zip(stream_output);

    if(!m_msg.SerializeToZeroCopyStream(&m_zip))
    {
        return false;
    }

    if(!m_zip.Close())
    {
        return false;
    }

    *str_output=stream_output->ToString();
    return true;
}

//将 str_serial 反序列化成PB结构
bool Parse(const std::string& str_serial)
{
    CPBufferInputStream stream_input;
    stream_input->Reset((char*)str_serial.c_str(),(unsigned)str_serial.length());
    ::google::protobuf::io::GzipInputStream m_zip(&stream_input);
    if(!m_msg.ParseFromZeroCopyStream(&m_zip))
    {
        return false;
    }

    return true;
}

ProtoBuffer io 模块设计方法

  1. 在接口设计上,序列化函数需要用到的缓冲区可以是内部缓冲区和外部缓冲区,并没有做严格的限定。
  2. 当需要使用外部缓冲区的时候,PB定义了一个 ZeroCopyOutputStream 这样的一个输入流(这个和STL的IOStream有区别,请注意),指定了只需要Stream提供3个接口:Next(获取buffer地址),BackUp(返还buufer),ByteCount(查看一共使用了多少大小的buffer),接口小而完备。
  3. 压缩之后的序列化,主要是通过实现 ZeroCopyOutputStream 来进行,PB中已经实现了一个通过 GzipInputStream/GzipOutputStream 来进行实现,但是,GZipxxStream 这个接口仍然需要缓冲区,所以此时GzipxxxStream是作为一个适配器(或说是代理)来进行工作,使用时候需要提供一个 ZeroCopyOutputStream 的派生类来传入缓冲区。
  4. 简而言之,PB 在第一次序列化之后(未压缩数据),缓冲区放在 GzipxxxStream 的实现中(可以通过 GZipxxStream::Option()自己指定不指定则会new)。 而序列化完成之后,压缩时候使用的缓冲区,则有 ZeroCopyOutputStream 传入缓冲区来提供。