【问题标题】:Reactor Flux - Only emit from Publisher on completionReactor Flux - 仅在完成时从 Publisher 发出
【发布时间】:2021-04-12 14:22:25
【问题描述】:

我有一些 Reactor Kafka 代码通过 KafkaReceiver 读取事件,并通过 1 个或多个 KafkaSenders 写入 1..许多下游消息,这些消息连接成单个 Publisher。一切都很好,但我想做的只是在完成时从这个连接的发件人Flux 发出一个事件(即,它已完成对任何给定事件的所有下游主题的写入,因此它不会为每个元素在下游写入 Kafka 直到完成)。通过这种方式,我可以sample() 并定期提交偏移量,知道每当sample() 碰巧触发并且我为传入事件提交偏移量时,我已经为我提交偏移量的每个事件处理了所有下游消息。似乎我可以以某种方式使用pauseUntilOther()then(),但我不太清楚我的代码和具体用例是如何给出的。任何想法或建议表示赞赏,谢谢。

主要发布者代码:

this.kafkaReceiver.receive()
        .groupBy(m -> m.receiverOffset().topicPartition())
        .flatMap(partitionFlux ->
                partitionFlux.publishOn(this.scheduler)
                        .flatMap(this::processEvent)
                        .sample(Duration.ofMillis(10))
                        .concatMap(sr -> commitReceiverOffset(sr.correlationMetadata())))
        .subscribe();

通过调用processEvent()返回连接的KafkaSenders:

return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event);

【问题讨论】:

    标签: project-reactor reactor reactor-kafka


    【解决方案1】:

    听起来像 Flux.last() 就是你要找的东西:

    return Flux.concat(kafkaSenderFluxA, kafkaSenderFluxB)
        .doOnComplete(LOG.info(“Finished writing all downstream messages for incoming event)
        .last();
    

    然后,您的.sample(Duration.ofMillis(10)) 将执行发送给这些经纪人的一个或多个批次中的最后一项可用的任何事情。最后,您的commitReceiverOffset() 将正确提交最后的内容。

    查看其 JavaDocs 了解更多信息:

    /**
     * Emit the last element observed before complete signal as a {@link Mono}, or emit
     * {@link NoSuchElementException} error if the source was empty.
     * For a passive version use {@link #takeLast(int)}
     *
     * <p>
     * <img class="marble" src="doc-files/marbles/last.svg" alt="">
     *
     * <p><strong>Discard Support:</strong> This operator discards elements before the last.
     *
     * @return a {@link Mono} with the last value in this {@link Flux}
     */
    public final Mono<T> last() {
    

    和大理石图:https://projectreactor.io/docs/core/release/api/reactor/core/publisher/doc-files/marbles/last.svg

    【讨论】:

    • 我在发布此内容后发现了 takeLast(1),但这更好,谢谢!
    • 啊,但是源可能是空的,我看到在这种情况下会引发异常。我在两个 Flux 上都有 onErrorContinue() 运算符,这是否允许发布者恢复并继续处理此异常?在这种情况下,我似乎想要 takeLast(1)。
    • 请参阅 Mono.onErrorResume(Class&lt;E&gt; type, Function&lt;? super E, ? extends Throwable&gt; mapper) 以获取 NoSuchElementException。我相信它必须像这样onErrorResume(NoSuchElementException.class, ex -&gt; Mono.empty())。因此,您在主流程中的 flatMap() 不会发出任何内容 - 不会对任何内容进行采样,也不会提交任何内容。一切都将返回接收源,以便下一批从 Kafka 轮询。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-19
    • 2017-12-17
    • 1970-01-01
    • 2019-12-03
    • 2019-01-09
    相关资源
    最近更新 更多