【问题标题】:Spring Reactor multiple subscriptionSpring Reactor 多重订阅
【发布时间】:2020-04-01 20:11:39
【问题描述】:

我正在尝试将我的单声道拆分为其他单独的单声道,这些单声道将在不同线程上处理相同的数据输入数据。

public Mono<String> process() { 
        Mono<String> someString = ... // fetching data from API 



        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toLowerCase)
                .subscribe(this::saveLowercase);

        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toUpperCase)
                .subscribe(this::saveUpperCase);

        return someString;
}

我在日志中看到我获取了 3 次数据,因为每个订阅调用都从 API 获取数据。我想使用 .cache() 方法,但我想知道一些新的更好的方法只调用一次 API 并处理乘以这些数据?如果 Reactor 不能这样做,我可以将 Reactor 交换为 RxJava。

【问题讨论】:

  • 我认为您需要退后一步,阅读响应式编程的基础知识。订阅者是发起呼叫的人、Web 客户端、GUI 或日程安排等。您几乎从未像在此处所做的那样在自己的应用程序中订阅。

标签: rx-java2 spring-webflux project-reactor


【解决方案1】:

在您给出的示例中,您创建了一个冷Mono。因此,正如您所说,每个subscriber 都会调用HTTP。如果您希望 HTTP 调用只发生一次,请创建 hot observable。您必须使用 .cache() 以便任何未来的订阅者都能收到响应。

这是执行此操作的正确方法。你为什么要寻找“更好的东西”?

来自官方文档的示例:

DirectProcessor<String> hotSource = DirectProcessor.create();

Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();


hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete(); 

现在两个订阅者都将获得所有颜色。

【讨论】:

  • 最后的解决方案是publish方法,可以作为子流事件使用,并且该操作不会调用upstream的新订阅。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多