【问题标题】:RxJava observeOn() issueRxJava observeOn() 问题
【发布时间】:2019-08-19 08:54:51
【问题描述】:

我分别订阅了两个PublishSubjects 和两个Observers。在PubslishSubjectSubscriber 之间,我使用observeOn(Schedulers.single()) 从[main] 线程切换到[RxSingleScheduler] 线程。我开始在循环内的两个PublishSubjects 上发布(PubslishSubject.onNext())。

publishSubject1.onNext("next");
publishSubject2.onNext("next");

我的预期是两个订阅者的运行顺序与发布排放的顺序相同,但我得到的结果完全不同。 Subsriber1 处理它的所有排放,然后 Subscriber2 处理它的所有排放。 我预计排放计划并按发布顺序运行。有什么办法可以实现吗?

import java.util.concurrent.TimeUnit;

import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class ObserveOnApp {

    public static void main(String[] args) {
        PublishSubject<String> publishSubject1 = PublishSubject.create();
        PublishSubject<String> publishSubject2 = PublishSubject.create();

        publishSubject1
        .observeOn(Schedulers.single())
        .subscribe(next -> {
            System.out.println("Subscriber1");
        });

        publishSubject2
        .observeOn(Schedulers.single())
        .subscribe(next -> {
            System.out.println("Subscriber2");
        });

        for (int i= 0; i < 10; i++) {
            publishSubject1.onNext("next");
            publishSubject2.onNext("next");
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

【问题讨论】:

  • 请不要在 SO 和 RxJava 问题网站上cross post您的问题。谢谢。

标签: rx-java reactive-programming rx-java2


【解决方案1】:

Schedulers Single 是为了按顺序执行工作,查看它的documentation

为需要在同一后台线程上强顺序执行的工作返回默认的、共享的、单线程支持的调度程序实例。

顺便说一句,在按原样运行几次代码之后,我很幸运地让 Subscribe1 发出一次,然后 Subscribe2 完成它的工作,Subscribe1 完成:

Subscriber1 // <- 1 started first!
Subscriber2 // <- then 2 started
Subscriber2
Subscriber2
...
Subscriber2
Subscriber2
Subscriber2 // <- 2 finished
Subscriber1 // <- 1 continued
Subscriber1
Subscriber1
...

如果您对使用single 调度程序没有特定要求,请尝试使用newThreadio,结果会更像:

Subscriber1
Subscriber1
Subscriber2
Subscriber1
Subscriber2
Subscriber1
Subscriber2
Subscriber2

如果您总是发出相同的项目,您不需要创建多个 PublishSubject,您可以使用 publish 多播它,并在您决定开始发出项目时调用 connect。您还可以使用autoconnect 并传递将触发项目发射的连接数:

        PublishSubject<String> publishSubject1 = PublishSubject.create();

        Observable<String> stringObservable = publishSubject1.publish().autoConnect(2);

        stringObservable
                .observeOn(Schedulers.io())
                .subscribe(next -> System.out.println("Subscriber1"));

        stringObservable
                .observeOn(Schedulers.io())
                .subscribe(next -> System.out.println("Subscriber2"));

        for (int i= 0; i < 10; i++) {
            publishSubject1.onNext("next");
        }

        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

【讨论】:

    【解决方案2】:

    如果我理解您的期望,您应该使用subscribeOn(Schedulers.single()) 而不是observeOn

    subscribeOn 有点棘手,因为它说明了可观察的“开始”在哪个线程上,而 observeOn 允许您在运算符序列的“中间”切换线程

    在此处查看 subscribeOn 和 observeOn 之间的区别: https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-07-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-06-10
      • 1970-01-01
      相关资源
      最近更新 更多