【问题标题】:RxJava block stream until other observer finishRxJava 阻塞流直到其他观察者完成
【发布时间】:2018-01-21 15:21:16
【问题描述】:

我从一种名为 getAlldata() 的方法中获得了 Flowable 这个方法会从服务器获取数据,然后根据已经存储在数据库中的数据修改返回的数据。

所以这个方法的过程是这样的:

  1. 从服务器获取数据
  2. doOnNext 为每个项目获取 id
  3. 通过id获取本地数据。
  4. 修改当前项

问题是:

结果将在实际修改doOnNext() 中的数据之前返回,因为从数据库获取本地数据是另一个可观察的。

问题

如何延迟流直到doOnNext() 上的另一个可观察对象完成?

代码

   private Flowable<List<MyModule>> getAlldata() {
         return remoteDataSource
                            .getData().flatMap(data -> Flowable.fromIterable(data))
                            .doOnNext(new Consumer<MyModule>() {
                                @Override
                                public void accept(MyModule singleItem) throws Exception {
                                    localDataSource.getData(singleItem.getId())
                                    .firstElement().toFlowable()
                                    .subscribe(new Consumer<Optional<MyModule>>() {
                                        @Override
                                        public void accept(Optional<MyModule> itemOptional) throws Exception {
                                            if (itemOptional.isPresent()) {
                                               // modify the item
                                            }

                                        }
                                    });


                                }
                            })
                            .distinct()
                            .sorted(ProductsRepository.this::sortItems)
                            .toList().toFlowable();
    }

【问题讨论】:

  • 你需要用 doOnNext 正确映射你的每个 api 响应
  • 是的,对于每一项。

标签: java android rx-java rx-java2


【解决方案1】:

使用 flatmap 运算符而不是 doOnNext(),它将 singleItem 作为参数并抛出 localdatabase 类型的 observable。所以现在你可以映射响应而不是让你的 Observable 等待。

Observable<ModuleData> obs = remoteDataSource
         .getData().flatMapIterable(data ->data)
         .flatMap(singleItem->localDataSource.getData(singleItem.getId()))
          .distinct()
          .sorted(ProductsRepository.this::sortItems)
          .toList().toFlowable();

【讨论】:

  • 谢谢,但还是不清楚?无法解析方法 'zipWith(io.reactivex.Flowable>, ) 以及如何获取 singleItem ?为了得到 id。
  • 检查zip是否有超载接收物品
  • 根据我的理解 zipwith 应该采用 ether Iterable 还是 Publisher?所以问题是什么进入 zipWith() ?
  • 但为什么会出现此错误无法解析方法'zipWith(io.reactivex.Flowable>, )
  • 但我不想返回 localdatabase 类型,因为本地数据库返回 Optional 并且我想返回 Flowable> 。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-06-22
  • 1970-01-01
  • 2014-08-26
  • 1970-01-01
  • 2021-07-30
相关资源
最近更新 更多