据我了解,您只想在上一个流完成后才开始下一个流(即将流添加到队列)
import { Observable, of, BehaviorSubject, from } from 'rxjs';
import { tap, finalize, filter, take, switchMap, delay } from 'rxjs/operators';
class StreamQueue {
lastStreamCompleted$: Observable<boolean> = new BehaviorSubject(true);
private runAfter<T>(lastStreamCompleted$: Observable<boolean>, stream$: Observable<T>): [Observable<boolean>, Observable<T>] {
const newLastStreamCompleted$ = new BehaviorSubject(false);
const newStream$ = lastStreamCompleted$
.pipe(
filter(lastStreamCompleted => lastStreamCompleted),
take(1),
switchMap(() => stream$),
finalize(() => newLastStreamCompleted$.next(true)),
);
return [newLastStreamCompleted$, newStream$];
}
add(stream$: Observable<any>) {
const [newLastStreamCompleted$, newStream$] = this.runAfter(this.lastStreamCompleted$, stream$);
this.lastStreamCompleted$ = newLastStreamCompleted$;
return newStream$;
}
}
const streamQueue = new StreamQueue();
streamQueue.add(from([1, 2]).pipe(delay(100))).subscribe(console.log);
setTimeout(()=>streamQueue.add(from([21, 22]).pipe(delay(100))).subscribe(console.log), 100);
streamQueue.add(from([11, 12]).pipe(delay(100))).subscribe(console.log);
// Output:
// 1
// 2
// 11
// 12
// 21
// 22