【问题标题】:Test several observables in order按顺序测试几个 observables
【发布时间】:2020-10-10 02:37:24
【问题描述】:

我想做一些试错法。我有几个 observables,我想取第一个 observables 的值,它没有遇到错误。我尝试了什么:

const err1$ = throwError('err1');
const err2$ = throwError('err2');
const ok1$ = of('ok1').pipe(tap(x => console.log('hot ok1')));
const ok2$ = of('ok2').pipe(tap(x => console.log('hot ok2')));
const ok3$ = of('ok3').pipe(tap(x => console.log('hot ok3')));

onErrorResumeNext(err1$, err2$, ok1$, ok2$, ok3$).pipe(
  first(),
  defaultIfEmpty('all failed'),
).subscribe(console.log);

我的问题是,我不想执行ok2$ok3$,因为ok1$ 很好,但是用这种方法它们变得很热。我的目标是,我只在控制台日志https://stackblitz.com/edit/rxjs-pmqbcv?file=index.ts 中看到hot ok1(来自点击)和ok1(来自订阅)

【问题讨论】:

  • 您的方法适用于异步可观察对象。所以您正在寻找一种同步执行此操作的方法?
  • 不,在我的实际情况下,这些是 http 调用

标签: javascript angular typescript rxjs


【解决方案1】:

这可以满足您的需求,方法是将每个可观察对象包装到一个管道中,该管道在错误 (catchDefaultConcat) 时返回未定义。之后我们可以过滤掉任何错误的结果(如果undefined 是可观察的有效结果,这将是有问题的)。

这是一个有趣的问题,onErrorResumeNext 的文档说它基本上只是连接源 observables 和“吞下”错误,但我也不确定为什么当你点击你的 @987654324 时它没有取消订阅@调用。

const err1$ = throwError('err1');
const err2$ = throwError('err2');
const ok1$ = of('ok1').pipe(tap(x => console.log('hot ok1')));
const ok2$ = of('ok2').pipe(tap(x => console.log('hot ok2')));
const ok3$ = of('ok3').pipe(tap(x => console.log('hot ok3')));

const catchDefaultConcat = (...args: Observable<any>[]) => {
  return concat(args.map((arg) => arg.pipe(
    catchError(() => of(undefined))
  )));
}

catchDefaultConcat(err1$, err2$, ok1$, ok2$, ok3$).pipe(
  concatAll(),
  filter((val) => val !== undefined),
  first(),
  catchError(() => EMPTY),
  defaultIfEmpty('all failed'),
).subscribe(console.log);

【讨论】:

  • 打印最后一个管道的错误Property 'pipe' does not exist on type 'OperatorFunction&lt;unknown, unknown&gt;'.
  • 您启发了我找到一种可行的方法,我将其添加为另一个答案。
  • 它在 Stackblitz 上工作,我有一种感觉,虽然它可能是不必要的复杂 ;)。你的版本更干净。
【解决方案2】:

受到 evilstiefel 答案的启发,我现在找到了一种可行的方法

const err1$ = throwError('err1');
const err2$ = throwError('err2');
const ok1$ = of('ok1').pipe(tap(x => console.log('hot ok1')));
const ok2$ = of('ok2').pipe(tap(x => console.log('hot ok2')));
const ok3$ = of('ok3').pipe(tap(x => console.log('hot ok3')));

of(err1$, err2$, ok1$, ok2$, ok3$).pipe(
  map(obs$ => obs$.pipe(catchError(() => of(undefined)))),
  concatAll(),
  filter((val) => val !== undefined),
  first(),
  defaultIfEmpty('all failed'),
).subscribe(console.log);

https://stackblitz.com/edit/rxjs-e5bcko?file=index.ts

【讨论】:

  • 请注意,在调用defaultIfEmpty 之前,您仍然需要catchError(() =&gt; EMPTY), 行,否则它永远不会被触发(如果您过滤掉所有内容,它会抛出EmptyError)。
【解决方案3】:

在 RxJs 6 中,onErrorResumeNext 方法对于同步 Observables 无效,但适用于异步 Observables。见:https://stackblitz.com/edit/rxjs-7jxjwh?file=index.ts

在 RxJs 7 中,这是fixedonErrorResumeNext 方法适用于所有 Observable。见:https://stackblitz.com/edit/rxjs-y98evs?file=index.ts


onErrorResumeNext 当前是 considered for deprecation,因此替换为 concatcatchError 可能是最好的选择。

function concatCatching(...args: ObservableInput<any>[]): Observable<any> {
  return concat(...args.map(o => from(o).pipe(catchError(() => EMPTY))))
}
concatCatching(err1$, err2$, ok1$, ok2$, ok3$).pipe(
  take(1),
  defaultIfEmpty('all failed'),
).subscribe(console.log);

https://stackblitz.com/edit/rxjs-fd1qyp?file=index.ts

【讨论】:

  • 我明白了,延迟(0)正在改变这里的行为。你能解释一下为什么吗?
  • @MoxxiManagarm 据我了解,onErrorResumeNext 直接订阅下一个源,如果前一个已完成而不检查目标是否已关闭。并且从take(1) 的源取消订阅只有在完成所有先前的同步操作之后才会发生。 delay(0) 将执行移动到下一个事件循环迭代,因此延迟取消订阅不是问题。这个问题在 RxJs 7 btw 中得到修复:github.com/ReactiveX/rxjs/pull/5650
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-09-01
  • 2017-09-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-28
  • 1970-01-01
相关资源
最近更新 更多