【发布时间】:2017-12-07 06:41:33
【问题描述】:
我正在使用spring Kafka来消费LinkedIn large message supported Kafka client产生的消息
鉴于此 Kafka 客户端始终将 AUTO_OFFSET_RESET_CONFIG 覆盖为 none,如其构造函数所示。
private LiKafkaConsumerImpl(LiKafkaConsumerConfig configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
Deserializer<LargeMessageSegment> largeMessageSegmentDeserializer,
Auditor<K, V> consumerAuditor) {
_kafkaConsumer = new KafkaConsumer<>(configs.configForVanillaConsumer(),
byteArrayDeserializer,
byteArrayDeserializer);
}
Map<String, Object> configForVanillaConsumer() {
Map<String, Object> newConfigs = new HashMap<>();
newConfigs.putAll(this.originals());
newConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
newConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
return newConfigs;
}
所以一旦我开始使用批量提交并将ENABLE_AUTO_COMMIT_CONFIG 设置为false,它就会引发以下错误:
[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] 错误 OakcciConsumerCoordinator - 用户为组文档事件消费者提供的侦听器 com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener 在分区分配组织上失败.apache.kafka.clients.consumer.NoOffsetForPartitionException:未定义的偏移量,没有分区的重置策略:DocumentEvents-2 在 org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:369) 在 org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:247) 在 org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1602) 在 org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.position(LiKafkaConsumerImpl.java:403) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:447) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerRebalanceListener.onPartitionsAssigned(LiKafkaConsumerRebalanceListener.java:62) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) 在 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) 在 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) 在 org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) 在 org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 在 com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl.poll(LiKafkaConsumerImpl.java:231) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:558) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.lang.Thread.run(Thread.java:745)
出现这个问题是因为这个消费者组是第一次消费来自这个主题的消息,所以它会尝试使用偏移重置策略。
虽然我将其设置为“最早”,但它被底层的 LinkedIn kafka 客户端覆盖为“无”
在这种情况下,我也尝试覆盖 ConsumerRebalanceListener 以手动寻找开头,但实际上并没有达到这一点。
我该如何解决这个问题?
【问题讨论】: