【问题标题】:Kafka Java SimpleConsumer strange encodingKafka Java SimpleConsumer 奇怪的编码
【发布时间】:2016-10-02 14:12:54
【问题描述】:

我正在尝试使用 Kafka 9 中的 SimpleConsumer 来允许用户从时间偏移中重播事件 - 但我从 Kafka 收到的消息采用非常奇怪的编码:

7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p=
                            ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"

使用 KafkaConsumer 可以很好地解析此消息。这是我使用 SimpleConsumer 检索消息的代码:

    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            log.debug("Found an old offset - skip");
            continue;
        }

        readOffset = messageAndOffset.nextOffset();

        int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
        byte[] data = messageAndOffset.message().payload().array();
        byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
        log.debug("Read " + new String(realData, "UTF-8"));
}

在我不断收到有关字节太高的 UTF-32 错误后,我添加了代码以跳过前 x 个字节,我认为这是因为 Kafka 将消息大小等信息预先添加到了有效负载中。这是 Avro 神器吗?

【问题讨论】:

  • 看起来不像 Avro——至少不是二进制 Avro 编码。在二进制编码中,您不会在记录中获得架构信息。
  • 我的代码略有不同——我没有使用payload().array(),而是按照这里的方式进行操作:cwiki.apache.org/confluence/display/KAFKA/… 例如:payload().get(bytes) 其中bytes 的类型为@987654327 @。 get() 方法复制数据,而 array() 返回实际数组,在 ByteBuffer 的 Javadocs 中它说:“修改此缓冲区的内容将导致修改返回的数组的内容,反之亦然。”也许这样的事情正在发生?
  • @Gandalf 请您仅在记事本++ 中打开您的消息。如果您使用其他写字板或记事本打开它,那么它看起来会很危险。所以用记事本++打开它,让我们知道。

标签: java apache-kafka kafka-consumer-api


【解决方案1】:

我从来没有找到一个好的答案——但我转而使用SimpleConsumer 查询 Kafka 的偏移量我需要(每个分区......虽然实现很差)然后使用原生 KafkaConsumer 使用 seek(TopicPartition, offset)seekToBeginning(TopicPartition) 检索消息。希望他们会在下一个版本中为本地客户端添加从给定时间戳检索消息的功能。

【讨论】:

    【解决方案2】:

    你在找这个吗?

    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();
    
        if(payload == null) {
            System.err.println("Message is null : " + readOffset);
            continue;
        }
    
    final byte[] realData = new byte[payload.limit()];
    payload.get(realData);
    System.out.println("Read " + new String(realData, "UTF-8"));
    

    【讨论】:

      【解决方案3】:

      您可以使用消息的时间戳定期记录分区的偏移量(可能不是每次提交),然后您可以在未来采取一些措施来设置您的消费者偏移量。我想这是为了生产调试。

      我怀疑他们会添加这样的功能,考虑到 Kafka 的工作原理,这似乎是不可行的,尽管我可能弄错了,总会有天才的东西在发生。我会做日志记录的事情。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-04-01
        • 1970-01-01
        • 1970-01-01
        • 2016-12-01
        • 2011-08-17
        • 2011-12-04
        相关资源
        最近更新 更多