【问题标题】:RxJava: Observable and default threadRxJava:可观察和默认线程
【发布时间】:2018-11-10 05:42:59
【问题描述】:

我有以下代码:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

这是输出:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

重要细节:我正在从另一个线程(Android 中的main 线程)调用subscribe 方法。

所以看起来Observable 类是同步的,默认情况下,它在发出事件的同一线程(s.onNext)上执行所有操作(像map + 通知订阅者),对吧?我想知道......这是有意的行为还是我只是误解了什么?实际上,我期望至少 onNextonComplete 回调将在调用者的线程上调用,而不是在一个发射事件上。我是否正确理解在这种特殊情况下实际调用者的线程无关紧要?至少在异步生成事件时。

另一个问题 - 如果我从某个外部源收到一些 Observable 作为参数(即我不是自己生成它)怎么办......作为其用户,我无法检查它是否是同步的或异步,我只需要通过subscribeOnobserveOn 方法明确指定接收回调的位置,对吗?

谢谢!

【问题讨论】:

    标签: java rx-java rx-java2


    【解决方案1】:

    RxJava 对并发没有意见。如果您不使用任何其他机制(如 observeOn/subscribeOn),它将在订阅线程上生成值。请不要在操作符中使用像线程这样的低级结构,你可能会违反合同。

    由于使用了Thread,onNext会从调用Thread('background-thread-1')中调用。订阅发生在调用(UI-Thread)上。链中的每个操作员都将从'background-thread-1'-calling-Thread 调用。订阅 onNext 也将从 'background-thread-1' 调用。

    如果你想生成不在调用线程上的值,请使用:subscribeOn。如果要将线程切换回主线程,请在链中的某处使用 observeOn。最有可能在订阅之前。

    例子:

    Observable.just(1,2,3) // creation of observable happens on Computational-Threads
                .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
                .map(integer -> integer) // map happens on Computational-Threads
                .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
                .subscribe(integer -> {
                    // called from mainThread
                });
    

    这是一个很好的解释。 http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

    【讨论】:

      猜你喜欢
      • 2014-11-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多