【问题标题】:Spring kafka does not support Large message consumersSpring kafka 不支持大消息消费者
【发布时间】:2017-12-07 06:41:33
【问题描述】:

我正在使用spring Kafka来消费LinkedIn large message supported Kafka client产生的消息

鉴于此 Kafka 客户端始终将 AUTO_OFFSET_RESET_CONFIG 覆盖为 none,如其构造函数所示。

private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
    Deserializer<K> keyDeserializer,
    Deserializer<V> valueDeserializer,
    Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
    Auditor<K, V> consumerAuditor) {
        _kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
        byteArrayDeserializer,
        byteArrayDeserializer);
    }
Map<String, Object> configForVanillaConsumer() {
    Map<String, Object> newConfigs = new HashMap<>();
    newConfigs.putAll(this.originals());
    newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
    return newConfigs;
}

所以一旦我开始使用批量提交并将ENABLE_AUTO_COMMIT_CONFIG 设置为false,它就会引发以下错误:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] 错误 OakcciConsumerCoordinator - 用户为组文档事件消费者提供的侦听器 com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener 在分区分配组织上失败.apache.kafka.clients.consumer.NoOffsetForPartitionException:未定义的偏移量,没有分区的重置策略:DocumentEvents-2 在 org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369) 在 org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247) 在 org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602) 在 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.lang.Thread.run(Thread.java:745)

出现这个问题是因为这个消费者组是第一次消费来自这个主题的消息,所以它会尝试使用偏移重置策略。

虽然我将其设置为“最早”,但它被底层的 LinkedIn kafka 客户端覆盖为“无”

在这种情况下,我也尝试覆盖 ConsumerRebalanceListener 以手动寻找开头,但实际上并没有达到这一点。

我该如何解决这个问题?

【问题讨论】:

    标签: apache-kafka spring-kafka


    【解决方案1】:

    有趣;请在GitHub 中打开一个问题。

    如果策略是none,我们应该捕获该异常。

    与此同时,您可以通过使用常规客户端一次来解决它,为组实际设置初始偏移量(您实际上不必接收任何消息,只需获取分区分配并设置组的初始位置)。

    【讨论】:

    • 感谢 Gary,我打开了一个 Github issue 我使用消费者位置和搜索方法来初始化消费者偏移量,这是它第一次从主题分区消费。
    猜你喜欢
    • 2019-04-30
    • 1970-01-01
    • 2016-11-11
    • 2020-11-16
    • 1970-01-01
    • 2020-08-11
    • 2021-06-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多