【问题标题】:RxJava retryWhen bizarre behaviorRxJava retryWhen 奇怪的行为
【发布时间】:2016-07-17 00:29:47
【问题描述】:

我正在使用 RxJava retryWhen 运算符。在互联网上几乎找不到它,唯一值得一提的是this。这也没有探索我想了解的各种用例。我还加入了异步执行并通过退避重试以使其更真实。

我的设置很简单:我有一个 ChuckNorrisJokesRepository 类,它从 JSON 文件中返回随机数的 Chuck Norris 笑话。我正在测试的班级是ChuckNorrisJokesService,如下所示。我感兴趣的用例如下:

  1. 第一次尝试成功(不重试)
  2. 1 次重试后失败
  3. 尝试重试 3 次,但第 2 次成功,因此不会重试第 3 次
  4. 第三次重试成功

注意:该项目在我的GitHub 上可用。

ChuckNorrisJokesService.java

@Slf4j
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }

            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }

            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }

            requireNonNull(latch, "CountDownLatch must not be null.");

            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");

        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();

            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");

                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");

                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");

                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);

                    return value;
                });
    }
}

为简洁起见,我将只展示第一个用例的 Spock 测试用例。其他测试用例见我的GitHub

def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)

    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3

    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}

这失败了:

Too few invocations for:

1 * threads.merge('fromCallable', *_)   (0 invocations)
1 * threads.merge('onNext', *_)   (0 invocations)

我期待的是fromCallable 被调用一次,它成功,onNext 被调用一次,然后是onCompleted。我错过了什么?

P.S.:完全披露 - 我也在 RxJava GitHub 上发布了这个问题。

【问题讨论】:

    标签: java rx-java reactive-programming


    【解决方案1】:

    经过几个小时的故障排除后,我在 ReactiveX 成员 David Karnok 的帮助下解决了这个问题。

    retryWhen 是一个复杂的,甚至可能是错误的操作符。官方文档和此处的至少一个答案使用range 运算符,如果没有重试,则立即完成。请参阅我的 discussion 和 David Karnok。

    代码可在我的GitHub 上找到,并附有以下测试用例:

    1. 第一次尝试成功(不重试)
    2. 1 次重试后失败
    3. 尝试重试 3 次,但第 2 次成功,因此不会重试第 3 次
    4. 第三次重试成功

    【讨论】:

    • 那么“奇怪的行为”的原因是什么?
    • @YaroslavStavnichiy 您可以按照我的答案中的链接了解详细信息,但简而言之,range 立即完成,因此跳过onNext 并直接转到onCompleted。正如我在回答中所说,从那以后我重写了代码以不使用range
    • 我问是因为我没有在您的回答中看到这一点,也许您应该更新它。为什么range 会立即完成?那是因为numRetries 被设置为零(你忘了设置它)吗?
    • @YaroslavStavnichiy 在我的回答中好吧“范围运算符,如果没有重试,它将失败”。我现在改写了该声明以使其更清楚。至于numRetries=0,这是故意的(我的问题中的用例#1)。 numRetries=0 表示“我不想重试,我会接受第一次调用的结果,否则失败。
    • @Abhijit 你的 github 链接坏了
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多