【发布时间】:2020-11-06 11:04:48
【问题描述】:
我正在使用 reactor kafka 发送 kafka 消息并接收和处理它们。 在接收 kakfa 有效载荷时,我会进行一些反序列化,如果出现异常,我只想记录该有效载荷(通过保存到 mongo ),然后继续接收其他有效载荷。
为此,我使用以下方法-
@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
flux.delayUntil(//some function to do something)
.doOnNext(r -> r.receiverOffset().acknowledge())
.onErrorResume(this::handleException()) // here I'll just save to mongo
.subscribe();
}
}
private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
// save to mongo
return Flux.empty();
}
在这里,我希望每当我在接收有效负载时遇到异常,onErrorResume 应该捕获它并记录到 mongo,然后当我发送到 kafka 队列时我应该继续接收更多消息。但是,我看到异常发生后,即使调用了 onErrorResume 方法,但我无法再处理发送到 Kakfa 主题的消息。 有什么我可能在这里遗漏的吗?
【问题讨论】:
标签: spring-boot apache-kafka project-reactor reactor-kafka