【问题标题】:RxAndroid, Retrofit 2 unit test Schedulers.ioRxAndroid,Retrofit 2 单元测试 Schedulers.io
【发布时间】:2018-03-29 23:49:03
【问题描述】:

我刚开始学习 RxAndroid,但不幸的是我学习的书没有涉及任何单元测试。我在谷歌上搜索了很多,但没有找到任何简单的教程以精确的方式涵盖 RxAndroid 单元测试。

我基本上已经使用 RxAndroid 和 Retrofit 2 编写了一个小的 REST API。这是 ApiManager 类:

public class MyAPIManager {
    private final MyService myService;

    public MyAPIManager() {
        HttpLoggingInterceptor logging = new HttpLoggingInterceptor();
        // set your desired log level
        logging.setLevel(HttpLoggingInterceptor.Level.BODY);

        OkHttpClient.Builder b = new OkHttpClient.Builder();
        b.readTimeout(35000, TimeUnit.MILLISECONDS);
        b.connectTimeout(35000, TimeUnit.MILLISECONDS);
        b.addInterceptor(logging);
        OkHttpClient client = b.build();

        Retrofit retrofit = new Retrofit.Builder()
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .baseUrl("http://192.168.1.7:8000")
                .client(client)
                .build();

        myService = retrofit.create(MyService.class);
    }

    public Observable<Token> getToken(String username, String password) {
        return myService.getToken(username, password)
                .subscribeOn(Schedulers.io());
                .observeOn(AndroidSchedulers.mainThread());
    }
}

我正在尝试为getToken 创建一个单元测试。这是我的示例测试:

public class MyAPIManagerTest {
    private MyAPIManager myAPIManager;
    @Test
    public void getToken() throws Exception {
        myAPIManager = new MyAPIManager();

        Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
        o.test().assertSubscribed();
        o.test().assertValueCount(1);
    }

}

由于subscribeOn(Schedulers.io),上述测试没有在主线程上运行,因此它返回 0 值。如果我从MyAPIManager 中删除subscribeOn(Schedulers.io),那么它运行良好并返回1 个值。有没有办法用Schedulers.io进行测试?

