【问题标题】:How to skip error and continue processing stream如何跳过错误并继续处理流
【发布时间】:2019-07-22 21:09:04
【问题描述】:

假设我有流:

final Observable<Integer> integerObservable = Observable
    .fromArray(1, 2, 3, 4, 5, 6)
    .map(i -> {
          if (i % 3 == 0) {
            throw new RuntimeException("Haha");
          } else {
            return i;
          }
        }
    );

如您所见,我可能会收到一些意外异常。

如何实现跳过异常/错误并继续接收下一个元素的解决方案,以便我的目标订阅者能够在这种特殊情况下看到:

1, 2, 4, 5

【问题讨论】:

    标签: rx-java rx-java2


    【解决方案1】:

    您可以将“危险”代码移动到“子”流中,如果发生错误,则不返回任何元素:

       Observable
            .fromArray(1, 2, 3, 4, 5, 6)
            .flatMap(ii -> Observable.just(ii).map(i -> {
                        if (i % 3 == 0) {
                            throw new RuntimeException("Haha");
                        } else {
                            return i;
                        }
                    }).onErrorResumeNext(Observable.empty())
            )
            .subscribe(System.out::println);
    

    【讨论】:

    • 我喜欢这个解决方案,请检查我的发现。
    【解决方案2】:

    没有像.continueNextIfError 这样的东西。当源流发出错误时,这意味着流已经终止并且不会有任何额外的事件。在此之后无法恢复或继续直播。


    如果应用程序设计正确,通常无需担心这种情况,因为您的问题有一个简单的解决方案:在错误到达下游之前处理它。

    final Observable<Integer> integerObservable = Observable
        .fromArray(1, 2, 3, 4, 5, 6)
        .map(i -> {
            try {
                if (i % 3 == 0) {
                    throw new RuntimeException("Haha");
                } else {
                    return i;
                }
            } catch (RuntimeException e) {
                return -1;
            }
        )
        .filter(i -> i >= 0);
    
    
    // or flatMaps cases
    final Observable<Integer> integerObservable = Observable
        .fromArray(1, 2, 3, 4, 5, 6)
        .flatMapMaybe(i -> {
            return Maybe.fromCallable(() -> {
                    if (i % 3 == 0) {
                        throw new RuntimeException("Haha");
                    } else {
                        return i;
                    }
                })
                .onErrorComplete();
        });
    


    此外,如果源 observable 是冷 observable,那么使用 .retry 重新订阅可能会有所帮助。

    【讨论】:

    • 你写的很伤心:When the source stream emits an error, it means the stream has terminated and there won't be any additional event. There is no way to recover or continue the stream after this point.你确定在这种意外异常之后没有办法恢复吗?
    • 我认为它是故意这样设计的。如果您可以展示一些需要持续流的具体示例,那么我认为我们应该能够找到解决方法。
    【解决方案3】:

    出于教育原因,我找到了可以解决此问题的解决方案。

    Observable
        .fromArray(1, 2, 3, 4, 5, 6)
        .concatMapDelayError(i -> Observable.fromCallable(() -> {
          if (i % 3 == 0) {
            throw new RuntimeException("Haha");
          } else {
            return i;
          }
        }))
        .onErrorResumeNext(Observable.empty())
        .subscribe(
            System.out::println,
            throwable -> log.error("That escalated quickly", throwable));
    

    上述代码的结果是(这是预期的):

    1
    2
    4
    5
    

    无限流可能会有问题(值得检查)。

    【讨论】:

    • onErrorResumeNext 触发错误发射,因此这不会阻止错误缓冲。结果错误 - 带有收集错误的复合异常,所以是的,这里有无限流的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-05
    • 2011-05-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多