【问题标题】:Spring-kafka error handling with DeadLetterPublishingRecoverer使用 DeadLetterPublishingRecoverer 处理 Spring-kafka 错误
【发布时间】:2020-07-17 20:27:23
【问题描述】:

我正在尝试在 Spring Boot kafa 中实现错误处理。在我的 Kafka 侦听器中,我抛出了一个运行时异常,如下所示:

@KafkaListener(topics= "Kafka-springboot-example", groupId="group-employee-json")
    public void consumeEmployeeJson(Employee employee) {
        logger.info("Consumed Employee JSON: "+ employee);

        if(null==employee.getEmployeeId()) {
            throw new RuntimeException("failed");
            //throw new ListenerExecutionFailedException("failed");
        }
    }

并且我已经按照以下配置了错误处理:

@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template){

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template)));

        return factory;

    }
}

我的 DLT 监听器如下:

@KafkaListener(topics= "Kafka-springboot-example.DLT", groupId="group-employee-json")
    public void consumeEmployeeErrorJson(Employee employee) {
        logger.info("Consumed Employee JSON frpm DLT topic: "+ employee);
    }

但我的消息没有发布到 DLT 主题。

知道我做错了什么吗?

已编辑:

application.properties

server.port=8088

#kafka-producer-config
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


#Kafka consumer properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-employee-json
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

【问题讨论】:

  • 显示您的application.properties/yml。你有...consumer.auto-offset-reset: earliest吗?打开org.springframework.kafka 的 DEBUG 日志记录,看看它是否提供任何线索。编辑问题;不要在评论中添加代码/配置。
  • application.properties
  • 我指的是内容;但请看我的回答。

标签: spring spring-boot error-handling apache-kafka spring-kafka


【解决方案1】:

public ConcurrentKafkaListenerContainerFactory&lt;Object, Object&gt; containerFactory(

如果容器工厂使用非标准的bean名称,则需要在containerFactory属性中的@KafkaListener上进行设置。

默认的 bean 名称是 kafkaListenerContainerFactory,它由 Boot 自动配置。您需要覆盖该 bean 或将侦听器配置为指向您的非标准 bean 名称。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-05-24
    • 1970-01-01
    • 2023-04-10
    • 2018-12-21
    • 1970-01-01
    • 2021-10-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多