【问题标题】:Test with EmbeddedKafka - prove number of retries使用 EmbeddedKafka 进行测试 - 证明重试次数
【发布时间】:2021-09-27 12:56:27
【问题描述】:

我有一个使用@EmbeddedKafka 编写的测试。 我使用 SeekToCurrentErrorHandler 配置了错误处理。

我有一个测试,我将一条消息推送到 Kafka,我可以看到它由 DeadLetterPublishingRecover 正确处理,它发布到 DLT。

我想在我的测试中添加一个进一步的断言,以证明 SeekToCurrentErrorHandler 重试了 X 次,但发生 MyCustomException 时只重试了 1 次。

我已经配置了错误处理程序

errorHandler.addNotRetryableExceptions(MyCustomException.class)

我正在努力如何在测试中获取重试信息。如果 DeadLetterPublishingRecoverer 添加一个标头即 kafka_deliveryAttempt 标头会很好。

我也试过这样做:

  1. 在测试中创建 RetryTemplate
@Configuration
public class TestConfiguration {

    @Bean
    @Primary
    public RetryTemplate retryTemplate() {
        return new RetryTemplateBuilder().maxAttempts(3)
                                         .fixedBackoff(500)
                                         .build();
    }
}
  1. 在重试模板上设置监听器。
@Import(TestConfiguration.class)
class MyTest {

@Autowired
private RetryTemplate retryTemplate;

private int retryCount = 0;

  @BeforeEach
  void setup () {
   retryTemplate.registerListener(new RetryListenerSupport() {
            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
                retryCount = context.getRetryCount();
            }
        });
  }
}


// I then execute the test and I can see in the logs it had 3 goes at processing the message. However when I assert for the count to equal 3 it's always 0

【问题讨论】:

    标签: spring-kafka spring-retry


    【解决方案1】:

    SeekToCurrentErrorHandler 不使用RetryTemplate;它有自己的重试机制。

    从 2.7 版本开始,您可以在错误处理程序中添加一个或多个 RetryListeners:

        /**
         * Set one or more {@link RetryListener} to receive notifications of retries and
         * recovery.
         * @param listeners the listeners.
         * @since 2.7
         */
        public void setRetryListeners(RetryListener... listeners) {
            this.failureTracker.setRetryListeners(listeners);
        }
    
    /**
     * A listener for retry activity.
     *
     * @author Gary Russell
     * @since 2.7
     *
     */
    @FunctionalInterface
    public interface RetryListener {
    
        /**
         * Called after a delivery failed for a record.
         * @param record the failed record.
         * @param ex the exception.
         * @param deliveryAttempt the delivery attempt.
         */
        void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
    
        /**
         * Called after a failing record was successfully recovered.
         * @param record the record.
         * @param ex the exception.
         */
        default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
        }
    
        /**
         * Called after a recovery attempt failed.
         * @param record the record.
         * @param original the original exception causing the recovery attempt.
         * @param failure the exception thrown by the recoverer.
         */
        default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
        }
    
    }
    

    【讨论】:

    • 我不想在生产代码中添加重试侦听器,因为它仅用于测试。目前,我正在生产代码中更新 STCEH,作为创建 KafkaListenerContainerFactory 的 @Bean 方法的一部分。为什么 DeadLetterPublishingRecoverer 不添加 kafka_deliveryAttempt 标头?
    • 由于 STCEH 是 @Bean,您可以将 @Autowire 它添加到您的测试中,然后在其中添加侦听器;不会影响生产。如果您将 deliveryAttemptHeader 容器属性设置为 true,则会将传递尝试标头写入 DLT 主题。
    • 目前还不是 Bean。我的 KafkaListenerContainerFactory 上有 @Bean。我的应用程序中有两个 KafkaListenerContainerFactory。我在每个方法中都新建了 STCEH,并将其设置在 ConcurrentKafkaListenerContainerFactory 上。如果我使用一个 Bean 方法来提供一个 STCEH 的实例,两个容器工厂可以使用同一个吗?我猜他们需要自己的,因为它是一个 ContainerAwareErrorHandler?也许让它成为一个原型 bean?
    • 我还尝试使用 setDeliveryAttemptHeader(true) 设置容器属性。我发现当发生反序列化异常时,标头不存在。我本来期望值 1 存在。其次,在其他例外情况下,我可以看到标题 存在。但是它的值是不可读的,我无法粘贴到堆栈溢出中。 (我将标头字节包装成一个字符串)
    • ContainerAware 简单来说就是将容器的引用传递给handle方法; bean 可以共享。对于反序列化异常,没有向侦听器进行传递尝试,记录直接发送到错误处理程序,默认情况下,错误直接发送到恢复器(不重试)。 documentation 解释了如何读取标题。 int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
    猜你喜欢
    • 2021-05-02
    • 2019-10-21
    • 2022-01-05
    • 2021-11-16
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 2020-04-26
    • 2020-09-22
    相关资源
    最近更新 更多