【问题标题】:RXJS: Combining observables to emit in orderRXJS:组合可观察对象以按顺序发出
【发布时间】:2019-11-12 14:08:52
【问题描述】:

我有三个可观察的 ob1、ob2、ob2,它们都是 API 调用,我需要按顺序执行它们,并将值作为数组发出,如果有任何错误,我希望错误被捕获catchError 并停止进一步的 api 调用,例如:

someCombine(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
if(err) {
processErr
} 
else 
doSomething
});

我尝试过使用 zip

 zip(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
    doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
    if(err) {
    processErr
    } 
    else 
    doSomething
    });

但是 zip 将在第一个完成之前开始下一个 observable,我已经尝试过 concat

  concat(ob1, ob2.pipe(tap(val=> doSomething)), ob3.pipe(tap(val=> 
    doSomething))).pipe(catchError((err)=> of(err))).subscribe(combinedVal=>{
    if(err) {
    processErr
    } 
    else 
    doSomething
    });

但是对于每个单独的可观察完成,这些值都是单独发出的。

我该如何处理这种情况。

【问题讨论】:

  • concat(...).pipe(bufferCount(3))
  • concat(...).pipe(toArray())

标签: rxjs observable


【解决方案1】:

我认为是这样的:

import { tap, delay, catchError } from 'rxjs/operators';
import { of, throwError, combineLatest } from 'rxjs';


// this is just to simulate the requests
const ob1$ = of('ob1').pipe(delay(5000)),
      ob2$ = of('ob2').pipe(delay(1000)),
      ob3$ = of('ob3').pipe(delay(3000));


// abstract to reuse, since we'll catch for each observable
const handleError = (err) => {
  // process error
  console.log('process error', err);
  return throwError(err);
};

const doSomething = (val) => {
  // do something
  console.log('do something', val);
};


combineLatest(
  ob1$.pipe(catchError(handleError), tap(doSomething)),
  ob2$.pipe(catchError(handleError), tap(doSomething)),
  ob3$.pipe(catchError(handleError), tap(doSomething))
).subscribe(value => console.log(value));

// value = ["ob1", "ob2", "ob3"].
// Only emits when/if all 3 observables emit/complete.

我意识到这并不完全符合您的要求,因为请求是同时发出的,并以它们可以的任何顺序发出,但结果数组将被正确排序,并且只在最后发出。每个 observable 都会触发自己的 tapcatchError。除非您的用例非常特殊,否则这应该是可取的。

【讨论】:

    【解决方案2】:

    bufferCount
    bufferCount 将发出最后 x 次发出,因此如果您知道您有 3 个 api 请求,那么在连接后您有 3 个发出,您可以使用 bufferCount(3)

    const { of, concat, operators: { delay, bufferCount } } = rxjs;
    
    const ob1$ = of ('ob1').pipe(delay(5000)),
      ob2$ = of ('ob2').pipe(delay(1000)),
      ob3$ = of ('ob3').pipe(delay(3000));
    
    concat(ob1$, ob2$, ob3$).pipe(bufferCount(3)).subscribe({
      next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
    });
    <script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

    --或--

    自定义管道
    这个drain 管道操作符将在源 observable 完成并立即发出所有发射后发射,因此在联系后您可以在最后获得所有结果,并且它将处理任意数量的发射。

    const { of, concat, Subscriber, operators: { delay } } = rxjs;
    
    class DrainSubscriber extends Subscriber {
    	constructor(destination) {
        super(destination);
        this.result = [];
      }
      _next(value) {
        this.result.push(value);
      }
      _complete() {
      	this.destination.next(this.result);
        super._complete();
      }
    }
    
    class DrainOperator {
    	call(subscriber, source) {
      	return source.subscribe(new DrainSubscriber(subscriber));
      }
    }
    
    function drain() {
    	return (source) => source.lift(new DrainOperator());
    }
    
    const ob1$ = of ('ob1').pipe(delay(5000)),
      ob2$ = of ('ob2').pipe(delay(1000)),
      ob3$ = of ('ob3').pipe(delay(3000));
    
    concat(ob1$, ob2$, ob3$).pipe(drain()).subscribe({
      next: next => console.dir(next) // ['ob1', 'ob2', 'ob3]
    });
    <script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

    在 TypeScript 中https://stackblitz.com/edit/typescript-1yrvm2-drain-operator?file=index.ts

    --promise alt--

    基于承诺的 alt

    const { of, concat, operators: { delay } } = rxjs;
    
    const ob1$ = of('ob1').pipe(delay(5000)),
      ob2$ = of('ob2').pipe(delay(1000)),
      ob3$ = of('ob3').pipe(delay(3000));
    
    async function resolve() {
      const res1 = await ob1$.toPromise();
      const res2 = await ob2$.toPromise();
      const res3 = await ob3$.toPromise();
      return [res1, res2, res3];
    }
    
    resolve().then(results => {
      console.dir(results); // ['ob1', 'ob2', 'ob3]
    });
    <script src="https://unpkg.com/@reactivex/rxjs@6.5.3/dist/global/rxjs.umd.js"></script>

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-18
      • 2017-03-31
      • 1970-01-01
      • 1970-01-01
      • 2018-06-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多