【问题标题】:How to call api recursively using rxAndroid until some met condition?如何使用 rxAndroid 递归调用 api 直到满足某些条件?
【发布时间】:2019-09-25 05:52:05
【问题描述】:

我正在使用 rxAndroid 进行改造,试图从服务器获得响应,直到服务器返回 0 列表大小。

getPageAndNext(rows_id),它的接受起点 id 现在考虑它的 0,在下一个请求中,我将传递第 9 个 id,以便服务器在第 9 个 id 响应后返回。(10th,11th,...15th...19th )

 Observable<List<Feed>> getPageAndNext(int id) {
        return apiInterface.postGetFeed(membersIds, id)
                .flatMap(feedResponse -> {
                    if (feedResponse.getData().size() > 0) {
                        return Observable.just(feedResponse.getData())
                                .concatWith(getPageAndNext(feedResponse.getData().get(feedResponse.getData().size() - 1).getId()));

                    }
                    return Observer::onComplete;
                });

    }

这里每个请求,我得到 10 个提要并在 onNext 方法中将提要插入到房间数据库中

getPageAndNext(0)
                .subscribeOn(Schedulers.computation())
                .subscribe(new DisposableObserver<List<Feed>>() {
                    @Override
                    public void onNext(List<Feed> feeds) {
                        //save to room database
                        memberDatabaseRepository.insertMemberList(feeds);

                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("jjjj", "error"+e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d("kkkk","done");
                    }
                });

现在我没有收到来自服务器的所有响应。我正在搜索过去 24 小时,但没有得到任何解决方案。

注意:之后我可以进行 76 次 API 调用,即使只有 76 次 API 调用保存在房间数据库中的数据,也没有得到任何响应。但大约有 630 个 API 调用

链接:-

How To Do Recursive Observable Call in RxJava?

How to ignore error and continue infinite stream?

RxJava takeUntil with emmision of last item?

How to make multiple requests with reactive android and retrofit

【问题讨论】:

    标签: android rx-java rx-java2 rx-android


    【解决方案1】:

    最后,我解决了 rxAndroid 中的递归调用 API。 takeUntil -> 我正在检查大小是否为零,然后停止递归调用。

    Observable.range(0, Integer.MAX_VALUE)
                    // Get each page in order.
                    .concatMap(integer -> {
                        return apiInterface.postGetFeed(membersIds, lastFeedId);
                    })
                    // Take every result up to and including the one where the next page index is null.
                    .takeUntil(result -> {
                        if(!result.getData().isEmpty())
                        lastFeedId = result.getData().get(result.getData().size() - 1).getId();
                        return result.getData().isEmpty();
                    })
                    .map(FeedResponse::getData)
                    .subscribeOn(Schedulers.io())
                    .subscribe(new DisposableObserver<List<Feed>>() {
                                   @Override
                                   public void onNext(List<Feed> feeds) {
                                       Log.d("kkkk", "" + feeds);
                                       memberDatabaseRepository.insertMemberList(feeds, lastFeedId);
    
                                   }
    
                                   @Override
                                   public void onError(Throwable e) {
                                       Log.d("kkkk", "error " + e.getMessage());
    
                                   }
    
                                   @Override
                                   public void onComplete() {
                                       Log.d("kkkk", "onComplete");
                                       memberDatabaseRepository.getAllFeed(FeedFragment.this);
                                   }
                               }
                    );
    

    【讨论】:

      猜你喜欢
      • 2021-12-10
      • 1970-01-01
      • 2021-06-06
      • 2022-01-20
      • 1970-01-01
      • 2021-09-05
      • 1970-01-01
      • 2018-01-27
      • 1970-01-01
      相关资源
      最近更新 更多