【发布时间】:2016-08-24 11:33:26
【问题描述】:
TLDR:我在 RxJava Observables 中正在进行后台处理,我正在进行集成测试,我希望能够独立等待该处理完成,以确保从一个测试开始的后台处理不会干扰另一个测试测试。
简单来说,我有一个 @RequestMapping 方法,它执行以下操作:
- 在数据库中插入数据
- 启动对该数据的异步处理(通过 Feign 进行 http 调用,数据库更新)
- 什么都不返回 (
HttpStatus.NO_CONTENT)
此异步处理之前是使用ThreadPoolTaskExecutor 完成的。我们将过渡到 RxJava,并希望删除此 ThreadPoolTaskExecutor 并使用 RxJava 进行后台处理。
我当时非常天真地尝试这样做:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
最终目标当然是一步一步地进入“调用长阻塞方法”并一直使用 Observable。
在此之前,我想先让我的集成测试工作。我正在通过对映射进行 RestTemplate 调用来测试这一点。由于大部分工作都是异步的,我的调用返回非常快。现在我想找到一种方法来等待异步处理完成(以确保它不会与另一个测试冲突)。
在 RxJava 之前,我只会计算 ThreadPoolTaskExecutor 中的任务,然后等到它达到 0。
如何使用 RxJava 做到这一点?
我尝试了什么:
- 我尝试使用 RxJavaSchedulersHook 使我的所有调度程序立即执行:这会导致某处出现某种阻塞,代码执行在我的 Feign 调用之前停止(Feign 在后台使用 RxJava)
- 我尝试使用 Rx RxJavaObservableExecutionHook 对任务进行计数:我尝试保留订阅,并在 isSubcribed = false 时删除它们,但这根本不起作用(很多订阅者,计数永远不会减少)
- 我试图在实际的生产代码中加入一个observeOn(immediate())。这似乎可行,我可以为运行时/测试阶段注入正确的调度程序,但我并不热衷于将仅用于测试目的的代码放入我的实际生产代码中。
我可能大错特错,或者事情过于复杂,所以请不要犹豫,纠正我的推理!
【问题讨论】:
标签: java spring-boot rx-java spring-cloud