【问题标题】:RxJava: observeOn, subscribeOn, and doFinally, switching between IO and UI threadRxJava:observeOn、subscribeOn、doFinally,在IO和UI线程之间切换
【发布时间】:2017-04-28 14:28:06
【问题描述】:

我遇到了一个问题,我的 observable 在 IO 线程上订阅并在 android 主 (UI) 线程上观察到,但 doFinally 运算符在 IO 线程上运行,它需要在 UI 线程上运行.

用例与medium article 几乎完全相同。

我基本上想在订阅 Observable 时显示 ProgressBar,并在 Observable 终止或完成时隐藏 ProgressBar

我得到的错误是:java.lang.IllegalStateException: 当前线程必须有一个looper!

谁能帮我将doFinally 操作移回具有looper 的UI 线程?还是我遗漏了一些其他信息?

编辑 用例工作流程是:

-> 启动活动

-> 初始化

-> 执行可观察流

-> 启动新 Activity 并完成当前 Activity

-> 新活动

-> 开始原始活动并完成

-> 重复初始化

非常感谢。

详情:

  • RxJava 2.0.7
  • RxAndroid 2.0.1
  • Android sdk min 14 和 target 25

示例代码

listUseCase.execute(null)
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true);
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribeOn(schedulerProvider.io())
            .observeOn(schedulerProvider.main())
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );

堆栈跟踪:

FATAL EXCEPTION: RxCachedThreadScheduler-1
  Process: com.example.android.demo.customerfirst.alpha, PID: 16685
  java.lang.IllegalStateException: The current thread must have a looper!
      at android.view.Choreographer$1.initialValue(Choreographer.java:96)
      at android.view.Choreographer$1.initialValue(Choreographer.java:91)
      at java.lang.ThreadLocal$Values.getAfterMiss(ThreadLocal.java:430)
      at java.lang.ThreadLocal.get(ThreadLocal.java:65)
      at android.view.Choreographer.getInstance(Choreographer.java:192)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:600)
      at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:575)
      at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)
      at android.animation.ValueAnimator.end(ValueAnimator.java:998)
      at android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)
      at android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)
      at android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)
      at android.view.View.dispatchVisibilityChanged(View.java:8643)
      at android.view.View.setFlags(View.java:9686)
      at android.view.View.setVisibility(View.java:6663)
      at android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)
      at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator$3.run(ProductListPresenterMediator.java:56)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java:144)
      at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.onComplete(ObservableDoFinally.java:94)
      at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)
      at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:84)
      at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
      at io.reactivex.Observable.subscribe(Observable.java:10700)
      at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
      at io.reactivex.Scheduler$1.run(Scheduler.java:138)
      at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
      at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
      at java.util.concurrent.FutureTask.run(FutureTask.java:237)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
      at java.lang.Thread.run(Thread.java:818)

【问题讨论】:

    标签: java android rx-java2


    【解决方案1】:

    出现问题是因为我没有在活动完成/销毁时处置订阅。

    现在每个活动/视图都会告诉演示者它们何时停止或销毁,并且演示者处理订阅。

    这似乎解决了我的问题。

     @Override
    public void initialize() {
        if (!isViewAttached()) {
            throw new ViewNotAttachedException();
        }
        disposable = listUseCase.execute(null)
                .subscribeOn(schedulerProvider.io()) // Move subscribe on here
                .observeOn(schedulerProvider.main()) // Change threads here
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        getView().showLoading(true); // This should be on the main thread also
                    }
                })
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        getView().showLoading(false);
                    }
                })
                .subscribe(
                        new Consumer<List<AccountEntity>>() {
                            @Override
                            public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                                getView().setAccounts(accountEntities);
                            }
                        },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(@NonNull Throwable throwable) throws Exception {
                                if (isViewAttached()) {
                                    getView().showError(throwable.getMessage());
                                }
                            }
                        }
                );
    }
    
    @Override
    public void dispose() {
        if (disposable != null) {
            disposable.dispose();
        }
    }
    

    【讨论】:

      【解决方案2】:

      您需要做的就是将observeOn 向上移动。 observeOn 方法更改了调用 onNextonErroronCompleted 的线程,这是内部操作和副作用的工作方式(通过提升)

      listUseCase.execute(null)
                  .subscribeOn(schedulerProvider.io()) // Move subscribe on here
                  .observeOn(schedulerProvider.main()) // Change threads here
                  .doOnSubscribe(new Consumer<Disposable>() {
                      @Override
                      public void accept(@NonNull Disposable disposable) throws Exception {
                          getView().showLoading(true); // This should be on the main thread also
                      }
                  })
                  .doFinally(new Action() {
                      @Override
                      public void run() throws Exception {
                          getView().showLoading(false);
                      }
                  })
      
                  .subscribe(
                          new Consumer<List<AccountEntity>>() {
                              @Override
                              public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                                  getView().setAccounts(accountEntities);
                              }
                          },
                          new Consumer<Throwable>() {
                              @Override
                              public void accept(@NonNull Throwable throwable) throws Exception {
                                  if (isViewAttached()) {
                                      getView().showError(throwable.getMessage());
                                  }
                              }
                          }
                  );
      

      【讨论】:

      • 更改后问题仍然存在。我将在原始票证中添加更多信息,因为我可能在这个可观察的用例之外做错了什么。
      • 查看错误堆栈跟踪(或发布它)它应该告诉您哪一行在错误线程上调用。
      • @cryoxis 我发布了堆栈跟踪。我觉得这个动作正在回到一个没有弯针的已处理线程上。我没有做任何处置操作,所以我会去更新我的活动/视图及其演示者,以便在它们停止/暂停/销毁时处置可观察对象并回复给您。感谢您对升降机制的深入了解。我会回复它的进展情况。
      • @JTT 你没有回复。 :)
      • @milosmns 我已经发布了我自己的问题和解决方法的答案。这不是正确的做法吗?
      【解决方案3】:

      也许为时已晚。 但是请查看有关doFinally 的文档here 他们明确地说“

      在此 Observable 发出 onErroronCompleted被下游处置 信号后调用指定的操作。

      这意味着doFinally 无论如何都会被调用,并且不保证你有一个有效的上下文。

      我不使用doFinally 来处理这些事情。我总是设置加载(假)onNextonError。不漂亮但有效。

      【讨论】:

      • 是的。不能保证doFinally 会在observeOn 中指定的线程中被调用。对于这种情况,我个人更喜欢doAfterTerminate,文档中将其描述为“在此 Single 调用 onSuccess 或 onError 后调用”,这在使用 LiveData 时非常适合我。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-11-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-04
      相关资源
      最近更新 更多