【发布时间】:2021-04-12 14:22:25
【问题描述】:
我有一些 Reactor Kafka 代码通过 KafkaReceiver 读取事件,并通过 1 个或多个 KafkaSenders 写入 1..许多下游消息,这些消息连接成单个 Publisher。一切都很好,但我想做的只是在完成时从这个连接的发件人Flux 发出一个事件(即,它已完成对任何给定事件的所有下游主题的写入,因此它不会为每个元素在下游写入 Kafka 直到完成)。通过这种方式,我可以sample() 并定期提交偏移量,知道每当sample() 碰巧触发并且我为传入事件提交偏移量时,我已经为我提交偏移量的每个事件处理了所有下游消息。似乎我可以以某种方式使用pauseUntilOther() 或then(),但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。
主要发布者代码:
this.kafkaReceiver.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux ->
partitionFlux.publishOn(this.scheduler)
.flatMap(this::processEvent)
.sample(Duration.ofMillis(10))
.concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
.subscribe();
通过调用processEvent()返回连接的KafkaSenders:
return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
.doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);
【问题讨论】:
标签: project-reactor reactor reactor-kafka