【问题标题】:Using onErrorResume to handle problematic payloads posted to Kafka using Reactor Kafka使用 onErrorResume 处理使用 Reactor Kafka 发布到 Kafka 的有问题的有效负载
【发布时间】:2020-11-06 11:04:48
【问题描述】:

我正在使用 reactor kafka 发送 kafka 消息并接收和处理它们。 在接收 kakfa 有效载荷时,我会进行一些反序列化,如果出现异常,我只想记录该有效载荷(通过保存到 mongo ),然后继续接收其他有效载荷。

为此,我使用以下方法-

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
   for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
       flux.delayUntil(//some function to do something)
       .doOnNext(r -> r.receiverOffset().acknowledge())
       .onErrorResume(this::handleException()) // here I'll just save to mongo 
       .subscribe();
   }
}


private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
 // save to mongo
 return Flux.empty();
}

在这里,我希望每当我在接收有效负载时遇到异常,onErrorResume 应该捕获它并记录到 mongo,然后当我发送到 kafka 队列时我应该继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。 有什么我可能在这里遗漏的吗?

【问题讨论】:

    标签: spring-boot apache-kafka project-reactor reactor-kafka


    【解决方案1】:

    如果您需要优雅地处理错误,可以在delayUntil 中添加onErrorResume

    flux
        .delayUntil(r -> {
            return process(r)
                .onErrorReturn(e -> saveToMongo(r));
        });
        .doOnNext(r -> r.receiverOffset().acknowledge())
        .subscribe();
    

    反应式操作符将错误视为终止信号,并且,如果您的内部逻辑(delayUntil 内部)抛出错误,delayUntil 将终止序列,onErrorReturn after @987654327 @ 不会让它继续处理来自 Kafka 的事件。

    【讨论】:

    • 谢谢@bsideup。我这应该适用于我的应用程序抛出的其他异常。但是对于一种特定类型的异常,我仍然想知道这是否可行,即反序列化异常。由于在 kafka 侦听器接收记录时发生这种情况,因此我的应用程序中当前发生的情况是,直接跳转到 onErrorResume,但不知何故,这并不能让消费者保持活动状态,并且它无法使用后续记录。我可以选择不抛出该异常来自反序列化器类,但我想在监听器这里捕获它。有什么想法吗?
    • 我建议从反序列化器返回一个特殊值,表明反序列化失败。以后可以在process处理。否则,反序列化器抛出的错误将影响来自poll()的整批记录。
    • 谢谢@bsideup。我认为这是有道理的。我最终没有从反序列化器中抛出异常,因为 kafka 无法为该记录提交偏移量,并且没有干净的方法可以忽略该记录并继续进一步消费记录,因为我们没有偏移量信息记录(因为它格式错误)。因此,即使我尝试使用反应式错误运算符忽略记录,轮询也会获取相同的记录,然后消费者就会陷入困境
    【解决方案2】:

    正如@bsideup 所提到的,我最终没有从反序列化器中抛出异常,因为 kafka 无法提交该记录的偏移量,并且没有干净的方法可以忽略该记录并继续进行记录的消费,因为我们没有记录的偏移信息(因为它格式错误)。因此,即使我尝试使用反应式错误运算符忽略记录,轮询也会获取相同的记录,然后消费者就会陷入困境

    【讨论】: