retryWhen() 的可调用对象需要返回一个 Observable,该 Observable 发出 complete 或 error 以结束流或发出一个值以重新订阅。
例如,由于Observable.empty(),此代码完成而没有发出错误:
Observable.create(obs => {
obs.next(1);
obs.next(2);
obs.error('error from source');
})
.retryWhen((errors) => {
errors.subscribe(sourceError => console.log(sourceError));
return Observable.create(obs => obs.error('inner error'));
})
.subscribe(
val => console.log(val),
err => console.log('error', err),
_ => console.log('complete')
);
源 Observable 的错误以next 的形式发送到errors。见源码:https://github.com/ReactiveX/rxjs/blob/master/src/operator/retryWhen.ts#L86
这会打印到控制台:
1
2
error inner error
error from source
观看现场演示:http://plnkr.co/edit/Fajsb54WJwB8J8hkUC6j?p=preview
根据下面的 cmets 进行编辑:
查看retryWhen()的文档:
一个错误将导致 Throwable 的发射,导致 notificationHandler 返回的 Observable 发生错误。如果该 Observable 调用 onComplete 或错误,那么 retry 将在子订阅上调用 complete 或 error。 否则,此 Observable 将重新订阅特定调度程序上的源 observable。
所以回调返回的 Observable 负责决定是否重新订阅。如果它发出next(),则重新订阅。如果它发出 error() 或 complete() 将它们传递给子 Observer。
例如你可以这样做(我没有测试这段代码):
return response.retryWhen((errors) => {
var retrySource = new Subject();
errors.subscribe(error => {
if (this.responseErrorProcess(error)) retrySource.next();
else retrySource.complete();
});
return retrySource;
});
根据您的内在逻辑,您在retrySource 上触发了正确的消息。