【问题标题】:RxJava: prevent an observable from emitting until data from another observable is emittedRxJava:防止一个可观察对象发出,直到另一个可观察对象的数据被发出
【发布时间】:2020-03-05 16:02:04
【问题描述】:

下面的代码块设计为离线优先。如果内存 observable 发出数据,则本地和远程 observable 将永远不会触发。如果数据没有保存在内存中,本地 observable 将尝试从房间数据库中读取数据,如果一切都失败,那么远程 observable 会查询 API。

远程源使用改造来发送查询并返回一个可流动的,然后将其转换为可观察的。然而,在远程 observable 触发之前,我有另一个 observable 返回查询所需的位置数据。换句话说,远程可观察对象依赖于可观察对象的位置。在位置数据可用之前,如何使用 RxJava 防止远程 observable 在 Concat 运算符中被调用?

locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();


remote = source.getSuggestionsFromNetwork(parameters)
                    .skipUntil(locationObservable);

locationObservable.subscribe(
                    source -> parameters = ParamManager.queryParameters(
                                    source.getLatitude() + "," + source.getLongitude()),

                    error -> Log.println(Log.ERROR, TAG, error.getMessage()
                    )
            );

Observable.concat(memory,local, remote)
            .firstElement()
            .subscribeOn(Schedulers.io())
            .toObservable()
            .observeOn(AndroidSchedulers.mainThread());

远程可观察对象:

public Observable<List<Venue>> getSuggestionsFromNetwork(HashMap<String, String> parameters){
    return remoteSource.getData(parameters).doOnNext(
            data -> {
                localSource.cacheDataToDisk(data);
                memorySource.cacheDataInMemory(data);
            });
}

远程来源:

Observable<List<Venue>> getData(HashMap<String, String> params){
    return Flowable.zip(loadSearchVenues(params), loadTrendingVenues(params),
            loadRecommendedVenues(params), (search, trending, recommended) -> {

                generalVenues = search.getResponse().getSuggestions();
                trendingVenues = trending.getResponse().getSuggestions();
                recommendedVenues = recommended.getResponse().getSuggestions();

                allVenues.addAll(generalVenues);
                allVenues.addAll(trendingVenues);
                allVenues.addAll(recommendedVenues);

                return allVenues;
            }).toObservable();
}

错误:

2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/MemorySource: getData() called
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/LocalSource: getData() called
2019-11-15 09:18:08.767 29428-29428/com.example.suggest E/MainViewModel: Query map was null (parameter #3)

【问题讨论】:

  • 你问的问题真的很糟糕 :) 试着更好地解释你想要达到的目标
  • 好的,现在问题的措辞如何
  • 如果我正确理解你的问题,你想先从内存中获取数据,如果它不存在从数据库中,如果它失败了也从 API 获取
  • 所以你想读取本地数据,如果缓存未命中上线,总是(也就是你不关心过时的内容),对吗?
  • 是的,这就是上面的代码所做的。如果没有任何数据可用,复合 observables 将调用 onComplete 而不发出数据。但是,为了让远程 observable 查询 API,它需要设备的经度和纬度。我试图弄清楚有没有办法阻止远程 observable 在位置数据可用之前被调用。

标签: rx-java rx-android


【解决方案1】:

如果您可以等到下一个位置发射,您可以执行以下操作:

locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();
remote = locationObservable
           .map(source -> ParamManager.queryParameters(source.getLatitude() + "," 
                     + source.getLongitude()))
           .concatMap(params -> source.getSuggestionsFromNetwork(params));

Observable
  .concat(memory,local, remote)
  .firstElement()
...

但是如果你不能,你必须将最后一个位置存储在一个可以直接使用的变量中,比如:

remote = Optional.ofNullable(getLastLocation())
           .map(Observable::just)
           .orElse(locationObservable)
           .map(source -> ParamManager.queryParameters(source.getLatitude() + "," 
                     + source.getLongitude()))
           .concatMap(params -> source.getSuggestionsFromNetwork(params));

还有其他地方:

locationObservable.subscribe(location -> setLastLocation(location));

【讨论】:

  • 感谢您的帮助。我的问题很难理解吗?这是我收到的一些反馈
  • 没有上面的元素,这是可以理解的。我也赞成
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-08-19
相关资源
最近更新 更多