【问题标题】:RxJava retryWhen (exponential back-off) not workingRxJava retryWhen(指数退避)不起作用
【发布时间】:2020-02-13 14:57:38
【问题描述】:

所以我知道这已经被问过很多次了,但是我尝试了很多东西,但似乎没有任何效果。

让我们从这些博客/文章/代码开始:

还有很多其他的。

简而言之,它们都描述了如何使用 retryWhen 来实现指数退避。像这样的:

source
.retryWhen(
  errors -> {
    return errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(
       retryCount -> {
       System.out.println("retry count " + retryCount);
       return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
     });
 })

甚至库中的文档也同意它: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919

但是,我已经尝试过这个和一些非常相似的变体,不值得在这里描述,而且似乎没有任何效果。有一种方法可以使示例工作并使用阻塞订阅者,但我想避免阻塞线程。

所以如果我们对前一个 observable 应用这样的阻塞订阅者:

.blockingForEach(System.out::println);

它按预期工作。但因为那不是想法。如果我们尝试:

.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));

流程只运行一次,因此不是我想要实现的。

这是否意味着它不能像我尝试的那样使用?从文档来看,尝试完成我的要求似乎不是问题。

知道我错过了什么吗?

TIA。

编辑:我有两种测试方法:

一种测试方法(使用testng):

Observable<Integer> source =
    Observable.just("test")
        .map(
            x -> {
              System.out.println("trying again");
              return Integer.parseInt(x);
            });
source
    .retryWhen(
        errors -> {
          return errors
              .zipWith(Observable.range(1, 3), (n, i) -> i)
              .flatMap(
                  retryCount -> {
                    return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
                  });
        })
    .subscribe(...);

来自 Kafka 消费者(使用 Spring 引导):

这只是对观察者的订阅,但重试逻辑是我在帖子前面描述的。

@KafkaListener(topics = "${kafka.config.topic}")
      public void receive(String payload) {
        log.info("received payload='{}'", payload);
        service
            .updateMessage(payload)
            .subscribe(...)
            .dispose();
      }

【问题讨论】:

  • 请格式化代码,很难阅读。
  • 你是从主方法运行它吗? RxJava 使用守护线程,如果您的主线程在后台工作正在进行时退出,这些线程将被 JVM 终止。这就是示例使用blocking 方法的原因。
  • 我已经更新了帖子以解释我是如何调用该方法的。
  • 你遇到了和这里stackoverflow.com/questions/59005142/…一样的问题

标签: java rx-java2 exponential-backoff


【解决方案1】:

您的代码的主要问题是Observable.timer is by default operating on the computation scheduler。在尝试验证测试中的行为时,这会增加额外的工作量。

这里有一些单元测试代码,用于验证您的重试代码是否确实在重试。

  • 它添加了一个计数器,以便我们可以轻松地检查发生了多少次调用。
  • 它使用TestScheduler而不是计算调度器,这样我们就可以假装在advanceTimeBy中移动。

    TestScheduler testScheduler = new TestScheduler();
    AtomicInteger counter = new AtomicInteger();
    
    Observable<Integer> source =
        Observable.just("test")
            .map(
                x -> {
                    System.out.println("trying again");
                    counter.getAndIncrement();
                    return Integer.parseInt(x);
                });
    TestObserver<Integer> testObserver = source
        .retryWhen(
            errors -> {
                return errors
                    .zipWith(Observable.range(1, 3), (n, i) -> i)
                    .flatMap(
                        retryCount -> {
                            return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
                        });
            })
        .test();
    
    assertEquals(1, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(2, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(3, counter.get());
    
    testScheduler.advanceTimeBy(1, SECONDS);
    assertEquals(4, counter.get());
    
    testObserver.assertComplete();
    

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多