【问题标题】:RxJS - Continue sending notifications after error occurs in onNextRxJS - onNext 发生错误后继续发送通知
【发布时间】:2017-01-05 18:37:06
【问题描述】:

我是 Rx 的新手,如果能在错误处理方面提供一些帮助,我将不胜感激。我有以下代码,基本上是预先输入的:

var observable = Rx.Observable.fromEvent(searchField, 'keyup')
    .map(ev => ev.target.value)
    .filter(text => text.length > 2)
    .debounce(500 /* ms */)
    .distinctUntilChanged()
    .flatMapLatest(getAsyncSearchResults);

observable.subscribe(function(value) {
        console.log("onNext");
        throw "error"; // THIS TERMINATES THE OBSERVABLE
    },
    function(error) {
        console.log("Error");
    },
    function() {
        console.log("Completed");
    });

在 onNext 函数发生异常后 Observable 终止。这对我来说似乎是一种不受欢迎的行为,因为我不想将所有 onNext 代码包装在 try-catch 中。有什么方法可以告诉可观察者无论发生什么异常都继续发出通知?

【问题讨论】:

  • 我不知道你在 subscribe 方法中抛出错误的目的,但这是一种非常不符合 RxJS-y 的做事方式。我会重新考虑您的模式并将反应式错误处理合并到管道中。

标签: rxjs reactive-programming reactive-extensions-js


【解决方案1】:

您需要切换到一个新的一次性流,如果其中发生错误将被安全地释放,并保持原始流处于活动状态。

我已将您的代码修改为更简单的 0 - 5 流,以便轻松演示。如果数字为 3,则会引发错误/异常。

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

此博客文章中有关此技术的更多信息:The Quest for Meatballs: Continue RxJS Streams When Errors Occur

【讨论】:

  • 这应该是公认的答案。他们的关键是有一个 switchMap 到另一个内部 observable。
  • 这个答案应该用紫色的长袍包裹起来,加冕,并用金银牲畜向它进贡。
【解决方案2】:

你真的应该让你的nextcompletederror 处理程序对异常安全。当订阅者出错时,源流没有合理的方式“恢复”订阅 - 唯一合理的做法是停止与订阅者对话。

关于为什么会这样的完整讨论,请参阅我的回答 How to handle exceptions thrown by observer's onNext in RxJava? - 它同样适用于 Rx 的任何平台实现。

此外,请参阅 Why is the OnError callback never called when throwing from the given subscriber? 以了解有关注意事项的一个很好的解释性比喻(报刊亭) - 特别是需要考虑可能有 其他订阅者 在他们的 next 中没有抛出错误处理程序。

【讨论】:

    【解决方案3】:

    这是每个平台上 Rx 的行为,在观察者中捕获异常的唯一方法是……嗯……捕获它们。

    如果您经常有这种需要,您可以创建自己的 subscribeSafe 方法,该方法在 try/catch 中包装回调。

    或者,如果您可以将观察者抛出的部分移动到流本身,那么您可以控制错误流,并(例如)重试。

    下面的例子:

    // simulates (hot) keyup event
    const obs1 = Rx.Observable.interval(1000).publish()
    obs1.connect()
    
    function getAsyncSearchResults(i) {
      return Rx.Observable.timer(500).mapTo(i)
    }
    
    obs1.switchMap(getAsyncSearchResults).do(x => {
      console.log(x) // all events gets logged
      throw new Error()
    }).retry().subscribe()
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-05-24
      • 1970-01-01
      • 2022-01-01
      • 2014-09-11
      • 2016-12-12
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多