【发布时间】:2021-01-07 04:25:15
【问题描述】:
在我的Kafka Listener 被击中之前,我遇到了一堆反序列化失败。我正在研究 Gary Russel 构建的东西,但在让它工作时遇到了问题。我所有的东西都是通过属性文件配置的。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
所以如果我添加这些,我的理解是它在消费者记录的标题中包含错误吗?我的最终目标是让任何反序列化异常命中我拥有的某个自定义类,以便我可以处理我想要用它做的事情。 IE,转发到我的死信处理程序,它将失败的数据上传到 s3。
我尝试将错误处理程序标志添加到 kafkalistener,但这也没有做任何事情。
更新的属性配置
我已经更新了我的配置,我仍然不清楚这是否正确。它不起作用,所以我认为不会。
没有调用任何自定义代码
spring.kafka.consumer.properties.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.key.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.function=com.thing.cyclic.service.FailedFooProvider
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicNameStrategy
spring.kafka.consumer.properties.specific.avro.reader=true
spring.kafka.consumer.properties.auto.register.schemas=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.listener.ack-mode=manual_immediate
BadFoo
public class BadFoo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
FailedFooProvider
public class FailedFooProvider implements Function<FailedDeserializationInfo, String> {
@Override
public String apply(FailedDeserializationInfo info) {
System.out.println("");
return "";
}
}
【问题讨论】:
标签: java spring apache-kafka spring-kafka