1 总体思路 - 在缓冲和非缓冲之间来回切换
看起来您需要在开始时暂停(缓冲)historyStream$,然后在 deviceVersionModelStream$ 发出时取消暂停。一种暂停/取消暂停流的方法是。
merge(
source$.pipe(bufferToggle(pauseOn$, () => pauseOff$)),
source$.pipe(windowToggle(pauseOff$, () => pauseOn$))
).pipe(mergeAll())
另见:https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd
您的具体情况
对于您的情况,这将是:
const versionModel$ = deviceVersionModelStream$.pipe(take(1));
merge(
historyStream$.pipe(bufferToggle(of(0), v => versionModel$)),
historyStream$.pipe(windowToggle(versionModel$, v => NEVER))
).pipe(
mergeAll(),
bufferTime(2000)
);
如果您想在每次发出时都可以访问deviceVersionModelStream$ 的输出,您可以使用combineLatest。如果对historyStream$ 和deviceVersionModelStream$ 的多个订阅有问题,您可以事先使用share。
const versionModel$ = deviceVersionModelStream$.pipe(share(), take(1));
const historyStreamShared$ = historyStream$.pipe(share());
combineLatest(
versionModel$,
merge(
historyStreamShared$.pipe(bufferToggle(of(0), v => versionModel$)),
historyStreamShared$.pipe(windowToggle(versionModel$, v => NEVER))
).pipe(
mergeAll(),
bufferTime(2000)
)
).subscribe(console.log);
https://stackblitz.com/edit/rxjs-oyctsg
编辑(如果您在开始时只需要一次缓冲区)
在您的情况下,一旦关闭缓冲区,您就不需要再次使用缓冲区,即一旦您从 bufferToggle 切换到 windowToggle 流,您就不需要切换回 bufferToggle 流。
这允许一种稍微简单的方法,只使用buffer 而不是bufferToggle 和windowToggle。
2 总体思路 - 仅在开头缓冲
source$ = dataStream.pipe(share()) // make sure this is a hot observable
pauseOff$ = timer(5000) // make sure this observable emits once and completes
concat(
source$.pipe(buffer(pauseOff$), mergeAll()), // start with a buffered stream
source$ // switch to the unbuffered stream when pauseOff$ emits and completes the previous stream
)
您的具体情况
const historyStream$ = timer(0, 100).pipe(share(), take(200));
const versionModel$ = of("version-model").pipe(delay(5000), take(1));
combineLatest(
versionModel$,
concat(
historyStream$.pipe(buffer(versionModel$), mergeAll()),
historyStream$
).pipe(
bufferTime(2000)
)
).subscribe(console.log);
https://stackblitz.com/edit/rxjs-2ptrux