【发布时间】:2020-05-09 09:55:28
【问题描述】:
我有一个快速但昂贵的生产者(Spring WebClient)和一个非常慢的订阅者。我需要一种方法来应对整个链条的背压。
在实施过程中,我意识到 flatMap、concatMap 和其他使用急切获取,似乎不可能禁用此行为。
在没有 flatMap 的情况下在订阅者中使用需求
Flux.defer(() -> Flux.range(1, 1000))
.doOnRequest(i -> System.out.println("Requested: " + i))
.doOnNext(v -> System.out.println("Emitted: " + v))
//.flatMap(Mono::just)
.subscribe(new BaseSubscriber<Object>() {
protected void hookOnSubscribe(final Subscription subscription) {
subscription.request(3);
}
protected void hookOnNext(final Object value) {
System.out.println("Received: " + value);
}
});
.. 产生:
Requested: 3
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
使用与 flatMap 相同的需求(未注释)产生:
Requested: 256
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
Emitted: 4
Emitted: 5
...
Emitted: 254
Emitted: 255
Emitted: 256
【问题讨论】:
-
concatMap 不急于获取。除非一个内部流完成,否则它不会订阅下一个流。
-
我刚刚用上面的代码测试了一下。它请求 32 个项目。我可以使用
prefetch参数来限制数量,但是在这种情况下,任何大于预取的需求都会被拆分。
标签: java reactive-programming project-reactor spring-webclient