【问题标题】:SeekToCurrentErrorHandler: DeadLetterPublishingRecoverer is not handling deserialize errorsSeekToCurrentErrorHandler:DeadLetterPublishingRecoverer 未处理反序列化错误
【发布时间】:2019-06-24 01:13:34
【问题描述】:

我正在尝试使用 spring-kafka 版本 2.3.0.M2 库编写 kafka 消费者。 为了处理运行时错误,我使用 SeekToCurrentErrorHandler.class 和 DeadLetterPublishingRecoverer 作为我的恢复器。这仅在我的消费者代码引发异常时才有效,但在无法反序列化消息时失败。

我尝试自己实现 ErrorHandler 并且我成功了,但是通过这种方法,我自己最终编写了 DLT 代码来处理我不想做的错误消息。

以下是我的 kafka 属性

spring:
   kafka:
     consumer:
        bootstrap-servers: localhost:9092
        group-id: group_id
        auto-offset-reset: latest
        key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
        properties:
          spring.json.trusted.packages: com.mypackage
          spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
          spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
      ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
      configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), maxFailures));}

【问题讨论】:

  • 请提供比“它失败”更多的信息。失败怎么办?
  • 我的意思是当我以错误的格式发送消息并且程序突然停止时,它会引发反序列化异常。异常是预期的,但我希望它被 ErrorHandler 捕获并将消息按原样放在 DLT 主题中
  • 框架中没有任何东西会“突然停止程序”。我会试着找点时间来测试一下。
  • 请忽略“突然”这个词,它只是抛出异常但不会停止应用程序

标签: spring-kafka


【解决方案1】:

它对我来说很好用(注意 Boot 会自动配置错误处理程序)...

@SpringBootApplication
public class So56728833Application {

    public static void main(String[] args) {
        SpringApplication.run(So56728833Application.class, args);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, String> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3);
        eh.setClassifier( // retry for all except deserialization exceptions
                new BinaryExceptionClassifier(Collections.singletonList(DeserializationException.class), false));
        return eh;
    }

    @KafkaListener(id = "so56728833"
            + "", topics = "so56728833")
    public void listen(Foo in) {
        System.out.println(in);
        if (in.getBar().equals("baz")) {
            throw new IllegalStateException("Test retries");
        }
    }

    @KafkaListener(id = "so56728833dlt", topics = "so56728833.DLT")
    public void listenDLT(Object in) {
        System.out.println("Received from DLT: " + (in instanceof byte[] ? new String((byte[]) in) : in));
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56728833").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic dlt() {
        return TopicBuilder.name("so56728833.DLT").partitions(1).replicas(1).build();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.json.trusted.packages: com.example
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.value.default.type: com.example.So56728833Application$Foo
    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

logging:
  level:
    org.springframework.kafka: trace

我在该主题中有 3 条记录:

"badJSON"
"{\"bar\":\"baz\"}"
"{\"bar\":\"qux\"}"

我看到第一个直接进入 DLT,第二个在 3 次尝试后进入。

【讨论】:

  • 谢谢.. 我现在注意到消息被推送到 DLT,即使我的控制台中有异常日志。 org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'secod': was expecting ('true', 'false' or 'null') at [Source: (String)"second time"; line: 1, column: 7]
  • 是的,使用 DEBUG 日志记录,您将看到 Skipping seek of: ConsumerRecord(topic = so56728833, partition = 0, leaderEpoch = 0, offset = 3 ... 和 TRACE .Seeking: so56728833-0 to: 4。我们认为以 ERROR 级别记录仍然很重要,但如果您想禁止这些日志,请随时在 GuitHib 中打开新的活动请求。
  • 嗨,它非常适合我。在这种情况下,我们是否也可以在 @KafkaListener 中进行手动即时确认。
  • 当然可以(但不要在 cmets 中就旧答案提出新问题)。
  • 好的,让我发布一个新问题.. 谢谢
猜你喜欢
  • 1970-01-01
  • 2012-02-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-15
  • 1970-01-01
  • 1970-01-01
  • 2022-01-21
相关资源
最近更新 更多