【问题标题】:How to test that observable uses correct schedulers in RxJava?如何在 RxJava 中测试 observable 使用正确的调度程序?
【发布时间】:2016-07-29 19:42:14
【问题描述】:

我有以下课程(简化):

public class Usecase<T> {
    private final Observable<T> get;
    private final Scheduler observeScheduler;

    public Usecase(Observable<T> get, Scheduler observeScheduler) {
        this.get = get;
        this.observeScheduler = observeScheduler;
    }

    public Observable<T> execute() {
        return get.subscribeOn(Schedulers.io()).observeOn(observeScheduler);
    }
}

我正在为它编写单元测试。如何测试 subscribeOnobserveOn 是否使用正确的值调用?

我尝试以下方法:

    Observable<String> observable = mock(Observable.class);
    Usecase<String> usecase = new Usecase(observable, Schedulers.computation());
    usecase.execute();

    verify(observable).subscribeOn(Schedulers.computation()); // should fail here, but passes
    verify(observable).observeOn(Schedulers.computation());   // should pass, but fails: Missing method call for verify(mock) here

上述失败(我认为)因为subscribeOnobserveOnfinal 方法。那么是否有其他方法可以确保 observable 使用正确的调度程序?

【问题讨论】:

    标签: unit-testing mockito rx-java


    【解决方案1】:

    Observable 上应用运算符会返回一个新的Observable,因此任何后续运算符应用都将发生在不同的对象上。您必须遵循 Observable 中的组合图来发现应用了哪些运算符以及使用了哪些参数。

    这在 RxJava 中不受支持,您必须依赖内部细节和反射。

    一般情况下,您不能假设事件的来源位置,但您可以申请 observeOn 以确保它们到达您选择的主题。

    【讨论】:

      【解决方案2】:

      There is a way 间接访问可观察对象正在运行和被观察的两个线程,这意味着您实际上可以验证 Observable 使用正确的调度程序。

      我们仅限于按名称验证线程。幸运的是,Schedulers.io() 使用的线程以我们可以匹配的一致前缀命名。这是(完整?)具有唯一前缀供参考的调度程序列表:

      • Schedulers.io() - RxCachedThreadScheduler
      • Schedulers.newThread() - RxNewThreadScheduler
      • Schedulers.computation() - RxComputationThreadPool

      要验证 Observable 是否在 IO 线程订阅

      // Calling Thread.currentThread() inside Observer.OnSubscribe will return the
      // thread the Observable is running on (Schedulers.io() in our case)
      Observable<String> obs = Observable.create((Subscriber<? super String> s) -> {
          s.onNext(Thread.currentThread().getName());
          s.onCompleted();
      })
      
      // Schedule the Observable
      Usecase usecase = new Usecase(obs, Schedulers.immediate());
      Observable usecaseObservable = usecase.execute();
      
      // Verify the Observable emitted the name of the IO thread
      String subscribingThread = usecaseObservable.toBlocking().first();
      assertThat(subscribingThread).startsWith("RxCachedThreadScheduler");
      

      要验证 Observable 是否在计算线程观察到,您可以使用TestSubscriber#getLastSeenThread 访问用于观察的最后一个线程。

      TestSubscriber<Object> subscriber = TestSubscriber.create();
      UseCase usecase = new UseCase(Observable.empty(), Schedulers.computation())
      usecase.execute().subscribe(subscriber);
      
      // The observable runs asynchronously, so wait for it to complete
      subscriber.awaitTerminalEvent();
      subscriber.assertNoErrors();
      
      // Verify the observable was observed on the computation thread
      String observingThread = subscriber.getLastSeenThread().getName();
      assertThat(observingThread).startsWith("RxComputationThreadPool");
      

      不需要第三方库或模拟,尽管我使用AssertJ 来实现流畅的startsWith 断言。

      【讨论】:

        猜你喜欢
        • 2017-01-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-04-22
        相关资源
        最近更新 更多