【问题标题】:Making an RxJava Operator Chain Concurrent使 RxJava 操作符链并发
【发布时间】:2016-01-15 04:22:17
【问题描述】:

我刚刚开始使用 RxJava。我一直在尝试构建一个数据管道,从不同来源下载大量数据并以并发方式将数据插入数据库。

我的基本管道形式如下所示:

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

每当我调用 observeOn 而不是运行并打印出上面打印出的所有数字时,什么都不会打印出来。为什么是这样?我希望下一个 concatMap 和 subscribe 也只会使用计算调度程序。代码贴在下面。

        Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .observeOn(Schedulers.computation())
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });

【问题讨论】:

    标签: java asynchronous reactive-programming rx-java


    【解决方案1】:

    这是一个猜测,因为您没有提供上下文,但如果您正在更改线程,则必须阻止,因为 main 没有被阻止,并且您可能在其他调度程序有机会运行之前终止:

    Observable.range(1, 5)
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .observeOn(Schedulers.computation())
            .concatMap((i) -> {
                return Observable.range(i, 2);
            })
            .subscribe((i) -> { System.out.println(i); }, System.out::println,() -> { System.out.println("Complete"); });
    // block to let other schedulers finish
    Thread.sleep(3000);
    

    【讨论】:

    • 是的,这最终成为了问题。是否有任何调度程序可以在调用代码的主线程上调度操作员?我查看了文档,没有看到任何看起来像我需要的东西。
    • 试试Schedulers.immediate()或使用Observable.toBlocking()
    • 立即尝试,无效。我本质上是想让上面的代码等到观察者都完成而没有任意睡眠。我在想也许可以创建一个 ThreadPool,将其传递到 Executor 接口,然后等待 ThreadPool 完成。
    • 这里仍然缺少上下文。这是一个 x-y 问题。如果您正在等待多个可观察对象,也许您应该 Observable.merge() 它们并等待它们通过阻塞完成?也许使用CountDownLatch 阻塞主线程并使用Observable.doOnComplete() 从可观察对象中倒数?根据具体情况,这里可以使用许多策略。
    • 感谢您的帮助!
    【解决方案2】:

    在阅读了您的 cmets 以获得更多上下文之后,我假设您想要请求获取 id 列表(项目摘要),然后为每个返回的 id 发出一个额外请求以获取每个项目的详细信息,然后执行带有结果的东西(项目)。

    考虑到您已经有了构建和触发请求的方法,返回一个Observable。您可以包装与 http 相关的代码并从中延迟一个 Observable,或者从 Future 创建一个 Observable,以防您的 http 库返回 Futures。

    // you have this available
    Observable<Summary> = querySummary(summaryId);
    Observable<Item> = queryItem(itemId);
    

    我没有运行以下任何代码,因此请将其作为指导。你可以从:

    Observable<Item> itemsObservable = querySummary(summaryId)
        .flatMap(summary -> {
            return Observable.from(summary.getItemsIdsList());
        })
        .map(itemId -> {
            return queryItem(itemId);
        });
    

    您正在查询摘要,然后使用 flatMap 运算符发出单独的返回 ID。然后你可以map每个id来一个请求。最后,您可以订阅 itemsObservable 以让各个 Item 对象流动。

    如果您想将项目保存在数据库中,在订阅之前,您可以在最后一个map 的尾部插入一个doOnNext 并保存每个项目。如果你想从每个项目中求和一个值,或者做任何聚合,你可以使用reduce。依此类推..按照它的方式,在末尾有一个.subscribe(),这段代码在主线程中运行(以防你的http库没有任何花哨的线程池功能)。

    itemsObservable.doOnNext(item -> {
        // do something in whatever thread it might be running
    }).subscribe();
    

    如果您想触发摘要请求并异步等待它,然后在新的物品 ID 到达时同时触发物品的请求,您可以像以前一样添加 .observeOn(Schedulers.io())

    Observable<Item> itemsObservable = querySummary(summaryId)
        .observeOn(Schedulers.io()) // from here, continue in the background
        .flatMap(summary -> { // ...
    

    请注意,您正在告诉 Observable 链跟随另一个线程中的执行并忘记它。在同样的情况下,这正是你想要的。但是,如果您从普通的public static void main() 方法作为程序主体运行它,您的程序将在后台执行结束之前关闭,这是意料之中的。如果你直接处理线程对象的引用,你应该自己负责join()线程,让主程序等待它们。

    由于响应式流旨在隐藏线程复杂性,您只需要“说”您想在聚会结束时聚在一起。在这种情况下,要将执行带回主线程,您可以插入.toBlocking().toList()。然后,您可以在当前线程中订阅的onNext 安全地获取结果。

    itemsObservable
        .toBlocking()
        .subscribe(item -> {
            // blocking the main thread
            // do something on each item 
    });
    

    或者..

    itemsObservable
        .toList()
        .subscribe(itemsList -> {
            // blocking the main thread
            // do something with the whole list at once
    });
    

    有很多不同的方法可以组合 Observable,创建一个或多个流,合并它们,异步运行或不运行它们。所以,这取决于您的需求。

    希望对你有帮助!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-31
      • 2020-12-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多