【问题标题】:Java Reactor: Is there a way to transform Flux<Mono<T>> into Flux<T> without eager fetching?Java Reactor:有没有办法将 Flux<Mono<T>> 转换为 Flux<T> 而无需急切获取?
【发布时间】:2020-05-09 09:55:28
【问题描述】:

我有一个快速但昂贵的生产者(Spring WebClient)和一个非常慢的订阅者。我需要一种方法来应对整个链条的背压。

在实施过程中,我意识到 flatMap、concatMap 和其他使用急切获取,似乎不可能禁用此行为。

在没有 flatMap 的情况下在订阅者中使用需求

Flux.defer(() -> Flux.range(1, 1000))
            .doOnRequest(i -> System.out.println("Requested: " + i))
            .doOnNext(v -> System.out.println("Emitted:   " + v))
            //.flatMap(Mono::just)
            .subscribe(new BaseSubscriber<Object>() {
                protected void hookOnSubscribe(final Subscription subscription) {
                    subscription.request(3);
                }

                protected void hookOnNext(final Object value) {
                    System.out.println("Received:  " + value);
                }
            });

.. 产生:

Requested: 3
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3

使用与 flatMap 相同的需求(未注释)产生:

Requested: 256
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3
Emitted:   4
Emitted:   5
...
Emitted:   254
Emitted:   255
Emitted:   256

【问题讨论】:

  • concatMap 不急于获取。除非一个内部流完成,否则它不会订阅下一个流。
  • 我刚刚用上面的代码测试了一下。它请求 32 个项目。我可以使用prefetch 参数来限制数量,但是在这种情况下,任何大于预取的需求都会被拆分。

标签: java reactive-programming project-reactor spring-webclient


【解决方案1】:

这似乎有一个未解决的问题:https://github.com/reactor/reactor-core/issues/1397

无论如何,我找到了适合我的情况的解决方案:block()。请记住,仅允许在未标记为“仅限非阻塞操作”的线程上执行此操作。 (另见Project Blockhound

回顾一下,问题是在某些时候我有一个Flux&lt;Mono&lt;T&gt;&gt;.flatMap(...).concatMap(...) 等。使用某种急切的获取。用于测试的Flux&lt;Mono&lt;T&gt;&gt;

final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0, 
(state, sink) -> {
    state += 1;
    sink.next(Mono.just(state));
    return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
  .doOnNext(v -> System.out.println("Emitted:   " + v));

为了不急于获取,我现在在地图内做了一个块,它工作得非常好:

monoFlux.map(Mono::block)
        .subscribe(new MySubscriber<>());

结果:

Requested: 3
Emitted:   MonoJust
Received:  1
Emitted:   MonoJust
Received:  2
Emitted:   MonoJust
Received:  3

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-23
    • 1970-01-01
    • 2017-12-27
    • 2016-10-12
    • 2022-12-23
    • 1970-01-01
    • 1970-01-01
    • 2019-03-26
    相关资源
    最近更新 更多