【问题标题】:Fire and forget Action on doOnNext in Project Reactor在 Project Reactor 中对 doOnNext 执行触发并忘记操作
【发布时间】:2021-12-22 06:01:54
【问题描述】:

我有一个Flux 流。对于每个处理的元素,我希望触发一个异步/非阻塞的动作。例如,从数据库更新返回 Mono 的方法。 我希望在 doOnNext 块上执行此操作。 我不想影响Flux,在那里实现的处理和背压。

假设要调用的Mono方法是

Mono<Integer> dbUpdate();

我的Flux应该这样吗?

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data));
}

或者应该在堆栈溢出example中提到。

public Flux<Data> processData(PollRequest request)
{
    return searchService.search(request)              
                        .doOnNext(data -> dbUpdate(data).subscribe());
}

以上内容不会导致doOnNext内部出现阻塞问题吗?

还有哪个调度器最适合用于此类操作?

【问题讨论】:

    标签: java spring reactive-programming spring-webflux project-reactor


    【解决方案1】:

    dbUpdate() 如果您不订阅,将被忽略。以下 sn-p 不打印任何内容,因为 Mono.just("db update") 没有被订阅。

    Mono<String> dbUpdate() {
        return Mono.just("db update")
            .doOnNext(System.out::println);
    }
    
    public Flux<String> processData() {
        return Flux.just("item 1", "item 2")
            .doOnNext(data -> dbUpdate());
    }
    

    请注意,.subscribe() 不会阻塞您的线程,它会启动工作并立即返回。

    【讨论】:

    • 感谢 Lefteris ;)
    猜你喜欢
    • 1970-01-01
    • 2021-04-29
    • 2022-01-02
    • 2021-07-14
    • 2018-09-26
    • 2019-05-29
    • 1970-01-01
    • 1970-01-01
    • 2020-09-14
    相关资源
    最近更新 更多