【问题讨论】:

    标签: android unit-testing retrofit2 rx-android rx-java2


    【解决方案1】:

    很好的问题,当然是社区中缺乏大量报道的一个主题。我想分享几个我个人使用过的非常棒的解决方案。这些是为 RxJava 2 考虑的,但它们在 RxJava 1 中可用,只是名称不同。如果需要,您一定会找到它。

    1. RxPlugins 和 RxAndroidPlugins(这是我目前最喜欢的)

    所以 Rx 实际上提供了一种机制来更改 SchedulersAndroidSchedulers 内部的静态方法提供的调度程序。例如:

    RxJavaPlugins.setComputationSchedulerHandler
    RxJavaPlugins.setIoSchedulerHandler
    RxJavaPlugins.setNewThreadSchedulerHandler
    RxJavaPlugins.setSingleSchedulerHandler
    RxAndroidPlugins.setInitMainThreadSchedulerHandler
    

    它们的作用非常简单。他们确保当您调用时,即Schedulers.io(),返回的调度程序是您在setIoSchedulerHandler 中设置的处理程序中提供的调度程序。您要使用哪个调度程序?那么你想要Schedulers.trampoline()。这意味着代码将在与以前相同的线程上运行。如果所有调度程序都在蹦床调度程序中,那么所有调度程序都将在JUnit 线程上运行。测试运行后,您可以通过调用来清理整个事情:

    RxJavaPlugins.reset()
    RxAndroidPlugins.reset()
    

    我认为最好的方法是使用 JUnit 规则。这是一个可能的(抱歉,kotlin 语法):

    class TrampolineSchedulerRule : TestRule {
      private val scheduler by lazy { Schedulers.trampoline() }
    
      override fun apply(base: Statement?, description: Description?): Statement =
            object : Statement() {
                override fun evaluate() {
                    try {
                        RxJavaPlugins.setComputationSchedulerHandler { scheduler }
                        RxJavaPlugins.setIoSchedulerHandler { scheduler }
                        RxJavaPlugins.setNewThreadSchedulerHandler { scheduler }
                        RxJavaPlugins.setSingleSchedulerHandler { scheduler }
                        RxAndroidPlugins.setInitMainThreadSchedulerHandler { scheduler }
                        base?.evaluate()
                    } finally {
                        RxJavaPlugins.reset()
                        RxAndroidPlugins.reset()
                    }
                }
            }
    }
    

    在单元测试的顶部,您只需要声明一个带有 @Rule 注释并使用此类实例化的公共属性:

    @Rule
    public TrampolineSchedulerRule rule = new TrampolineSchedulerRule()
    

    在科特林中

    @get:Rule
    val rule = TrampolineSchedulerRule()
    
    1. 注入调度程序(也称为依赖注入)

    另一种可能性是在你的类中注入调度程序,这样在测试时你可以再次注入Schedulers.trampoline(),在你的应用程序中你可以注入正常的调度程序。这可能会工作一段时间,但是当您需要为一个简单的类注入大量调度程序时,它很快就会变得很麻烦。这是执行此操作的一种方法

    public class MyAPIManager {
      private final MyService myService;
      private final Scheduler io;
      private final Scheduler mainThread;
    
      public MyAPIManager(Scheduler io, Scheduler mainThread) {
        // initialise everything
        this.io = io;
        this.mainThread = mainThread;
      }
    
      public Observable<Token> getToken(String username, String password) {
        return myService.getToken(username, password)
                .subscribeOn(io);
                .observeOn(mainThread);
      }
    }
    

    如您所见,我们现在可以告诉班级实际的调度程序。在您的测试中,您会执行以下操作:

    public class MyAPIManagerTest {
      private MyAPIManager myAPIManager;
      @Test
      public void getToken() throws Exception {
        myAPIManager = new MyAPIManager(
              Schedulers.trampoline(),
              Schedulers.trampoline());
    
        Observable<Token> o = myAPIManager.getToken("hello", "mytoken");
        o.test().assertSubscribed();
        o.test().assertValueCount(1);
      }
    }
    

    重点是:

    • 您希望它在 Schedulers.trampoline() 调度程序上以确保一切都在 JUnit 线程上运行

    • 您需要能够在测试时修改调度程序。

    就是这样。希望对您有所帮助。

    ================================================ ==========

    这是我在上面的 Kotlin 示例之后使用的 Java 版本:

    public class TrampolineSchedulerRule implements TestRule {
        @Override
        public Statement apply(Statement base, Description description) {
            return new MyStatement(base);
        }
    
        public class MyStatement extends Statement {
            private final Statement base;
    
            @Override
            public void evaluate() throws Throwable {
                try {
                    RxJavaPlugins.setComputationSchedulerHandler(scheduler -> Schedulers.trampoline());
                    RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline());
                    RxJavaPlugins.setNewThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
                    RxJavaPlugins.setSingleSchedulerHandler(scheduler -> Schedulers.trampoline());
                    RxAndroidPlugins.setInitMainThreadSchedulerHandler(scheduler -> Schedulers.trampoline());
                    base.evaluate();
                } finally {
                    RxJavaPlugins.reset();
                    RxAndroidPlugins.reset();
                }
            }
    
            public MyStatement(Statement base) {
                this.base = base;
            }
        }
    }
    

    【讨论】:

    • 非常感谢。为了简单起见,我在转换为 Java 后遵循了 Kotlin 示例,它运行良好。
    • 还有第三种方式。在测试中,您可以调用Observable.blockingFirst()Observable.blockingIterable(),而不是订阅。这样,测试线程等待 observable 完成,当它完成时,您可以验证结果。它不如使用Schedulers.trampoline() 干净,但它需要较少的设置。
    猜你喜欢
    • 2020-02-19
    • 2021-10-17
    • 2017-03-09
    • 1970-01-01
    • 1970-01-01
    • 2017-08-25
    • 1970-01-01
    • 2020-01-02
    • 1970-01-01
    相关资源
    最近更新 更多