【问题标题】:Request/Reply and Retry Policy for Kafka ListenersKafka 侦听器的请求/回复和重试策略
【发布时间】:2021-04-07 08:42:38
【问题描述】:

我目前正在为 Kafka 侦听器实现错误处理机制。这就是我想要实现的目标:当 Kafka 侦听器中发生异常时,我想重试处理我的记录 10 次,如果仍然失败,则将其发送到死信队列并在有请求时通知发件人/回复场景(例如返回一个特定的 RecordFailure 对象)。 我首先定义了一个 ErrorHandler bean,它工作得很好,但没有提供在失败时返回值的可能性。然后我开始定义一个 KafkaListenerErrorHandler bean 并将其指定为我的 Kafka 侦听器的参数。它允许我返回一个特定的值,但我丢失了 ErrorHandler 定义的重试和死信队列转发策略。最终,我开始使用 RetryTemplate 和 RecoveryCallback 配置我的容器工厂。我认为它会按预期工作,但是当使用 ReplyingKafkaTemplate 发送消息时,我总是最终收到超时异常并意识到,因为我定义了 RetryTemplate,所以调用 RetryingMessageListenerAdapter 中的 onMessage() 方法而不是 ReplyingKafkaTemplate 方法.我现在质疑场景本身:将 RetryTemplate、RecoveryCallback 和 ReplyingKafkaTemplate 结合起来使用 Kafka 侦听器启用请求/回复和重试策略是否有意义?如果是这样,我在这里错过了什么?

感谢您的宝贵时间。

【问题讨论】:

    标签: spring-boot spring-kafka spring-retry


    【解决方案1】:

    由于错误处理程序支持退避和重试,因此在侦听器级别使用重试模板大多是多余的。

    解决您的特定问题的一种方法是启用deliveryAttemptHeaders。

    https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header

    然后,在您的侦听器错误处理程序中,检查标头,当达到特定尝试次数时,将消息发布到死信主题并返回错误结果。在达到计数之前重新抛出异常,以便SeekToCurrentErrorHandler 重新传递记录。

    只需确保 STCEH 有足够的重试次数,使其始终重试,从而使侦听器错误处理程序能够完成其工作。

    编辑

    这是一个示例,展示了如何通过在标头中添加原始 ConsumerRecord 来使用侦听器错误处理程序中的 DLPR...

    @SpringBootApplication
    public class So66982480Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66982480Application.class, args);
        }
    
        @Bean
        ReplyingKafkaTemplate<String, String, String> rkt(ProducerFactory<String, String> pf,
                ConcurrentKafkaListenerContainerFactory<String, String> factory,
                KafkaTemplate<String, String> template) {
    
            factory.getContainerProperties().setDeliveryAttemptHeader(true);
            factory.setReplyTemplate(template);
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so66982480-replies");
            container.getContainerProperties().setGroupId("so66982480-replies");
            return new ReplyingKafkaTemplate<>(pf, container);
        }
    
        @Bean
        RecordMessageConverter converter() {
            MessagingMessageConverter converter = new MessagingMessageConverter() {
    
                @Override
                public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                        Consumer<?, ?> consumer, Type type) {
    
                    Message<?> message = super.toMessage(record, acknowledgment, consumer, type);
                    return MessageBuilder.fromMessage(message)
                            .setHeader(KafkaHeaders.RAW_DATA, record)
                            .build();
                }
    
            };
            return converter;
        }
    
        @Bean
        KafkaTemplate<String, String> template(ProducerFactory<String, String> pf) {
            return new KafkaTemplate<>(pf);
        }
    
        @Bean
        NewTopic topic1() {
            return TopicBuilder.name("so66982480").partitions(1).replicas(1).build();
        }
    
        @Bean
        NewTopic topic2() {
            return TopicBuilder.name("so66982480-replies").partitions(1).replicas(1).build();
        }
    
        @Bean
        NewTopic topic3() {
            return TopicBuilder.name("so66982480.DLT").partitions(1).replicas(1).build();
        }
    
        @Bean
        KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
            return (msg, ex) -> {
                if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
                    recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
                    return "FAILED";
                }
                throw ex;
            };
        }
    
        @Bean
        DeadLetterPublishingRecoverer recoverer(KafkaOperations<String, String> template) {
            return new DeadLetterPublishingRecoverer(template);
        }
    
        @KafkaListener(id = "so66982480", topics = "so66982480", errorHandler = "eh")
        @SendTo
        public String listen(String in) {
            throw new RuntimeException("test");
        }
    
        @KafkaListener(id = "so66982480.DLT", topics = "so66982480.DLT")
        public void dlt(String in) {
            System.out.println("From DLT:" + in);
        }
    
        @Bean
        public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
            return args -> {
                RequestReplyFuture<String, String, String> future =
                        template.sendAndReceive(new ProducerRecord<String, String>("so66982480", 0, null, "test"),
                                Duration.ofSeconds(30));
                System.out.println(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata());
                System.out.println(future.get(30, TimeUnit.SECONDS).value());
            };
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    
    From DLT:test
    FAILED
    

    【讨论】:

    • 我加了一个例子。
    • 感谢您澄清情况。我尝试了您在容器中设置 DeliveryAttemptHeaders 的解决方案,并结合使用 KafkaListenerErrorHandler 和自定义 STCEH 来设置我的重试策略,它解决了问题!非常感谢!
    • 嗨,Gary,您的解决方案仍然可以正常工作!我现在改进了我的解决方案,并且正在使用带有 @Transactional 注释的 Kafka 侦听器。您提出的解决方案在这种情况下仍然有效吗?我的印象是 DefaultAfterRollbackProcessor 接管并总是重试,即使我在 KafkaListenerErrorHandler 中返回一个值。
    • 我需要更多细节来了解发生了什么;我建议你问一个显示相关代码和配置(和日志)的新问题。
    猜你喜欢
    • 1970-01-01
    • 2021-09-24
    • 1970-01-01
    • 2020-06-13
    • 2021-05-26
    • 1970-01-01
    • 1970-01-01
    • 2020-09-16
    • 1970-01-01
    相关资源
    最近更新 更多