【问题标题】:How to retry with spring kafka version 2..2如何使用spring kafka 2..2版重试
【发布时间】:2020-05-27 01:32:38
【问题描述】:

只是想找出一个使用 KafkaListener 的 spring-kafka 2.2 的简单示例,以重试最后一条失败的消息。如果消息失败,则应将消息重定向到将进行重试的另一个主题。 我们将有 4 个主题。 topicretryTopicsucessTopicerrorTopic 如果主题失败,应重定向到 retryTopic 将进行 3 次重试尝试。如果这些尝试失败,则必须重定向到 errorTopic。如果 topicretryTopic 都成功,应重定向到 sucessTopic

【问题讨论】:

    标签: spring-kafka spring-retry


    【解决方案1】:

    使用 Spring Boot 2.2.4 和 Spring for Apache Kafka 2.3.5 会更简单一些:

    (2.2.x 如下所示)。

    @SpringBootApplication
    public class So60172304Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So60172304Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic retryTopic() {
            return TopicBuilder.name("retryTopic").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic successTopic() {
            return TopicBuilder.name("successTopic").partitions(1).replicas(1).build();
        }
    
        @Bean
        public NewTopic errorTopic() {
            return TopicBuilder.name("errorTopic").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("topic", "failAlways");
                template.send("topic", "onlyFailFirst");
                template.send("topic", "good");
            };
        }
    
        /*
         * A custom container factory is needed until 2.3.6 is released because the
         * container customizer was not applied before then.
         */
        @Bean
        ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<Object, Object> template) {
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<Object, Object>() {
    
                        @Override
                        protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                                KafkaListenerEndpoint endpoint) {
    
                            super.initializeContainer(instance, endpoint);
                            customizer(template).configure(instance);
                        }
    
            };
            configurer.configure(factory, kafkaConsumerFactory);
    //      factory.setContainerCustomizer(customizer(template)); // after 2.3.6
            return factory;
        }
    
        private ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>
                customizer(KafkaTemplate<Object, Object> template) {
    
            return container -> {
                if (container.getContainerProperties().getTopics()[0].equals("topic")) {
                    container.setErrorHandler(new SeekToCurrentErrorHandler(
                            new DeadLetterPublishingRecoverer(template,
                                    (cr, ex) -> new TopicPartition("retryTopic", cr.partition())),
                            new FixedBackOff(0L, 0L)));
                }
                else if (container.getContainerProperties().getTopics()[0].equals("retryTopic")) {
                    container.setErrorHandler(new SeekToCurrentErrorHandler(
                            new DeadLetterPublishingRecoverer(template,
                                    (cr, ex) -> new TopicPartition("errorTopic", cr.partition())),
                            new FixedBackOff(5000L, 2L)));
                }
            };
        }
    
    }
    
    @Component
    class Listener {
    
        private final KafkaTemplate<String, String> template;
    
        public Listener(KafkaTemplate<String, String> template) {
            this.template = template;
        }
    
        @KafkaListener(id = "so60172304.1", topics = "topic")
        public void listen1(String in) {
            System.out.println("topic: " + in);
            if (in.toLowerCase().contains("fail")) {
                throw new RuntimeException(in);
            }
            this.template.send("successTopic", in);
        }
    
        @KafkaListener(id = "so60172304.2", topics = "retryTopic")
        public void listen2(String in) {
            System.out.println("retryTopic: " + in);
            if (in.startsWith("fail")) {
                throw new RuntimeException(in);
            }
            this.template.send("successTopic", in);
        }
    
        @KafkaListener(id = "so60172304.3", topics = "successTopic")
        public void listen3(String in) {
            System.out.println("successTopic: " + in);
        }
    
        @KafkaListener(id = "so60172304.4", topics = "errorTopic")
        public void listen4(String in) {
            System.out.println("errorTopic: " + in);
        }
    
    }
    
    spring.kafka.consumer.auto-offset-reset=earliest
    

    结果:

    topic: failAlways
    retryTopic: failAlways
    topic: onlyFailFirst
    topic: good
    successTopic: good
    retryTopic: failAlways
    retryTopic: failAlways
    retryTopic: onlyFailFirst
    errorTopic: failAlways
    successTopic: onlyFailFirst
    

    使用 Spring Boot 2.1.12 和 Spring for Apache Kafka 2.2.12:

    @SpringBootApplication
    public class So601723041Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So601723041Application.class, args);
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("topic", 1, (short) 1);
        }
    
        @Bean
        public NewTopic retryTopic() {
            return new NewTopic("retryTopic", 1, (short) 1);
        }
    
        @Bean
        public NewTopic successTopic() {
            return new NewTopic("successTopic", 1, (short) 1);
        }
    
        @Bean
        public NewTopic errorTopic() {
            return new NewTopic("errorTopic", 1, (short) 1);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("topic", "failAlways");
                template.send("topic", "onlyFailFirst");
                template.send("topic", "good");
            };
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<Object, Object> template) {
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<Object, Object>() {
    
                        @Override
                        protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                                KafkaListenerEndpoint endpoint) {
    
                            super.initializeContainer(instance, endpoint);
                            customize(instance, template);
                        }
    
            };
            configurer.configure(factory, kafkaConsumerFactory);
            return factory;
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<?, ?> retryKafkaListenerContainerFactory(
                ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
                ConsumerFactory<Object, Object> kafkaConsumerFactory,
                KafkaTemplate<Object, Object> template) {
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<Object, Object>() {
    
                        @Override
                        protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                                KafkaListenerEndpoint endpoint) {
    
                            super.initializeContainer(instance, endpoint);
                            customize(instance, template);
                        }
    
            };
            configurer.configure(factory, kafkaConsumerFactory);
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
            FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
            backOffPolicy.setBackOffPeriod(5000L);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            factory.setRetryTemplate(retryTemplate);
            return factory;
        }
    
        private void customize(ConcurrentMessageListenerContainer<Object, Object> container,
                KafkaTemplate<Object, Object> template) {
    
            if (container.getContainerProperties().getTopics()[0].equals("topic")) {
                container.setErrorHandler(new SeekToCurrentErrorHandler(
                        new DeadLetterPublishingRecoverer(template,
                                (cr, ex) -> new TopicPartition("retryTopic", cr.partition())),
                        0));
            }
            else if (container.getContainerProperties().getTopics()[0].equals("retryTopic")) {
                container.setErrorHandler(new SeekToCurrentErrorHandler(
                        new DeadLetterPublishingRecoverer(template,
                                (cr, ex) -> new TopicPartition("errorTopic", cr.partition())),
                        0)); // no retries here - retry template instead.
            }
        }
    
    }
    
    @Component
    class Listener {
    
        private final KafkaTemplate<String, String> template;
    
        public Listener(KafkaTemplate<String, String> template) {
            this.template = template;
        }
    
        @KafkaListener(id = "so60172304.1", topics = "topic")
        public void listen1(String in) {
            System.out.println("topic: " + in);
            if (in.toLowerCase().contains("fail")) {
                throw new RuntimeException(in);
            }
            this.template.send("successTopic", in);
        }
    
        @KafkaListener(id = "so60172304.2", topics = "retryTopic", containerFactory = "retryKafkaListenerContainerFactory")
        public void listen2(String in) {
            System.out.println("retryTopic: " + in);
            if (in.startsWith("fail")) {
                throw new RuntimeException(in);
            }
            this.template.send("successTopic", in);
        }
    
        @KafkaListener(id = "so60172304.3", topics = "successTopic")
        public void listen3(String in) {
            System.out.println("successTopic: " + in);
        }
    
        @KafkaListener(id = "so60172304.4", topics = "errorTopic")
        public void listen4(String in) {
            System.out.println("errorTopic: " + in);
        }
    
    }
    

    编辑

    要更改已发布记录中的有效负载,您可以使用类似这样的方法(调用 MyRepublisher.setNewValue("new value");)。

    public class MyRepublisher extends DeadLetterPublishingRecoverer {
    
        private static final ThreadLocal<String> newValue = new ThreadLocal<>();
    
        public MyRepublisher(KafkaTemplate<Object, Object> template,
                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    
            super(template, destinationResolver);
        }
    
        @Override
        protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
                TopicPartition topicPartition, RecordHeaders headers) {
    
            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(topicPartition.topic(),
                            topicPartition.partition() < 0 ? null : topicPartition.partition(),
                            record.key(), newValue.get(), headers);
            newValue.remove();
            return producerRecord;
        }
    
        public static void setNewValue(String value) {
            newValue.set(value);
        }
    
    }
    

    【讨论】:

    • 我需要向errorTopic 发送一个特定的对象。这可能吗?
    • SeekToCurrentErrorHandler 的第一个参数可以是ConsumerRecordRecoverer 的任何实现。您可以在“恢复器”中做任何您想做的事情。 DeadLetterPublishingRecoverer 只是发送失败的入站数据。
    • 但我需要在创建“errorTopic”之前访问“topic”对象。在将消息发送到“errorTopic”之前如何访问“topic”对象?
    • 我不知道你所说的“主题对象”是什么意思。主题名称在record.topic() 中可用。
    • 我看到我可以通过 ConsumerRecord 中的 value() 方法访问您的示例中名为“Topic”的侦听器接收的对象。我怎样才能改变它?我们正在使用 spring-kafka-2.2.8.RELEASE。我想向 errorTopic 发送不同的消息。
    猜你喜欢
    • 2019-01-20
    • 2019-04-09
    • 1970-01-01
    • 2023-03-20
    • 1970-01-01
    • 1970-01-01
    • 2020-07-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多