【问题标题】:Reactor Flux conditional emitReactor Flux 条件发射
【发布时间】:2020-05-23 22:16:18
【问题描述】:

是否可以允许基于全局布尔变量有条件地从 Flux 发射值? 我正在使用 Flux delayUntil(...) 但无法完全掌握功能或我的假设是错误的。

我有一个全局 AtomicBoolean 表示下游连接的可用性,并且只希望上游 Flux 在下游准备好处理时发出。

为了表示场景,创建了一个(不工作的)测试样本

//Randomly generates a boolean value every 5 seconds
private Flux<Boolean> signalGenerator() {
    return Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(5000))
            .map(integer -> new Random().nextBoolean());
}

   Flux.range(1, Integer.MAX_VALUE)
            .delayElements(Duration.ofMillis(1000))
            .delayUntil(evt -> signalGenerator())     // ??  Only proceed when signalGenerator returns true
            .subscribe(System.out::println);

我有另一种情况,下游进程每秒只能接受 x 条消息。在当前的非响应式实现中,我们有一个有 x 个许可的信号量,如果没有更多许可可用,则线程被阻塞,信号量许可每秒重置一次。

在这两种情况下,我都希望上游 Flux 仅在下游流程有需求时才发出,并且我不想缓冲。

【问题讨论】:

    标签: spring rx-java reactive-programming spring-webflux


    【解决方案1】:

    您可以考虑使用Mono.fromRunnable() 作为delayUntil() 的输入,如下所示;

    助手类;

    public class FluxCondition {
    
        CountDownLatch latch = new CountDownLatch(10); // it depends, might be managed somehow
        Runnable r = () -> { latch.await(); }
    
        public void lock() { Mono.fromRunnable(r) };
        public void release() { latch.countDown(); }
    }
        
    

    用法;

    FluxCondition delayCondition = new FluxCondition();
    Flux.range(1, 10).delayUntil(o -> delayCondition.lock()).subscribe();
    .....
    delayCondition.release(); // shall call this for each element
    

    我想使用 sink.emitNext 可能会有更好的解决方案,但这可能还需要一个条件变量来控制 Flux 流。

    据我了解,在响应式编程中,每个操作步骤都应考虑您的数据。因此,将消费者设计为反应式处理器可能会更好。就我而言,我没有机会并按照我上面描述的方式进行操作

    【讨论】:

    • 我认为上面的行应该写成public Mono&lt;Object&gt; lock() { return Mono.fromRunnable(r); }否则“用法”将无法编译。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-11-23
    • 1970-01-01
    • 1970-01-01
    • 2018-07-04
    • 2019-06-23
    • 1970-01-01
    相关资源
    最近更新 更多