【问题标题】:Exponential backoff with message order guarantee using spring-kafka使用 spring-kafka 保证消息顺序的指数退避
【发布时间】:2022-04-22 19:19:06
【问题描述】:

我正在尝试实现一个基于 Spring Boot 的 Kafka 消费者,它具有一些非常强大的消息传递保证,即使在出现错误的情况下也是如此。

  • 必须按顺序处理来自分区的消息,
  • 如果消息处理失败,应该暂停特定分区的消费,
  • 应使用退避重试处理,直到成功。

我们当前的实现满足这些要求:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setRetryTemplate(retryTemplate());

  final ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
  containerProperties.setErrorHandler(errorHandler());

  return factory;
}

@Bean
public RetryTemplate retryTemplate() {

  final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  backOffPolicy.setInitialInterval(1000);
  backOffPolicy.setMultiplier(1.5);

  final RetryTemplate template = new RetryTemplate();
  template.setRetryPolicy(new AlwaysRetryPolicy());    
  template.setBackOffPolicy(backOffPolicy);

  return template;
}

@Bean
public ErrorHandler errorHandler() {
  return new SeekToCurrentErrorHandler();
}

然而,在这里,记录被消费者永远锁定。在某些时候,处理时间将超过max.poll.interval.ms,服务器会将分区重新分配给其他消费者,从而创建一个副本。

假设max.poll.interval.ms 等于 5 分钟(默认)并且失败持续 30 分钟,这将导致消息被处理 ca. 6次。

另一种可能性是通过使用SimpleRetryPolicy 在 N 次重试(例如 3 次尝试)后将消息返回到队列。然后,消息将被重播(感谢SeekToCurrentErrorHandler),处理将从头开始,再次尝试最多 5 次。这会导致延迟形成一系列例如

10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...

这比不断上升的要少:)

是否有任何第三种情况可以保持延迟形成一个上升系列,同时在上述示例中不产生重复?

【问题讨论】:

  • 但是如果消费者的数量=主题的数量,这个消费者就变得闲置了,kafka 不能分配给其他消费者,因为所有其他消费者都很忙。 (每个消费者一个线程)。

标签: apache-kafka messaging spring-kafka spring-retry exponential-backoff


【解决方案1】:

可以通过有状态重试来完成 - 在这种情况下,每次重试后都会引发异常,但状态会保留在重试状态对象中,因此该消息的下一次传递将使用下一个延迟等。

这需要消息中的某些内容(例如标头)来唯一标识每条消息。幸运的是,使用 Kafka,主题、分区和偏移量为状态提供了唯一的键。

但是,目前RetryingMessageListenerAdapter 不支持有状态重试。

您可以在侦听器容器工厂中禁用重试,并在您的侦听器中使用有状态的RetryTemplate,使用带有RetryState 参数的execute 方法之一。

随意为框架添加一个 GitHub 问题以支持有状态重试;欢迎投稿! - pull request issued.

编辑

我刚刚写了一个测试用例来演示如何使用@KafkaListener...

/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 5.0
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class StatefulRetryTests {

    private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");

    @Autowired
    private Config config;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testStatefulRetry() throws Exception {
        this.template.send("sr1", "foo");
        assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().result).isTrue();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, String> template() {
            KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            return kafkaTemplate;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return KafkaTestUtils.producerProps(embeddedKafka);
        }

        @Bean
        public Listener listener1() {
            return new Listener();
        }

    }

    public static class Listener {

        private static final RetryTemplate retryTemplate = new RetryTemplate();

        private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>();

        static {
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            retryTemplate.setBackOffPolicy(backOff);
        }

        private final CountDownLatch latch1 = new CountDownLatch(3);

        private final CountDownLatch latch2 = new CountDownLatch(1);

        private volatile boolean result;

        @KafkaListener(topics = "sr1", groupId = "sr1")
        public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) long offset) {
            String recordKey = topic + partition + offset;
            RetryState retryState = states.get(recordKey);
            if (retryState == null) {
                retryState = new DefaultRetryState(recordKey);
                states.put(recordKey, retryState);
            }
            this.result = retryTemplate.execute(c -> {

                // do your work here

                this.latch1.countDown();
                throw new RuntimeException("retry");
            }, c -> {
                latch2.countDown();
                return true;
            }, retryState);
            states.remove(recordKey);
        }

    }

}

Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry

每次交付尝试后。

在这种情况下,我添加了一个恢复器来处理重试后的消息。您可以执行其他操作,例如停止容器(但在单独的线程上执行此操作,就像我们在 ContainerStoppingErrorHandler 中所做的那样)。

【讨论】:

  • 我添加了一个在监听器中使用状态恢复的例子。
  • 使用stateful-retry的时候,state默认持久化在哪里?在RetryTemplate 上使用ExponentialBackoffPolicy 时,我猜必须检查消息的状态才能查看下一个退避期。请原谅我不知道这是如何工作的,因为我是新手。
  • 您应该提出一个新问题,而不是评论一个旧问题。使用有状态重试的原因是为了防止超过max.poll.interval.ms 以避免重新平衡。 SeekToCurrentErrorHandler 重置偏移量,以便在下一次轮询时重新获取未处理的记录;然后立即重新传递失败的记录,并暂停线程以进行下一次回退。所以;一些状态(偏移量)保存在 Kafka 中,一些状态(重试尝试,backOff)保存在内存中而不是持久化。如果应用崩溃了,偏移量保持ok,但是重试状态会从头开始。
  • @GaryRussell 谢谢。我一直在寻找上述解决方案。您能否解释一下锁存器的需求以及重试已用尽是什么意思,就像指数退避将被重置一样30秒内?在您的示例中,您是否将 SeekToCurrentErrorHandler 称为恢复器?
  • 这是一个旧答案;不要在这里问新问题。从那时起,事情一直在发展,重试和退避现在已添加到 STCEH,因此不再需要在侦听器级别重试。见the documentation;如果您仍有问题,请提出一个新问题。 &gt;Now that the SeekToCurrentErrorHandler can be configured with a BackOff and has the ability to retry only certain exceptions (since version 2.3), the use of stateful retry, via the listener adapter retry configuration, is no longer necessary. ...
猜你喜欢
  • 2020-03-04
  • 2020-09-02
  • 1970-01-01
  • 2017-06-19
  • 2018-07-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多