【问题标题】:Catching error on a merged observable and continue the others in RxJS在合并的 observable 上捕获错误并在 RxJS 中继续其他
【发布时间】:2020-02-23 09:48:18
【问题描述】:

用例: 我正在尝试使用 RxJS 在 javascript 中实现保存过程。我有一个名为 example set 的方法,我想首先将对象保存到本地 DB 中,然后将其发送到服务器。所以,例如,我有这个代码:

set({ obj, group = 'main' }) {
    // tell the LocalDB to store obj in appropriate name and group (appropriate table)
    const storageObservable = this.parealDB.storage.set(this.name, group, { ...obj, status: 0 }); // status 0 means that the records need to be synced (sent) to the server
    // tell the WebClient to sync obj to appropriate name and group (appropriate table)
    const webclientObservable = this.parealDB.webclient.set(this.name, group, obj); // try to send to server and then set status=1 on the localDB / this request should not be failed

    storageObservable.pipe(
        catchError(error => of({
            action: 'STORAGE_ERROR',
            data: error,
        }))
    );

    webclientObservable.pipe(
        tap(result => this.parealDB.storage.set(this.name, group, { _id: obj._id, status: 1 })),
        catchError(error => of({
            action: 'WEBCLIENT_ERROR',
            data: error,
        }))
    );

    return merge(storageObservable, webclientObservable);
}

我想在 set 方法之外返回一个合并的 observable,并在捕获每个内部作业的错误时,也继续其他作业。

问题: 我已经用一些测试可观察物测试了这个想法。首先,看一下示例:

//emit every 2.5 seconds
const first = interval(200).pipe(
  map(val => "" + val + " A"),
  take(3)
);
//emit every 2 seconds
const second = interval(100).pipe(
  mergeMap(val => {if(val === 1) return throwError('salam'); return of('' + val + " B");}),
  // tap(val => console.log('tapped ', val)),
  take(3)
)
// .pipe(
//   onErrorResumeNext(),
// )

//emit every 1.5 seconds
const third = interval(300).pipe(
  map(val => val + " C"),
  take(3)
);

merge(first, second, third)
// .pipe(
//   catchError(error => of(error)),
// )
.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log('completed')
});

现在当第二个失败时,它破坏了整个工作,输出是这样的:

0 B
0 A
salam // (error)

如果我尝试在合并的 observable 上取消注释 catch 错误管道,它会将 salam 作为正常输出并完成工作,但仍会使其短路:

0 B
0 A
salam
completed

我发现的最佳解决方案是在将每个内部 observable 合并在一起之前通过管道传递 onErrorResumeNext。因此,在第二个 observable(这会导致问题)上取消注释提到的运算符会导致以下输出:

0 B
0 A
0 C
1 A
2 A
1 C
2 C
completed

它非常接近我正在搜索的内容,但在设置方法之外,我无法捕获错误(侦听错误)!

问题: 那么我错过了使用 RxJS 可观察对象还是有更好的方法来处理这种情况?

【问题讨论】:

  • 我不明白你想要发生什么
  • 我想要这个:例如,如果 B 失败,A 和 C 将继续发布下一个回调,并且订阅者可能会收到 B 错误的通知。
  • 在 RxJS 中,一个链最多可以发出一个error 通知,然后该链被释放。因此,您无法收到有关两个 errors 的通知,然后收到带有正确数据的 next。我认为您唯一的选择是将每个error 变成nextcatchError()
  • 谢谢@martin 我已经在我的第一个示例中对此进行了测试,但我想这可能是一个不好的做法。
  • 使用catchError 包装错误在例如。 NgRx 效果,我认为 Angular 文档也在使用它。很多时候没有别的办法。也许您可以为每个输入使用中间主题进行多个订阅,但这会不必要地复杂。

标签: error-handling merge rxjs observable


【解决方案1】:

你可以使用repeat() 操作符来保持流的存活,但是要小心使用它,你必须确保源流是一个热的observable,否则你可能会遇到无限循环。

streamA.pipe(
catch(e=>of(e)),
repeat())

【讨论】:

  • 不错的提示@FanCheung,但这与我的问题无关。如果我在其中一个流上捕获错误,则错误不会传播到合并流,如果在合并流上这样做,它会在捕获错误后崩溃。
  • 你可以对合并流使用相同的模式,认为它是一个失败的重新订阅
  • 在这种情况下我不同意你的看法。因为每个子流的作业独立运行,并且当合并的流抛出一个失败流的错误时,其他流仍然存在(我不希望并且在某些情况下不能杀死它们)而合并流立即停止。
  • 你想达到什么目的?如果您担心的话,您甚至可以考虑使用 repeat when 运算符
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-02
  • 1970-01-01
相关资源
最近更新 更多