【问题标题】:Error while staring spring boot Kafka project启动 Spring Boot Kafka 项目时出错
【发布时间】:2019-08-25 03:22:41
【问题描述】:

启动spring boot kafka项目时出错。

弹簧靴:2.1.2.RELEASE 春季卡夫卡版本:2.2.5.RELEASE

消费者不能为 ackMode MANUAL_IMMEDIATE 配置自动提交

消费者配置

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

Kafka Listener 容器工厂

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
        Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        ack.acknowledge();
        return null;
    });
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

【问题讨论】:

  • Exception Caused by: java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE

标签: spring-boot kafka-consumer-api spring-kafka


【解决方案1】:

对于MANUAL_IMMEDIATE ack 模式(基本上对于任何手动模式),必须关闭ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 消费者属性。

这就是那个异常的原因。

我猜你不能在你的application.properties 中使用spring.kafka.consumer.enableAutoCommit

【讨论】:

  • 仍然出现同样的错误。谢谢我更新了我的消费者配置。非常感谢
  • 是时候分享一个简单的项目让我们重现和玩耍了吗?
  • 是的,请分享我想知道重试配置和确认是如何处理的..
  • 不,您分享了关于autoCommit 的问题。重试和确认是完全不同的故事,值得拥有自己的 SO 线程。感谢您的理解!
  • 这部分已经完成..我还有一个疑问,我正在阅读一本书,当您成功使用消息时,您会执行由 Kafka 自己处理的 asyncCommit() 来编写。如果出现异常,我们需要执行 syncCommit() ..但是当我在这种情况下使用 spring 重试时,当我执行确认时,它会执行 asyncCommit() ..
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-15
  • 2019-05-29
  • 2022-01-02
  • 2019-10-04
  • 1970-01-01
相关资源
最近更新 更多