【发布时间】:2019-05-29 10:06:24
【问题描述】:
我正在开发一个应用程序,该应用程序使用project-reactor Kafka API 反应性地连接到Kafka-brokers。用例是有一个输入主题,其中包含要处理的文件的文件路径。应用程序读取每个文件,对其进行处理,创建已处理消息的通量并将其推送到输出主题。要求是文件一旦被处理就必须被删除,并且处理的消息应该被推送到输出主题。因此,必须在处理完每个文件并将消息的通量推送到输出主题之后执行删除操作。
public Flux<?> flux() {
return KafkaReceiver
.create(receiverOptions(Collections.singleton(sourceTopic)))
.receive()
.flatMap(m -> transform(m.value()).map(x -> SenderRecord.create(x,
m.receiverOffset())))
.as(sender::send)
.doOnNext(m -> {
m.correlationMetadata().acknowledge();
deleteFile(path);
}).doOnCancel(() -> close());
}
*transform() 方法在文件路径(m.value()) 中启动文件处理并返回消息流。
问题是文件在所有消息都被推送到输出主题之前就被删除了。因此,在失败的情况下,重试时原始文件不可用。
【问题讨论】:
-
这同样适用于输入主题的偏移提交,我必须确保只有在文件的所有已处理消息都被推送到输出主题之后才完成确认。
标签: java apache-kafka nio project-reactor reactive