【发布时间】:2020-05-23 22:16:18
【问题描述】:
是否可以允许基于全局布尔变量有条件地从 Flux 发射值? 我正在使用 Flux delayUntil(...) 但无法完全掌握功能或我的假设是错误的。
我有一个全局 AtomicBoolean 表示下游连接的可用性,并且只希望上游 Flux 在下游准备好处理时发出。
为了表示场景,创建了一个(不工作的)测试样本
//Randomly generates a boolean value every 5 seconds
private Flux<Boolean> signalGenerator() {
return Flux.range(1, Integer.MAX_VALUE)
.delayElements(Duration.ofMillis(5000))
.map(integer -> new Random().nextBoolean());
}
和
Flux.range(1, Integer.MAX_VALUE)
.delayElements(Duration.ofMillis(1000))
.delayUntil(evt -> signalGenerator()) // ?? Only proceed when signalGenerator returns true
.subscribe(System.out::println);
我有另一种情况,下游进程每秒只能接受 x 条消息。在当前的非响应式实现中,我们有一个有 x 个许可的信号量,如果没有更多许可可用,则线程被阻塞,信号量许可每秒重置一次。
在这两种情况下,我都希望上游 Flux 仅在下游流程有需求时才发出,并且我不想缓冲。
【问题讨论】:
标签: spring rx-java reactive-programming spring-webflux