【问题标题】:Using of "skipWhile" combined with "repeatWhen" in RxJava to implement server polling在 RxJava 中使用“skipWhile”结合“repeatWhen”实现服务器轮询
【发布时间】:2016-04-28 21:04:11
【问题描述】:

我真的很喜欢 RxJava,它是一个很棒的工具,但有时很难理解它是如何工作的。 我们在我们的 Android 项目中使用带有 RxJava 的 Retrofit,并且有以下用例:

我需要轮询服务器,重试之间有一些延迟,而服务器正在做一些工作。服务器完成后,我必须提供结果。所以我已经用 RxJava 成功完成了,这里是代码 sn-p: 我将“skipWhile”与“repeatWhen”一起使用

Subscription checkJobSubscription = mDataManager.checkJob(prepareTweakJob)
        .skipWhile(new Func1<CheckJobResponse, Boolean>() {

            @Override
            public Boolean call(CheckJobResponse checkJobResponse) {
                boolean shouldSkip = false;

                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, jobStatus " + checkJobResponse.getJobStatus());

                switch (checkJobResponse.getJobStatus()){
                    case CheckJobResponse.PROCESSING:
                        shouldSkip = true;
                        break;
                    case CheckJobResponse.DONE:
                    case CheckJobResponse.ERROR:
                        shouldSkip = false;
                        break;
                }
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, skipWhile, shouldSkip " + shouldSkip);

                return shouldSkip;
            }
        })
        .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Void> observable) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, repeatWhen " + observable);
                return observable.delay(1, TimeUnit.SECONDS);
            }
        }).subscribe(new Subscriber<CheckJobResponse>(){
            @Override
            public void onNext(CheckJobResponse response) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onSuccess, response " + response);

            }

            @Override
            public void onError(BaseError error) {
                if (SHOW_LOGS) Logger.v(TAG, "checkJob, onError, canEditTimeline, error " + error);
                Toast.makeText(ChoseEditOptionActivity.this, R.string.NETWORK__no_internet_message, Toast.LENGTH_LONG).show();

            }

            @Override
            public void onCompleted() {
                if (SHOW_LOGS) Logger.v(TAG, "onCompleted");
            }
        });

代码运行良好:

当服务器响应该作业正在处理时,我从“skipWhile”链返回“true”,原始 O​​bservable 等待 1 秒并再次执行 http 请求。 这个过程一直重复,直到我从“skipWhile”链返回“false”。

这里有一些我不明白的地方:

  1. 我在“skipWhile”的文档中看到它不会从原始 Observable 发出任何东西(onError、onNext、onComplete),直到我从它的“call”方法返回“false”。所以如果它没有发出任何东西,为什么“repeatWhen”Observable 会起作用呢?它等待一秒钟并再次运行请求。谁启动它?

  2. 第二个问题是:为什么“repeatWhen”中的 Observable 不会永远运行,我的意思是为什么当我从“skipWhile”返回“false”时它会停止重复?如果我返回“false”,我会在订阅者中成功获取 onNext。

  3. 在“repeatWhile”的文档中,它说最终我在订阅者中收到了对“onComplete”的调用,但从未调用过“onComplete”。

  4. 如果我改变链接“skipWhile”和“repeatWhen”的顺序没有区别。这是为什么呢?

我知道 RxJava 是开源的,我可以阅读代码,但正如我所说 - 这真的很难理解。

谢谢。

【问题讨论】:

    标签: android networking retrofit rx-java long-polling


    【解决方案1】:

    我之前没有和repeatWhen合作过,但是这个问题让我很好奇,所以我做了一些研究。

    skipWhile 确实发出onErroronCompleted,即使在此之前它从未返回true。因此,每次checkJob() 发出onCompleted 时都会调用repeatWhen。这回答了问题 1。

    其余的问题都是基于错误的假设。您的订阅将永远运行,因为您的 repeatWhen 永远不会终止。那是因为repeatWhen 是一个比你想象的更复杂的野兽。每当它从源获得onCompleted 时,其中的Observable 就会发出null。如果你接受它并返回onCompleted,那么它就会结束,否则如果你发出任何东西,它就会重试。由于delay 只是接受发射并延迟它,它总是再次发射null。因此,它会不断地重新订阅。

    那么,#2 的答案是它永远运行;您可能正在此代码之外执行其他操作来取消订阅。对于#3,你永远不会得到onCompleted,因为它永远不会完成。对于#4,顺序无关紧要,因为您无限期地重复。

    现在的问题是,如何获得正确的行为?就像使用takeUntil 而不是skipWhile 一样简单。这样,您就可以不断重复直到得到您想要的结果,从而在您希望它结束​​时终止流。

    这是一个代码示例:

    Observable<Boolean> source = ...; // Something that eventually emits true
    
    source
        .repeatWhen(completed -> completed.delay(1, TimeUnit.SECONDS))
        .takeUntil(result -> result)
        .filter(result -> result)
        .subscribe(
            res -> System.out.println("onNext(" + res + ")"),
            err -> System.out.println("onError()"),
            () -> System.out.println("onCompleted()")
        );
    

    在本例中,source 发出布尔值。我每 1 秒重复一次,直到源发出 true。我一直服用直到resulttrue。我过滤掉了所有false 的通知,所以订阅者在true 之前不会收到它们。

    【讨论】:

    • 我打错了:我在“skipWhile”的文档中看到它不会从原始 Observable 发出任何东西(onError、onNext、onComplete),直到我从它的“call”方法返回“false” .这是原始文档:返回一个 Observable,只要指定的 * 条件成立,它就会跳过源 Observable 发出的所有项目,但一旦条件变为 false,就会发出所有进一步的源项目。所以我前几次得到“PROCESSING”并从方法返回“true”。它不应该发出任何东西。那为什么要重试呢?
    • 关于#2。不,我只有一个代码可以取消订阅活动的 onStop 中的订阅,但事实并非如此。我会看到我仍在使用 API 调用轮询服务器 :)
    • 关于第一条评论 - checkJob() 的输出可能是onNext(CheckJobResponse) -> onCompleted()。它在返回PROCESSING 时跳过转发第一个,但onCompleted() 仍然通过。 (我通过查看源代码确认了这一点。)
    • 至于 #2 - 我无法解释。我所知道的是,在我自己的测试中,您的设置不会终止。你的 retryWhen 永远不会调用 onCompleted() 所以它永远不会终止。
    • 好的。我再次检查了我的实现:从服务器获得“完成”后,我运行另一个 API 请求,然后关闭活动(在这里我取消订阅)。这就是为什么我没有注意到原始 API 调用仍在重复的原因。这个答案是正确的。谢谢。
    猜你喜欢
    • 2017-09-18
    • 2012-06-10
    • 1970-01-01
    • 2020-04-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-04-06
    相关资源
    最近更新 更多