【问题标题】:RxJava testing : how to wait for all background tasks to completeRxJava 测试:如何等待所有后台任务完成
【发布时间】:2016-08-24 11:33:26
【问题描述】:

TLDR:我在 RxJava Observables 中正在进行后台处理,我正在进行集成测试,我希望能够独立等待该处理完成,以确保从一个测试开始的后台处理不会干扰另一个测试测试。


简单来说,我有一个 @RequestMapping 方法,它执行以下操作:

  • 在数据库中插入数据
  • 启动对该数据的异步处理(通过 Feign 进行 http 调用,数据库更新)
  • 什么都不返回 (HttpStatus.NO_CONTENT)

此异步处理之前是使用ThreadPoolTaskExecutor 完成的。我们将过渡到 RxJava,并希望删除此 ThreadPoolTask​​Executor 并使用 RxJava 进行后台处理。

我当时非常天真地尝试这样做:

Observable
    .defer(() -> Observable.just(call to long blocking method) 
    .subscribeOn(Schedulers.io())
    .subscribe();

最终目标当然是一步一步地进入“调用长阻塞方法”并一直使用 Observable。

在此之前,我想先让我的集成测试工作。我正在通过对映射进行 RestTemplate 调用来测试这一点。由于大部分工作都是异步的,我的调用返回非常快。现在我想找到一种方法来等待异步处理完成(以确保它不会与另一个测试冲突)。

在 RxJava 之前,我只会计算 ThreadPoolTask​​Executor 中的任务,然后等到它达到 0。

如何使用 RxJava 做到这一点?


我尝试了什么:

  • 我尝试使用 RxJavaSchedulersHook 使我的所有调度程序立即执行:这会导致某处出现某种阻塞,代码执行在我的 Feign 调用之前停止(Feign 在后台使用 RxJava)
  • 我尝试使用 Rx RxJavaObservableExecutionHook 对任务进行计数:我尝试保留订阅,并在 isSubcribed = false 时删除它们,但这根本不起作用(很多订阅者,计数永远不会减少)
  • 我试图在实际的生产代码中加入一个observeOn(immediate())。这似乎可行,我可以为运行时/测试阶段注入正确的调度程序,但我并不热衷于将仅用于测试目的的代码放入我的实际生产代码中。

我可能大错特错,或者事情过于复杂,所以请不要犹豫,纠正我的推理!

【问题讨论】:

    标签: java spring-boot rx-java spring-cloud


    【解决方案1】:

    您可以使用ExecutorServiceAdapter 将Spring ThreadPoolTaskExecutor 桥接到RX 中的ExecutorService,然后执行与以前相同的技巧。

    【讨论】:

    • 这确实是解决我的问题的最简单和最干净的解决方案,无需更改运行时代码以进行测试。感谢您的洞察力,我没有意识到这两个库可以如此轻松地桥接!
    • 嗯,这工作正常,直到我“完全 rx”... 从逻辑上讲,任务并不是从一开始就排队,而是根据需要来来去去。因此,您可以在 ThreadPool 上有 0 个任务,并且仍然处于处理过程中......
    • 也许你应该断言一些不同的东西(例如DeferredResult)?
    • 是的,我真的在考虑返回一个 SseEmiter ... 这样我就可以让客户等待第一个事件(快速事件),而其他客户(在测试中)等待为所有事件完成。这将是最干净的恕我直言。我刚换了一会儿,但会尽快更新所有这些
    【解决方案2】:

    你如何返回HttpStatus.NO_CONTENT

    @RequestMapping(value = "/")
    public HttpStatus home() {
        Observable.defer(() -> Observable.just(longMethod())
                  .subscribeOn(Schedulers.io())
                  .subscribe();
        return HttpStatus.NO_CONTENT;
    }
    

    在这种形式中,您无法知道longMethod 何时结束。

    如果您想知道所有异步作业何时完成,您可以在所有作业完成后返回HttpStatus.NO_CONTENT,使用Spring DefferedResult 或使用TestSubscriber

    PS:如果需要,您可以使用Observable.fromCallable(() -> longMethod()); 代替Observable.defer(() -> Observable.just(longMethod());

    使用 DeferedResult

    @RequestMapping(value = "/")
    public DeferredResult<HttpStatus> index() {
        DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
        Observable.fromCallable(() -> longMethod())
                .subscribeOn(Schedulers.io())
                .subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
        return deferredResult;
    }
    

    像这样,如果你调用你的方法,你只会在你的 observable 完成时得到你的结果(所以,当 longMethod 完成时)

    使用 TestSubscriber

    您必须注入 TestSubscriber 并在要求他等待/检查您的 Observable 的完成时:

    @RequestMapping(value = "/")
    public HttpStatus home() {
        Observable.defer(() -> Observable.just(longMethod())
                  .subscribeOn(Schedulers.io())
                  .subscribe(subscriber); // you'll have to inject this subscriber in your test
        return HttpStatus.NO_CONTENT;
    }
    

    在你的测试中:

     TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
     // ....
     controller.home();
     subscriber.awaitTerminalEvent();
     subscriber.assertCompleted(); // check that no error occurred
    

    【讨论】:

    • 非常感谢您的意见,我不知道 TestSubscriber,并设法得到一些非常干净的东西来使用它,但是,我仍然必须更改运行时代码以进行测试,并且我真的不喜欢那样。使用 ThreadPoolTask​​Executor 还解决了另一个出现的问题,所以我选择了 Dave 的答案。非常感谢!
    【解决方案3】:

    几个月后的比赛:我的建议是“不要那样做”。 RxJava 不太适合这种工作。无需过多详细说明,在后台运行大量“松散”的 Observable 是不合适的:根据您的请求量,您很容易陷入队列和内存问题,更重要的是,所有计划和正在运行的任务会发生什么如果网络服务器崩溃?你如何重新启动它?

    Spring 提供了其他更好的选择,恕我直言:Spring BatchSpring Cloud Task、与Spring Cloud Stream 发送消息,所以不要像我一样做,只需使用正确的工具来完成正确的工作。

    现在如果你真的想走坏路:

    • 要么返回一个 SseEmmitter 并仅在消费者服务中使用来自 SSE 的第一个事件,并在您的测试中使用所有事件
    • 要么创建一个 RxJava lift 运算符,该运算符将订阅者(在 call 方法中)包装在具有 waitForCompletion 方法的父订阅者中。你如何等待取决于你(例如CountDownLatch)。该订阅者将被添加到同步列表中(并在完成后从其中删除),并且在您的测试中,您可以遍历列表并在列表的每个项目上调用waitForCompletion。这不是那么复杂,我让它工作,但请不要那样做!

    【讨论】:

      猜你喜欢
      • 2011-03-17
      • 1970-01-01
      • 2017-04-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-08-14
      • 2011-09-03
      • 1970-01-01
      相关资源
      最近更新 更多