【问题标题】:RxJava subscribe and observe on same thread as unit testRxJava 在与单元测试相同的线程上订阅和观察
【发布时间】:2015-03-18 17:56:10
【问题描述】:

我想为内部使用 RxJava 的组件编写一种“黑盒测试”。

在内部,它使用Retrofit 返回Observable 进行httpcall,然后使用.flatmap() 对从改造中检索到的数据进行未来处理。我们的想法是给该组件一个Transformer,以便像这样在观察者上设置调度程序:

class DefaultTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) { 
      return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
   }
}

我的组件做这样的事情:

void execute(Transformer<T, T> scheduler){
     Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
          @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {                
             return Observable.just(wrapper.getLeague().getTeams());
          }
        });

   observable.compose(transformer);

   observable.subscribe(this);
}

在生产中,我将DefaultTransformer 作为参数传递,但对于单元测试,我想提交一个Transformer,它与单元测试在同一线程上运行,所以一切都应该同步运行(而不是异步)。

我试过了:

class UnitTestTransformer <T> implements Transformer<T, T> {

       public Observable<T> call(Observable<T> observable) { 
          return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
       }
    }

但它仍然在我的单元测试中异步运行。我也试过Scheduler.immediate()toBlocking() 似乎不是一个选项,因为它不再是 Observable。知道有什么问题吗?

【问题讨论】:

  • observable.compose(transformer); 是问题所在。所有Observable 实例都是不可变的,调用compose 不会更改现有Observable,但会返回一个新实例(在您的代码中将被忽略)。 observable.compose(transformer).subscribe(this) 应该可以正常工作。
  • 还有一件事。 retrofit 返回的所有Observable 已经指定了Scheduler,因为retrofit 在内部调用subscribeOn。再调用一次subscribeOn 实际上并不会改变执行网络调用的最终调度程序。
  • 如果你想测试你的代码,你应该使用toBlocking操作符(在大多数RxJava单元测试中使用得非常多)
  • 嗨,感谢compose() 的提示!所以没有机会更新改造的SchedulertoBlock() 对我不起作用,因为我无法更改组件的 execute() 方法,或者您是否看到了解决方法? execute 方法将在内部调用(在其他非 RxJava 调用中)。我唯一能做的就是将Scheduler 设置为Component.setScheduler(Transformer t),这将作为参数传递给Component.execute()

标签: rx-java rx-android


【解决方案1】:

如果无法更改execute() 的调用模式,您可能需要尝试使用 RxJava 插件机制。

https://github.com/ReactiveX/RxJava/wiki/Plugins

您可以提供:

  • RxJavaSchedulersHook 覆盖测试执行期间提供的调度程序并让它们同步执行
  • RxJavaObservableExecutionHook 挂钩到 Observable 执行管道并使用某种同步方法(如 CountdownLatch)等待 Observable 订阅完成后再继续

【讨论】:

    【解决方案2】:

    我有一个类似的问题,并通过使用类 TestObserver (http://reactivex.io/RxJava/javadoc/rx/observers/TestObserver.html) 解决了它

    它有以下三种方法可以访问接收到的事件:

    getOnCompletedEvents()
    getOnErrorEvents()
    getOnNextEvents()
    

    如果您能以某种方式注入订阅者,我希望这也能对您有所帮助。 这是我如何测试它的示例:

    TestObserver<MyModel> testObserver = new TestObserver<>();
    myObservableSupplyingMethod().subscribe(testObserver);
    
    assertThat(testObserver.getOnErrorEvents().size()).isEqualTo(0);
    assertThat(testObserver.getOnNextEvents().get(0)).isNotNull();
    ...
    

    【讨论】:

      猜你喜欢
      • 2015-06-04
      • 2017-01-20
      • 2018-06-11
      • 1970-01-01
      • 2016-04-20
      • 2018-08-31
      • 2020-07-29
      • 2021-12-18
      • 2017-11-14
      相关资源
      最近更新 更多