由于错误处理程序支持退避和重试,因此在侦听器级别使用重试模板大多是多余的。
解决您的特定问题的一种方法是启用 deliveryAttemptHeader(容器属性,见下面代码中的 setDeliveryAttemptHeader(true))。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header
然后,在您的侦听器错误处理程序中,检查标头,当达到特定尝试次数时,将消息发布到死信主题并返回错误结果。在达到计数之前重新抛出异常,以便SeekToCurrentErrorHandler 重新传递记录。
只需确保 STCEH 有足够的重试次数,使其始终重试,从而使侦听器错误处理程序能够完成其工作。
编辑
这是一个示例,展示了如何通过在标头中添加原始 ConsumerRecord 来使用侦听器错误处理程序中的 DLPR...
@SpringBootApplication
public class So66982480Application {
public static void main(String[] args) {
SpringApplication.run(So66982480Application.class, args);
}
@Bean
ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
KafkaTemplate<String, String> template) {
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setReplyTemplate(template);
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so66982480-replies");
container.getContainerProperties().setGroupId("so66982480-replies");
return new ReplyingKafkaTemplate<>(pf, container);
}
@Bean
RecordMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter() {
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {
Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
return MessageBuilder.fromMessage(message)
.setHeader(KafkaHeaders.RAW_DATA, record)
.build();
}
};
return converter;
}
@Bean
KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
NewTopic topic1() {
return TopicBuilder.name("so66982480").partitions(1).replicas(1).build();
}
@Bean
NewTopic topic2() {
return TopicBuilder.name("so66982480-replies").partitions(1).replicas(1).build();
}
@Bean
NewTopic topic3() {
return TopicBuilder.name("so66982480.DLT").partitions(1).replicas(1).build();
}
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
@Bean
DeadLetterPublishingRecoverer recoverer(KafkaOperations<String, String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@KafkaListener(id = "so66982480", topics = "so66982480", errorHandler = "eh")
@SendTo
public String listen(String in) {
throw new RuntimeException("test");
}
@KafkaListener(id = "so66982480.DLT", topics = "so66982480.DLT")
public void dlt(String in) {
System.out.println("From DLT:" + in);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
RequestReplyFuture<String, String, String> future =
template.sendAndReceive(new ProducerRecord<String, String>("so66982480", 0, null, "test"),
Duration.ofSeconds(30));
System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
System.out.println(future.get(30, TimeUnit.SECONDS).value());
};
}
}
spring.kafka.consumer.auto-offset-reset=earliest
From DLT:test
FAILED