【问题标题】:Kafka adding prefix to messageKafka为消息添加前缀
【发布时间】:2013-11-30 19:48:46
【问题描述】:

使用kafka 7.2,当使用生产者发送消息时,我发现一旦使用它,消息就会在消息的开头带有附加部分。

例如,当向 kafka 发送一个简单的字符串“King Daniel”时,它在字节数组中如下所示:

4B 69 6E 67 20 44 61 6E 69 65 6C

但是当我出于某种原因消耗它时,我得到:

00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C

字符串“........ֲִ.|King Daniel”是什么

所以我在消息的开头多了 12 个字符。 这是某种标题吗?如何获取我的原始消息?

这是我的消费者代码:

public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
    long offset = 0;
    while (true) {
    // create a fetch request for partition 0, current offset, and
    // fetch size of 1MB
    FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
    ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

    for (MessageAndOffset msg : messages) {
        ByteBuffer payload = msg.message().payload();
        writer.writeToFile(payload.array());
        // advance the offset after consuming each message
        offset = msg.offset();
    }
    }
} catch (Exception e) {
    LOG.error("Error occured while consuming from kafka", e);
}
}

所以我将msg.message().payload().array() 写入一个文件,然后当我打开这个文件时,我可以看到原始内容,并在开头添加了 12 个额外字符。

我怎样才能得到我原来的确切消息?

【问题讨论】:

  • 你试过Utils.toString(msg.message.payload(), "UTF-8") 在他们的手册中给出的吗?
  • 使用 UTF-8 解码对于简单的字符串是可以的,但我的真实消息不适合它,无论如何这不是问题。谢谢

标签: apache-kafka


【解决方案1】:

问题在于ByteBuffer.array() 方法返回一个数组,该数组支持此缓冲区(请参阅http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#array())。

ByteBuffer 可能只占用后备数组的一部分。此外,此方法不适用于只读 ByteBuffers 和直接 ByteBuffers:如果数组是只读的,则会抛出 ReadOnlyBufferException,如果 ByteBuffer 没有后备数组,则会抛出 UnsupportedOperationException

您可以使用以下代码 sn-p 将ByteBuffer 的内容读入数组:

ByteBuffer payload = msg.message().payload();
byte[] contents = new byte[payload.remaining()];
payload.get(contents);
writer.writeToFile(contents);

但是,可能值得扩展您的 writer 以直接从 ByteBuffer 写入数据并避免额外的复制。

【讨论】:

    猜你喜欢
    • 2011-05-26
    • 2018-11-01
    • 2016-02-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-22
    • 2012-04-03
    相关资源
    最近更新 更多