【问题标题】:Unable to deserialize zipped protocol buffers无法反序列化压缩的协议缓冲区
【发布时间】:2016-03-01 00:36:39
【问题描述】:

我有 kafka 集群接收消息。 message 是 zip 文件的字节数组。 zip 文件包含二进制 protobuf 数据文件作为条目。我正在阅读 zip 文件并尝试反序列化 protobuf 条目,这就是我的代码遇到 "protocol message has invalid UTF-8,invalid tag" 异常的地方。

我能够在将二进制 protobuf 文件作为压缩字节数组发送给代理之前对其进行反序列化。

但是当我压缩这些二进制 protobuf 文件,生成消息到 kafka,使用它然后尝试反序列化 zip 流中的条目时,我遇到了问题。

我不确定哪个是这里的罪魁祸首。

由于这些二进制协议缓冲区是 GZipped,所以再次压缩它们会搞砸吗?

谁能解释一下。

谢谢

**************编辑**************

Producer Side:

public byte[] getZipfileBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ZipOutputStream zipOut = new ZipOutputStream(baos);
        CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());

        try {
            ZipEntry zipEntry = new ZipEntry(testFile.getName());
            byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
            System.out.println("bytes length:\t"+protoBytes.length);
            zipEntry.setSize(protoBytes.length);
            zipOut.putNextEntry(zipEntry);
            zipOut.write(protoBytes);
            zipOut.close();
            System.out.println("checksum:"+checkSum.getChecksum().getValue());
            zipBytes = baos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return zipBytes;
    }



    Consumer Side:
         processConsumerRecord(ConsumerRecord<String, byte[]> record) {
                String key = record.key();
                byte[] dataPacket = record.value();

                ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));

                CheckedInputStream checkSum = new CheckedInputStream(zipIn,
                        new Adler32());
                ZipEntry zipEntry;
                try {
                    zipEntry = zipIn.getNextEntry();
                    while (zipEntry != null) {
                        String name = zipEntry.getName();
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                            try {
                                IOUtils.copy(zipIn, baos);
                                byte[] protoBytes = baos.toByteArray();

二进制 protobuf 字节被 gzip 压缩,所以我需要 gunzip

如果我执行 gunzip,它抛出的不是 gzip 格式。

如果我跳过 gunzip 并执行 parseFrom,我会得到无效标签异常。

   GZIPInputStream gzip = new GZIPInputStream(
                        new ByteArrayInputStream(baos.toByteArray()));
                        MyProtoObject mpo = null;
                        try {
                            mpo = MyProtoObject.parseFrom(protoBytes);
                        } catch (InvalidProtocolBufferException e1) {
                            e1.printStackTrace();
                        }
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }

checkSum.getChecksum().getValue() 在生成和使用 zip 字节数组时返回 1
以下是调试时 zipEntry 变量的值:

    producer 
        zipEntry    ZipEntry  (id=44)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    2056    
        method  8   
        name    "test.dat" (id=49)  
        size    92931   
        time    1214084891  


    consumer
        zipEntry    ZipEntry  (id=34)   
        comment null    
        crc 2147247736  
        csize   86794   
        extra   null    
        flag    0   
        method  8   
        name    "test.dat" (id=39)  
        size    92931   
        time    1214084891  

我什至测试了另一种方式,而不是处理内存中的protobytes,我将zip文件写入磁盘,通过winzip手动解压,然后反序列化提取的二进制proto文件,它工作!!!

我是否以错误的方式进行压缩/解压缩, 告诉我

【问题讨论】:

    标签: java zip protocol-buffers apache-kafka


    【解决方案1】:

    这里有两个不同的东西在起作用:压缩/解压缩,以及处理 protobuf。听起来这里的问题是第一个问题,听起来它正在破坏 protobuf 数据。所以,现在:忘记 protobuf,只关注 zip/unzip。记录原始消息是什么(在压缩之前 - 可能是二进制文件或 base-64 块)。现在在接收端,在解压缩后跟踪您得到的二进制(同样,二进制文件或base-64 块)。如果它们不是绝对 100% 相同,则 所有其他赌注都关闭。在您成功复制原始原始二进制文件之前,protobuf 没有机会。

    如果这是问题所在:最好显示您的邮政编码/解压缩代码,以便我们查看。

    如果您正在正确地压缩/解压二进制文件,那么问题将出在您的 protobuf 代码中。

    如果这是问题所在:最好显示您的序列化/反序列化代码,以便我们查看。

    【讨论】:

    • 小/大端转换(或缺少它)可能会导致问题吗?
    • @Emil 您选择的 protobuf 库应该在内部处理字节序; protobuf 规范明确说明了这些细节,如果任何已建立的库出错了,我会感到惊讶。您的 zip/unzip 代码不应该关心字节顺序 - 它只需要在进程的开始和结束时以相同的顺序获取相同的字节。
    • 我完全同意。正如您所说,我认为忽略 protobuf 步骤并确保 zip/unzip 步骤按预期工作是个好主意。只是说字节序在传输数据时会导致问题
    • @MarcGravell,添加了编码的 sn-ps 和更多信息。我在发送之前和接收之后验证了二进制原型字节,它们是相同的。假设我以正确的方式做到了。请告诉我你的发现
    猜你喜欢
    • 1970-01-01
    • 2017-05-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-23
    • 1970-01-01
    • 2011-08-26
    • 1970-01-01
    相关资源
    最近更新 更多