【Title】: RetryBackoffSpec not working with KafkaReceiver which throws exception
【Posted】: 2021-07-26 02:47:47
【Question】:

I have a use case where I want to keep receiving records from Kafka indefinitely and process each record with processRecord(String record), which may throw a RuntimeException. I want to retry a number of times (say 5); if processing succeeds within those 5 attempts, I want to manually commit the offset and continue with the next record. If all 5 attempts fail, I want to log the failure, commit the offset anyway, and then continue with subsequent records. I have code for this, but it doesn't seem to work correctly. Any help is appreciated.

public class MyClass {
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    private final ReceiverOptions<String, String> receiverOptions = getReceiverOptions();

    public void consumeRecords() {
        RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
        KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(record -> {
                    System.out.println(record.value());
                    processRecord(record.value());
                })
                .doOnError(e -> System.out.println(atomicInteger.incrementAndGet()))
                .onErrorContinue((e, r) -> {
                    System.out.println(atomicInteger.incrementAndGet());
                    System.out.println("Record: " + r);
                    System.out.println("Error: " + e);
                })
                .retryWhen(retrySpec)
                .repeat()
                .subscribe();

    }

    public void processRecord(String record) {
        // might throw an exception
        throw new RuntimeException("Throwing exception!");
    }
}

The output I get is:

some message
1
Record: ConsumerRecord(topic = my-topic, partition = 0, leaderEpoch = null, offset = 1, CreateTime = 1620062099518, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = some message)
Error: java.lang.RuntimeException: Throwing exception!

second message
1
Record: ConsumerRecord(topic = my-topic, partition = 1, leaderEpoch = null, offset = 2, CreateTime = 1620062166706, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = second message)
Error: java.lang.RuntimeException: Throwing exception!

It does not retry 5 times, and the AtomicInteger is not updated for the second record.

What I want to achieve is:

count = 0
while (count < 5) {
    if (exception) count++;
    else break_and_continue_with_next_record
}

if (count == 5) log_failure_and_continue_with_next_record
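The desired per-record semantics can be sketched in plain Java, without Kafka or Reactor (all names here are illustrative stand-ins; `acknowledge()` is simulated by a log entry):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetrySketch {
    static final int MAX_ATTEMPTS = 5;

    /** Returns true if the record was processed, false if retries were exhausted. */
    static boolean processWithRetry(String record, Consumer<String> processor, List<String> log) {
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            try {
                processor.accept(record);
                log.add("committed " + record);   // stand-in for acknowledge()
                return true;                      // success: move on to the next record
            } catch (RuntimeException e) {
                log.add("attempt " + attempt + " failed for " + record);
            }
        }
        // all attempts failed: log it, commit anyway, continue with the next record
        log.add("retries exhausted, committing " + record);
        return false;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        boolean ok = processWithRetry("fail", r -> { throw new RuntimeException("boom"); }, log);
        System.out.println(ok + " after " + log.size() + " log lines"); // prints: false after 6 log lines
    }
}
```

The question is how to express this loop with Reactor's retry operators, given that `retryWhen` resubscribes to the whole pipeline rather than retrying a single element.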

【Comments】:

    Tags: kafka-consumer-api spring-kafka project-reactor flux reactor-kafka


    【Solution 1】:

    Use onErrorResume() instead of onErrorContinue().

    The problem then is that you can't commit the offset there, because the receiver is no longer active at that point.

    This works for me...

        private final AtomicInteger atomicInteger = new AtomicInteger();
    
        public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
            RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
            AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
            receiver.receive()
                    .subscribeOn(Schedulers.single())
                    .doOnNext(record -> {
                        System.out.println(record.value() + "@" + record.offset());
                        if (failed.get() != null) {
                            System.out.println("Committing failed record offset " + record.value()
                                    + "@" + record.offset());
                            record.receiverOffset().acknowledge();
                            failed.set(null);
                        }
                        else {
                            atomicInteger.set(0);
                            try {
                                processRecord(record.value());
                                record.receiverOffset().acknowledge();
                            }
                            catch (Exception e) {
                                throw new ReceiverRecordException(record, e);
                            }
                        }
                    })
                    .doOnError(ex -> atomicInteger.incrementAndGet())
                    .retryWhen(retrySpec)
                    .onErrorResume(e -> {
                        ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                        ReceiverRecord<?, ?> record = ex.getRecord();
                        System.out.println("Retries exhausted for " + record.value()
                                + "@" + record.offset());
                        failed.set(record);
                        return Mono.empty();
                    })
                    .repeat()
                    .subscribe();
        }
    
        public void processRecord(String record) {
            // might throw an exception
            throw new RuntimeException("Throwing exception!");
        }
    
    }
    
    @SuppressWarnings("serial")
    class ReceiverRecordException extends RuntimeException {
    
        private final ReceiverRecord record;
    
        ReceiverRecordException(ReceiverRecord record, Throwable t) {
            super(t);
            this.record = record;
        }
    
        public ReceiverRecord getRecord() {
            return this.record;
        }
    
    }
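    The wrapper exception above exists so that, when retries are exhausted, onErrorResume() can recover the failed record from the exception's cause chain. A minimal stdlib-only illustration of the same pattern (using String as a stand-in for ReceiverRecord, which is an assumption for this sketch):

```java
// Carry-the-payload exception pattern: the wrapper keeps the original cause
// and attaches the item that was being processed when the failure occurred.
class PayloadException extends RuntimeException {
    private final String payload; // stand-in for ReceiverRecord

    PayloadException(String payload, Throwable cause) {
        super(cause);
        this.payload = payload;
    }

    String getPayload() {
        return payload;
    }
}

public class WrapperDemo {
    public static void main(String[] args) {
        try {
            try {
                throw new RuntimeException("Throwing exception!");
            } catch (RuntimeException e) {
                // wrap, keeping a reference to the failed item
                throw new PayloadException("fail@18", e);
            }
        } catch (PayloadException ex) {
            // downstream (e.g. onErrorResume) can recover both pieces
            System.out.println(ex.getPayload() + " / " + ex.getCause().getMessage());
            // prints: fail@18 / Throwing exception!
        }
    }
}
```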
    

    EDIT

    Here is the complete application...

    @SpringBootApplication
    public class So67373188Application {
    
        private static final Logger log = LoggerFactory.getLogger(So67373188Application.class);
    
        public static void main(String[] args) throws InterruptedException {
            SpringApplication.run(So67373188Application.class, args);
            Thread.sleep(120_000);
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so67373188").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner2() {
            return args -> {
                SenderOptions<String, String> so = SenderOptions.create(
                        Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
                KafkaSender<String, String> sender = KafkaSender.create(so);
                Disposable subscribed = sender.send(Flux.just(pr("foo"), pr("bar"), pr("fail"), pr("baz")))
                    .subscribe(result -> {
                        System.out.println(result.recordMetadata());
                    });
                Thread.sleep(5000);
                subscribed.dispose();
            };
        }
    
        @Bean
        public ApplicationRunner runner3(KafkaOperations<String, String> template) {
            return args -> {
                DeadLetterPublishingRecoverer dlpr = new DeadLetterPublishingRecoverer(template);
                ReceiverOptions<String, String> ro = ReceiverOptions.<String, String> create(
                        Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                                ConsumerConfig.GROUP_ID_CONFIG, "so67373188",
                                ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1,
                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
                        .withKeyDeserializer(new StringDeserializer())
                        .withValueDeserializer(new StringDeserializer())
                        .addAssignListener(assignments -> log.info("Assigned: " + assignments))
                        .commitBatchSize(1)
                        .subscription(Collections.singletonList("so67373188"));
                consumeRecords(ro);
            };
        }
    
        private SenderRecord<String, String, String> pr(String value) {
            return SenderRecord.create("so67373188", 0, null, null, value, value + ".corr");
        }
    
        private final AtomicInteger atomicInteger = new AtomicInteger();
    
        public void consumeRecords(ReceiverOptions<String, String> receiverOptions) {
            RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(1)).transientErrors(true);
            KafkaReceiver<String, String> receiver = KafkaReceiver.create(receiverOptions);
            AtomicReference<ReceiverRecord<?, ?>> failed = new AtomicReference<>();
            receiver.receive()
                    .subscribeOn(Schedulers.single())
                    .doOnNext(record -> {
                        System.out.println(record.value() + "@" + record.offset());
                        if (failed.get() != null) {
                            System.out.println("Committing failed record offset " + record.value()
                                    + "@" + record.offset());
                            record.receiverOffset().acknowledge();
                            failed.set(null);
                        }
                        else {
                            atomicInteger.set(0);
                            try {
                                processRecord(record.value());
                                record.receiverOffset().acknowledge();
                            }
                            catch (Exception e) {
                                throw new ReceiverRecordException(record, e);
                            }
                        }
                    })
                    .doOnError(ex -> atomicInteger.incrementAndGet())
                    .retryWhen(retrySpec)
                    .onErrorResume(e -> {
                        ReceiverRecordException ex = (ReceiverRecordException) e.getCause();
                        ReceiverRecord<?, ?> record = ex.getRecord();
                        System.out.println("Retries exhausted for " + record.value()
                                + "@" + record.offset());
                        failed.set(record);
                        return Mono.empty();
                    })
                    .repeat()
                    .subscribe();
        }
    
        public void processRecord(String record) {
            // might throw an exception
            if (record.equals("fail")) {
                throw new RuntimeException("Throwing exception!");
            }
        }
    
    }
    
    @SuppressWarnings("serial")
    class ReceiverRecordException extends RuntimeException {
    
        private final ReceiverRecord record;
    
        ReceiverRecordException(ReceiverRecord record, Throwable t) {
            super(t);
            this.record = record;
        }
    
        public ReceiverRecord getRecord() {
            return this.record;
        }
    
    }
    

    Result:

    
      .   ____          _            __ _ _
     /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
    ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
     \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
      '  |____| .__|_| |_|_| |_\__, | / / / /
     =========|_|==============|___/=/_/_/_/
     :: Spring Boot ::                (v2.4.5)
    
    so67373188-0@16
    so67373188-0@17
    so67373188-0@18
    so67373188-0@19
    foo@16
    bar@17
    fail@18
    fail@18
    fail@18
    fail@18
    fail@18
    fail@18
    Retries exhausted for fail@18
    fail@18
    Committing failed record offset fail@18
    baz@19
    

    【Discussion】:

    • I think it should be ReceiverRecordException ex = (ReceiverRecordException) e;
    • When I ran it, e was a retries-exhausted exception, or something like that, with the RRE as its cause.
    • Also, this code doesn't work for me. I've added my code here along with the error I'm facing.
    • What version? I'm using 1.3.3.
    • I was using 1.2.5.RELEASE. Changed it to version 1.3.3. Please ignore my earlier link and refer to this. Now the code only reads the first message once and does not execute onErrorResume.