【问题标题】:Spring Boot Kafka with MANUAL_IMMEDIATE ack带有 MANUAL_IMMEDIATE ack 的 Spring Boot Kafka
【发布时间】:2020-07-10 18:04:31
【问题描述】:

我有一个 Springboot 应用程序。使用此配置:

  kafka:
    consumer:
      listener:
        ack-mode: MANUAL_IMMEDIATE

广告此消费者:

@KafkaListener(topics = "test", groupId = "group_id")
    public void consume(String message, Acknowledgment ack) throws IOException {
            ack.acknowledge();
    }

但收到消息时出现此错误:

org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage 

【问题讨论】:

  • 删除consumer 元素。您的 IDE 应该可以帮助您进行属性验证。

标签: spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

您的 YAML 似乎没有被读取;我刚刚测试过

更正

您的 YAML 无效 - 删除 consumer 元素。

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
    listener:
      ack-mode: MANUAL_IMMEDIATE

而且效果很好;删除 ack-mode 会出现与您看到的相同的错误。

@SpringBootApplication
public class So60929385Application {

    public static void main(String[] args) {
        SpringApplication.run(So60929385Application.class, args);
    }

    @KafkaListener(id = "so60929385", topics = "so60929385")
    public void listen(String in, Acknowledgment ack) {
        System.out.println(in);
        ack.acknowledge();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so60929385", "foo");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60929385").partitions(1).replicas(1).build();
    }
}

【讨论】:

  • 我在启动 Consumer 时出现此错误,无法配置为 ackMode MANUAL_IMMEDIATE 的自动提交
  • 您必须使用旧版本。设置 spring.kafka.consumer.enable-auto-commit: false - 自 2.3 版(即 Spring Boot 2.2 使用的版本)起默认为 false
猜你喜欢
  • 2019-01-14
  • 2020-07-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-05
  • 2017-10-28
  • 2018-07-23
  • 2020-07-21
相关资源
最近更新 更多