【问题标题】:In an unit test, how to wait for an RxJava2 Observable to finish before asserting results在单元测试中,如何在断言结果之前等待 RxJava2 Observable 完成
【发布时间】:2018-05-06 11:04:24
【问题描述】:

我正在尝试测试订阅冷可观察的代码的一部分。我的问题是单元测试在断言结果之前没有等待内部可观察调用结束,因此我的测试失败了。

这是我要测试的代码:

public class UserManager {

    private UserRepository userRepository;
    private PublishSubject<List<User>> usersSubject = PublishSubject.create();

    public UserManager(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Observable<List<User>> users() {
        return usersSubject;
    }

    public void onUserIdsReceived(List<String> userIds) {
        userRepository.getUsersInfo(userIds)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new SingleObserver<List<User>>() {
                @Override
                    public void onSubscribe(@NonNull Disposable d) {}

                    @Override
                    public void onSuccess(@NonNull List<User> users) {
                        usersSubject.onNext(users);
                    }

                    @Override
                    public void onError(Throwable e) {}
                }
    }
}

这是我的测试:

@RunWith(MockitoJUnitRunner::class)
class UserManagerTest {

private val userRepository = mock(UserRepository::class.java)

companion object {
    @BeforeClass
    @JvmStatic
    fun setup() {
        RxJavaPlugins.setIoSchedulerHandler({ _ -> TestScheduler() })
        RxJavaPlugins.setSingleSchedulerHandler({ _ -> TestScheduler() })
        RxJavaPlugins.setNewThreadSchedulerHandler({ _ -> TestScheduler() })
        RxJavaPlugins.setComputationSchedulerHandler({ _ -> TestScheduler() })
        RxAndroidPlugins.setInitMainThreadSchedulerHandler({ _ -> TestScheduler() })
        RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -> TestScheduler() })
    }
}

@Test
fun testReceivedUserIdResultsInPushedUser() {
    // Given
    val usersIds = mutableListOf<String>()
    usersIds.add("id1")
    val usersDetails = mutableListOf<User>()
    usersDetails.add(User.testUser())
    given(userRepository.getUsersInfo(usersIds)).willReturn(Single.just(usersDetails))

    val userManager = UserManager(userRepository)
    val testObserver = TestObserver<List<User>>()
    userManager.users().subscribeWith(testObserver)

    // When
    userManager.onUsersIdsReceived(usersIds)

    // Then
    testObserver.assertNoErrors()
    testObserver.assertNotComplete()
    testObserver.assertValueCount(1)
    testObserver.assertValue( { detectedUsers: List<DetectedUser> ->
        // My test
    })
}

我的测试失败是因为我的 testObserver 没有收到任何事件,最合乎逻辑的原因是测试没有等待 userRepository.getUsersInfo(userIds) 调用结束,即使我模拟了 UserRepository 并使用了测试调度器。 有没有办法等待内部 Observable 调用结束,从而使我的代码可测试?

【问题讨论】:

  • 你可以考虑使用TestObserver#awaitCount()

标签: android unit-testing rx-java rx-java2


【解决方案1】:

我正在使用trampoline,而不是像这样的TestScheduler,它是同步执行的。

// override Schedulers.io()
RxJavaPlugins.setIoSchedulerHandler(schedulerCallable -> Schedulers.trampoline());
// override AndroidSchedulers.mainThread()
RxAndroidPlugins.setInitMainThreadSchedulerHandler(schedulerCallable -> Schedulers.trampoline());

还可以考虑删除 RxAndroidPlugins.setMainThreadSchedulerHandler({ _ -&gt; TestScheduler() }) 我想我有一些问题。

【讨论】:

    猜你喜欢
    • 2017-02-02
    • 2016-04-13
    • 1970-01-01
    • 1970-01-01
    • 2016-06-26
    • 2020-05-13
    • 1970-01-01
    • 2018-09-05
    • 2021-06-20
    相关资源
    最近更新 更多