【发布时间】:2016-03-01 00:36:39
【问题描述】:
我有 kafka 集群接收消息。 message 是 zip 文件的字节数组。 zip 文件包含二进制 protobuf 数据文件作为条目。我正在阅读 zip 文件并尝试反序列化 protobuf 条目,这就是我的代码遇到 "protocol message has invalid UTF-8,invalid tag" 异常的地方。
我能够在将二进制 protobuf 文件作为压缩字节数组发送给代理之前对其进行反序列化。
但是当我压缩这些二进制 protobuf 文件,生成消息到 kafka,使用它然后尝试反序列化 zip 流中的条目时,我遇到了问题。
我不确定哪个是这里的罪魁祸首。
由于这些二进制协议缓冲区是 GZipped,所以再次压缩它们会搞砸吗?
谁能解释一下。
谢谢
**************编辑**************
Producer Side:
public byte[] getZipfileBytes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ZipOutputStream zipOut = new ZipOutputStream(baos);
CheckedOutputStream checkSum = new CheckedOutputStream(zipOut, new Adler32());
try {
ZipEntry zipEntry = new ZipEntry(testFile.getName());
byte[] protoBytes = IOUtils.toByteArray(new FileInputStream(testFile));
System.out.println("bytes length:\t"+protoBytes.length);
zipEntry.setSize(protoBytes.length);
zipOut.putNextEntry(zipEntry);
zipOut.write(protoBytes);
zipOut.close();
System.out.println("checksum:"+checkSum.getChecksum().getValue());
zipBytes = baos.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return zipBytes;
}
Consumer Side:
processConsumerRecord(ConsumerRecord<String, byte[]> record) {
String key = record.key();
byte[] dataPacket = record.value();
ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(dataPacket));
CheckedInputStream checkSum = new CheckedInputStream(zipIn,
new Adler32());
ZipEntry zipEntry;
try {
zipEntry = zipIn.getNextEntry();
while (zipEntry != null) {
String name = zipEntry.getName();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
IOUtils.copy(zipIn, baos);
byte[] protoBytes = baos.toByteArray();
二进制 protobuf 字节被 gzip 压缩,所以我需要 gunzip
如果我执行 gunzip,它抛出的不是 gzip 格式。
如果我跳过 gunzip 并执行 parseFrom,我会得到无效标签异常。
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(baos.toByteArray()));
MyProtoObject mpo = null;
try {
mpo = MyProtoObject.parseFrom(protoBytes);
} catch (InvalidProtocolBufferException e1) {
e1.printStackTrace();
}
} catch (IOException e1) {
e1.printStackTrace();
}
checkSum.getChecksum().getValue() 在生成和使用 zip 字节数组时返回 1
以下是调试时 zipEntry 变量的值:
producer
zipEntry ZipEntry (id=44)
comment null
crc 2147247736
csize 86794
extra null
flag 2056
method 8
name "test.dat" (id=49)
size 92931
time 1214084891
consumer
zipEntry ZipEntry (id=34)
comment null
crc 2147247736
csize 86794
extra null
flag 0
method 8
name "test.dat" (id=39)
size 92931
time 1214084891
我什至测试了另一种方式,而不是处理内存中的protobytes,我将zip文件写入磁盘,通过winzip手动解压,然后反序列化提取的二进制proto文件,它工作!!!
我是否以错误的方式进行压缩/解压缩, 告诉我
【问题讨论】:
标签: java zip protocol-buffers apache-kafka