【问题标题】:Exponential Backoff in RxJavaRxJava 中的指数退避
【发布时间】:2017-01-16 14:43:10
【问题描述】:

我有一个 API,它接受一个触发事件的 Observable

如果检测到 Internet 连接,我想返回一个 Observable,它每 defaultDelay 秒发出一个值,如果没有连接,则延迟 numberOfFailedAttempts^2 次。

我尝试了很多不同的样式,我遇到的最大问题是retryWhen's observable 只评估一次:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

有什么方法可以做我想做的事吗?我发现了一个相关的问题(现在找不到它),但所采取的方法似乎不适用于动态值。

【问题讨论】:

    标签: java java-8 rx-java exponential-backoff


    【解决方案1】:

    在你的代码中有两个错误:

    1. 为了重复一些可观察的序列,该序列必须是有限的。 IE。而不是interval,你最好使用justfromCallable之类的东西,就像我在下面的示例中所做的那样。
    2. 您需要从repeatWhen 的内部函数返回新的延迟可观察源,因此您必须返回Observable.timer() 而不是observable.delay()

    工作代码:

    public void testRepeat() throws InterruptedException {
        logger.info("test start");
    
        int DEFAULT_DELAY = 100; // ms
        int ADDITIONAL_DELAY = 100; // ms
        AtomicInteger generator = new AtomicInteger(0);
        AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
    
        Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
                .repeatWhen(counts -> {
                    AtomicInteger retryCounter = new AtomicInteger(0);
                    return counts.flatMap(c -> {
                        int retry = 0;
                        if (connectionAlive.get()) {
                            retryCounter.set(0); // reset counter
                        } else {
                            retry = retryCounter.incrementAndGet();
                        }
                        int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                        logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                        return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                    });
                })
                .subscribe(v -> logger.info("got {}", v));
    
        Thread.sleep(220);
        logger.info("connection dropped");
        connectionAlive.set(false);
        Thread.sleep(2000);
        logger.info("connection is back alive");
        connectionAlive.set(true);
        Thread.sleep(2000);
        subscription.dispose();
        logger.info("test complete");
    }
    

    查看关于repeatWhenhere的详细文章。

    【讨论】:

    • 问题中的示例可能来自我的尝试中间,因为它似乎混合了我使用的两种方法(一种计时器+基于重试,一种间隔+基于延迟订阅),问题实际上来自那篇文章,它说应该再次使用重试/重复的可观察输入。不使用该可观察的会导致订阅泄漏问题吗?
    • @AssortedTrailmix 那是关于第一级输入,而不是关于内部flatMap。有关非常相似的模式,请参阅该文章中的最后一个示例。
    • 哦,我明白了,对不起,我错过了 counts 是什么被 flatMap'ed
    【解决方案2】:

    我一直发现retryWhen 有点低级,因此对于指数退避,我使用了一个经过单元测试且可用于 RxJava 1.x 的构建器(如 Abhijit)rxjava-extras。我建议使用有上限的版本,这样延迟的指数增长不会超过您定义的最大值。

    这就是你如何使用它:

    observable.retryWhen(
        RetryWhen.exponentialBackoff(
            delay, maxDelay, TimeUNIT.SECONDS)
        .build());
    

    我不同意retryWhen 存在错误,但如果您发现错误,请将其报告给 RxJava。错误修复很快!

    您需要 Maven Central 上的 rxjava-extras 0.8.0.6 或更高版本:

    <dependency>
        <groupId>com.github.davidmoten</groupId>
        <artifactId>rxjava-extras</artifactId>
        <version>0.8.0.6</version>
    </dependency>
    

    如果您需要 RxJava 2.x 版本,请告诉我。从 0.1.4 开始,rxjava2-extras 提供了相同的功能。

    【讨论】:

    • 我知道我在哪里见过这个!我不想在这里重新发明轮子,所以我可能会继续这样做并查看实现,看看我应该如何去做
    • 今天我注意到我忘了实现最大回退,但是在使用版本 0.8.0.6 时,我似乎不存在该方法签名
    • 前几天我很着急,所以我告诉自己我会回到我需要的那段代码,看来这个解决方案没有我预期的行为,那就是成功调用后重试应该重置(这是有道理的,因为这需要“带外”通信)。我认为下面概述的 repeatWhen 方法是我目前情况下所需要的,这种方法似乎针对“重试直到它工作”与“总是重试,如果它不起作用则延迟更长”的更常见情况进行了优化
    • repeat ().retryWhen() 怎么样才能满足您始终重试的要求?
    • 这就是我根据 retryWhen observable + flatMap 的答案所采用的方法。我认为我缺少的两件事是在 retryWhen 可观察时使用 flatMap 而不是只返回一个新的,以及 not 从 flatMap 内部返回可观察的源(这会导致细微的内存泄漏并且没有t 正常工作)
    【解决方案3】:

    您可以使用retryWhen 运算符来配置无连接时的延迟。如何定期发出项目是一个单独的主题(查找 intervaltimer 运算符)。如果您无法弄清楚,请打开一个单独的问题。

    我的 Github 上有一个广泛的示例,但我会在这里为您提供要点。

    RetryWithDelay retryWithDelay = RetryWithDelay.builder()
        .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
        .build()
    
    Single.fromCallable(() -> {
        ...
    }).retryWhen(retryWithDelay)
    .subscribe(j -> {
        ...
    })
    

    RetryWithDelay 定义如下。我用的是 RxJava 2.x,所以如果你用的是 1.x,签名应该是Func1&lt;Observable&lt;? extends Throwable&gt;, Observable&lt;Object&gt;&gt;

    public class RetryWithDelay implements
            Function<Flowable<? extends Throwable>, Publisher<Object>> {
        ...
    }
    

    RetryWithDelay 类。

    RetryStrategy枚举。

    这使我可以根据RetryDelayStrategy 配置各种类型的超时,常数、线性、指数。对于您的用例,您可以选择 CONSTANT_DELAY_TIMES_RETRY_COUNT 延迟策略并在构建 RetryWithDelay 时调用 retryDelaySeconds(2)

    retryWhen 是一个复杂的,甚至可能是错误的操作符。您会在网上找到的大多数示例都使用range 运算符,如果没有重试,它将失败。详情见我的回答here

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-01-09
      • 2015-09-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-17
      相关资源
      最近更新 更多