【问题标题】:How to multicast a Mono to multiple subscribers in Project Reactor?如何在 Project Reactor 中将 Mono 多播给多个订阅者?
【发布时间】:2021-10-21 14:09:52
【问题描述】:

根据我目前阅读的内容,可以使用ConnectableFlux 将 Flux 多播给多个订阅者,使用类似于以下内容:

Flux<Integer> integerFlux = Flux.range(1, 50)
                .log();

ConnectableFlux<Integer> integerConnectableFlux = integerFlux.publish();

integerConnectableFlux.map(i -> i*2)
                .subscribe(System.out::println);

integerConnectableFlux.map(i -> i*3)
                .subscribe(System.out::println);

integerConnectableFlux.connect();

根据我对响应式流的有限理解,上面的代码将冷发布者转换为热发布者。 我正在处理我有多个订阅者Mono 的场景。如何从 Mono 中获得热门发布者?

【问题讨论】:

    标签: java reactive-programming project-reactor reactive-streams


    【解决方案1】:

    看看Mono#cache()运营商:

    将此 Mono 转换为热源并缓存最后发出的信号 进一步的订户。完成和错误也将被重播。

    【讨论】:

      【解决方案2】:

      使用Mono.share() 操作符管理多播单声道:

      准备一个与 Flux.shareNext() 类似的共享此 Mono 结果的 Mono。当第一个订阅者使用 subscribe() API 订阅时,这将有效地将这个 Mono 变成一个热门任务。进一步的订阅者将共享相同的订阅,因此获得相同的结果。值得注意的是,这是一个不可取消的订阅。

      示例: 以下代码仅在发布者上创建一个订阅:

              Mono<Integer> integerMono = Mono.just(2)
                      .log()
                      .share();
      
              integerMono.map(i -> i+3)
                      .subscribe(System.out::println);
      
              integerMono.map(i -> i*5)
                      .subscribe(System.out::println);
      

      以上代码的输出如下(数据只请求一次):

      5
      reactor.Mono.Just.1 - {} - | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
      reactor.Mono.Just.1 - {} - | request(unbounded)
      reactor.Mono.Just.1 - {} - | onNext(2)
      reactor.Mono.Just.1 - {} - | onComplete()
      10
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-04-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-01-07
        相关资源
        最近更新 更多