【问题标题】:Project Reactor Kafka: Perform action at the end of Flux without blockingProject Reactor Kafka:在 Flux 结束时执行操作而不阻塞
【发布时间】:2019-05-29 10:06:24
【问题描述】:

我正在开发一个应用程序,该应用程序使用project-reactor Kafka API 反应性地连接到Kafka-brokers。用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建已处理消息的通量并将其推送到输出主题。要求是文件一旦被处理就必须被删除,并且处理的消息应该被推送到输出主题。因此,必须在处理完每个文件并将消息的通量推送到输出主题之后执行删除操作。

public Flux<?> flux() {
   return KafkaReceiver
   .create(receiverOptions(Collections.singleton(sourceTopic)))
   .receive()
   .flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x, 
   m.receiverOffset())))
   .as(sender::send)
   .doOnNext(m -> {
   m.correlationMetadata().acknowledge();
   deleteFile(path);
}).doOnCancel(() -> close());

}

*transform() 方法在文件路径(m.value()) 中启动文件处理并返回消息流。

问题是文件在所有消息都被推送到输出主题之前就被删除了。因此,在失败的情况下,重试时原始文件不可用。

【问题讨论】:

  • 这同样适用于输入主题的偏移提交,我必须确保只有在文件的所有已处理消息都被推送到输出主题之后才完成确认。

标签: java apache-kafka nio project-reactor reactive


【解决方案1】:

由于path 变量似乎可以在整个管道中访问(方法输入参数?),您可以在单独的doFinally 中删除该文件。您需要过滤 onCompletecancel SignalType,因为您不想在失败的情况下删除文件。

如果您不想在取消后删除文件,另一个选项是 doOnComplete

【讨论】:

    猜你喜欢
    • 2020-09-14
    • 2021-07-14
    • 2021-09-28
    • 2019-01-07
    • 1970-01-01
    • 2012-01-14
    • 1970-01-01
    • 1970-01-01
    • 2020-09-25
    相关资源
    最近更新 更多