【问题标题】:Observable stops firing even when catching the error即使发现错误,Observable 也会停止触发
【发布时间】:2020-09-04 02:55:12
【问题描述】:

我的项目遇到了一个非常奇怪的行为,我有一个简单的 Angular 服务,代码如下:

seatClick$ = new Subject<Seat>();

以及触发 observable 的服务上的方法:

  handleSeatClick(seat: Seat) {
    this.seatClick$.next(seat);
  }

可观察的逻辑很简单:

this.seatClick$.pipe(
    exhaustMap((seat: Seat) => {
       this.someFunctionThatThrowsException(); // this function throws ref exception
      return of(null);
    })
    , catchError(err => {
        console.log('error handled');
        return of(null);
    })
)
.subscribe(() => {
    console.log('ok');
  },
  (error1 => {
    console.log('oops');
  })
);

这真的很奇怪,当调用“someFunctionThatThrowsException”时会抛出一些 ReferenceError 异常,然后这个异常会被 catchError 捕获并触发 next() 事件。

但是,从这一刻起,seatClick observable 停止响应,就好像它完成了一样,在服务上调用 handleSeatClick 将不再响应。

我在这里错过了什么?

【问题讨论】:

  • 能否请您展示this.someFunctionThatThrowsException()的实现?

标签: angular typescript rxjs observable angular-observable


【解决方案1】:

使用repeat() 运算符的一个很好的替代方法是将错误处理嵌套在内部管道中。完全没问题 - 这个 observable 无论如何都应该终止。

this.seatClick$.pipe(
  exhaustMap((seat: Seat) => {
    // We moved the error handling down one level.
    this.someFunctionThatThrowsException().pipe(
      catchError(err => {
        console.log('error handled');
        return of(null);
      }),
    );
    return of(null);
  }),
).subscribe());

【讨论】:

    【解决方案2】:

    这是正确的行为,您需要repeat 操作员在这里重新订阅。

    this.seatClick$.pipe(
        exhaustMap((seat: Seat) => {
           this.someFunctionThatThrowsException();
           return of(null);
        })
    
        // in case of an error the stream has been completed.
        , catchError(err => {
            console.log('error handled');
            return of(null);
        })
    
        // now we need to resubscribe again
        , repeat() // <- here
    )
    .subscribe(() => {
        console.log('ok');
      },
      (error1 => {
        console.log('oops');
      })
    );
    

    此外,如果您知道某些事情可能会失败,您可以将其专用于内部流并在那里使用catchError,那么您就不需要repeat

    this.seatClick$.pipe(
      // or exhaustMap, or mergeMap or any other stream changer.
      switchMap(seal => of(seal).pipe(
        exhaustMap((seat: Seat) => {
           this.someFunctionThatThrowsException();
           return of(null);
        })
        , catchError(err => {
            console.log('error handled');
            return of(null);
        })
      )),
      // now switchMap always succeeds with null despite an error happened inside
      // therefore we don't need `repeat` outside and our main stream
      // will be completed only when this.seatClick$ completes
      // or throws an error.
    )
    .subscribe(() => {
        console.log('ok');
      },
      (error1 => {
        console.log('oops');
      })
    );
    

    【讨论】:

    • 谢谢,为什么这是正确的行为?如果没有抛出异常,它就可以工作,catchError 不应该处理错误并保持可观察的活动吗?
    • 在 rx 中,错误完成了流。当您在catchError 中发现错误时,父流已经关闭,作为一个选项,您可以将当前流切换到另一个流,因为ofnull 之后立即完成整个流已经完成并且subscribe 赢了'没有从中得到任何东西,您可以添加第三个回调以查看流何时完成(在 null 之后)。 repeat 在完成时监听并重新订阅,所以 subscribe,如果出现错误,它就像一个全新的订阅。
    • 想象一下您需要在error1 =&gt; console.log('oops') 中继续 - 这是不可能的,因为流已关闭,您需要解决方法来重新订阅。
    • @satanTime:我知道retry()repeat() 在这里没有任何区别,但是retry() 在语义上不会更正确,因为重新订阅是在错误之后发生的?
    • 取决于您的要求,如果您希望流失败并尝试 N 次成功,retry 是正确的解决方案。如果您希望流始终处于活动状态,那么您需要repeat,而且retry 仅在出现错误的情况下有效,成功完成后它不会重新订阅,据我所知。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-09-23
    • 1970-01-01
    • 2017-02-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多