【问题标题】:Error handling in Spring Kafka with properties file?使用属性文件在 Spring Kafka 中处理错误?
【发布时间】:2021-01-07 04:25:15
【问题描述】:

在我的Kafka Listener 被击中之前,我遇到了一堆反序列化失败。我正在研究 Gary Russel 构建的东西,但在让它工作时遇到了问题。我所有的东西都是通过属性文件配置的。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer

所以如果我添加这些,我的理解是它在消费者记录的标题中包含错误吗?我的最终目标是让任何反序列化异常命中我拥有的某个自定义类,以便我可以处理我想要用它做的事情。 IE,转发到我的死信处理程序,它将失败的数据上传到 s3。

我尝试将错误处理程序标志添加到 kafkalistener,但这也没有做任何事情。

更新的属性配置

我已经更新了我的配置,我仍然不清楚这是否正确。它不起作用,所以我认为不会。

没有调用任何自定义代码

spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider

spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate

BadFoo

public class BadFoo {

    private final FailedDeserializationInfo failedDeserializationInfo;

    public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
        this.failedDeserializationInfo = failedDeserializationInfo;
    }

    public FailedDeserializationInfo getFailedDeserializationInfo() {
        return this.failedDeserializationInfo;
    }
}

FailedFooProvider

public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
    @Override
    public String apply(FailedDeserializationInfo info) {
        System.out.println("");
        return "";
    }
}

【问题讨论】:

    标签: java spring apache-kafka spring-kafka


    【解决方案1】:

    请参阅 the documentation herehere

    还可以查看DeadLetterPublishingRecoverer 代码,该代码可用于将失败的记录发布到其他主题。您可以在此之后对您的代码进行建模以获得包含失败的byte[] 的标头。

    https://github.com/spring-projects/spring-kafka/blob/fa5c35e9b15c4cecfc6ea2bbbf9e7745bc5d9f75/spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java#L169-L178

    恢复器与SeekToCurrentErrorHandler结合使用。

    将错误处理程序配置为@Bean,Spring Boot 会自动将其连接到容器中。

    【讨论】:

    • 价值函数被弃用了吗?我已经查看了代码和文档,但我仍然不清楚我应该如何去做。我已经用我的配置更新了原始问题。这看起来正确吗?我仍然对如何指定我拥有的获取失败记录的类感到困惑
    • 目前在 2.2.8
    • &gt;Is value function deprecated? 我不知道那是什么意思;什么“价值函数”?声明 @Bean 类型的 SeekToCurrentErrorHandler - 例如public SeekToCurrentErrorHandler(BiConsumer&lt;ConsumerRecord&lt;?, ?&gt;, Exception&gt; recoverer) - 从BiConsumer 调用您的代码。例如(rec, ex) -&gt; {...}。 2.2 不再支持;您应该至少更新到 2.3。我指给你的恢复器中的代码直到 2.3 才引入;我不记得 2.2.x 中有多少可用。
    猜你喜欢
    • 2020-07-17
    • 2020-07-14
    • 2015-05-24
    • 1970-01-01
    • 2023-04-10
    • 1970-01-01
    • 2021-10-26
    • 2018-12-21
    • 2022-01-06
    相关资源
    最近更新 更多