【发布时间】:2020-02-23 09:48:18
【问题描述】:
用例: 我正在尝试使用 RxJS 在 javascript 中实现保存过程。我有一个名为 example set 的方法,我想首先将对象保存到本地 DB 中,然后将其发送到服务器。所以,例如,我有这个代码:
set({ obj, group = 'main' }) {
// tell the LocalDB to store obj in appropriate name and group (appropriate table)
const storageObservable = this.parealDB.storage.set(this.name, group, { ...obj, status: 0 }); // status 0 means that the records need to be synced (sent) to the server
// tell the WebClient to sync obj to appropriate name and group (appropriate table)
const webclientObservable = this.parealDB.webclient.set(this.name, group, obj); // try to send to server and then set status=1 on the localDB / this request should not be failed
storageObservable.pipe(
catchError(error => of({
action: 'STORAGE_ERROR',
data: error,
}))
);
webclientObservable.pipe(
tap(result => this.parealDB.storage.set(this.name, group, { _id: obj._id, status: 1 })),
catchError(error => of({
action: 'WEBCLIENT_ERROR',
data: error,
}))
);
return merge(storageObservable, webclientObservable);
}
我想在 set 方法之外返回一个合并的 observable,并在捕获每个内部作业的错误时,也继续其他作业。
问题: 我已经用一些测试可观察物测试了这个想法。首先,看一下示例:
//emit every 2.5 seconds
const first = interval(200).pipe(
map(val => "" + val + " A"),
take(3)
);
//emit every 2 seconds
const second = interval(100).pipe(
mergeMap(val => {if(val === 1) return throwError('salam'); return of('' + val + " B");}),
// tap(val => console.log('tapped ', val)),
take(3)
)
// .pipe(
// onErrorResumeNext(),
// )
//emit every 1.5 seconds
const third = interval(300).pipe(
map(val => val + " C"),
take(3)
);
merge(first, second, third)
// .pipe(
// catchError(error => of(error)),
// )
.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('completed')
});
现在当第二个失败时,它破坏了整个工作,输出是这样的:
0 B
0 A
salam // (error)
如果我尝试在合并的 observable 上取消注释 catch 错误管道,它会将 salam 作为正常输出并完成工作,但仍会使其短路:
0 B
0 A
salam
completed
我发现的最佳解决方案是在将每个内部 observable 合并在一起之前通过管道传递 onErrorResumeNext。因此,在第二个 observable(这会导致问题)上取消注释提到的运算符会导致以下输出:
0 B
0 A
0 C
1 A
2 A
1 C
2 C
completed
它非常接近我正在搜索的内容,但在设置方法之外,我无法捕获错误(侦听错误)!
问题: 那么我错过了使用 RxJS 可观察对象还是有更好的方法来处理这种情况?
【问题讨论】:
-
我不明白你想要发生什么
-
我想要这个:例如,如果 B 失败,A 和 C 将继续发布下一个回调,并且订阅者可能会收到 B 错误的通知。
-
在 RxJS 中,一个链最多可以发出一个
error通知,然后该链被释放。因此,您无法收到有关两个errors 的通知,然后收到带有正确数据的next。我认为您唯一的选择是将每个error变成next和catchError() -
谢谢@martin 我已经在我的第一个示例中对此进行了测试,但我想这可能是一个不好的做法。
-
使用
catchError包装错误在例如。 NgRx 效果,我认为 Angular 文档也在使用它。很多时候没有别的办法。也许您可以为每个输入使用中间主题进行多个订阅,但这会不必要地复杂。
标签: error-handling merge rxjs observable