TLDR;
可以在 here 找到包含该解决方案的 StackBlitz 应用程序。
说明
这是一种方法:
const bufferLen = 3;
const count$ = subject.pipe(filter((_, idx) => (idx + 1) % bufferLen === 0));
const timeout$ = subject.pipe(
filter((_, idx) => idx === 0),
switchMapTo(timer(0))
);
subject
.pipe(
buffer(
merge(count$, timeout$).pipe(
take(1),
repeat()
)
),
concatMap(buffer => forkJoin(buffer.map(doWork)))
)
.subscribe(/* console.warn */);
/* Output:
Processing 1
Processing 2
Processing 3
Processed 1
Processed 2
Processed 3
Processing 4
Processing 5
Processed 4
Processed 5
Processing 6 <- after the `setTimeout`'s timer expires
Processing 7
Processing 8
Processed 6
Processed 7
Processed 8
*/
这个想法是当项目同步进入时仍然使用bufferCount 的行为,但同时检测缓冲区中的项目何时少于所选的bufferLen。我认为可以使用timer(0) 来完成这种检测,因为它在内部调度了一个宏任务,因此可以确保首先考虑同步发出的项目。
但是,没有一个运算符可以完全结合上面描述的逻辑。但重要的是要记住,我们当然想要一种类似于buffer 运算符提供的行为。例如,我们肯定会有 subject.pipe(buffer(...)) 之类的东西。
让我们看看我们如何在不使用bufferTime 的情况下实现类似于bufferTime 所做的事情:
const bufferLen = 3;
const count$ = subject.pipe(filter((_, idx) => (idx + 1) % bufferLen === 0));
鉴于上述 sn-p,使用 buffer(count$) 和 bufferTime(3),我们应该得到相同的行为。
现在让我们进入检测部分:
const timeout$ = subject.pipe(
filter((_, idx) => idx === 0),
switchMapTo(timer(0))
);
它本质上是在主体发出第一个项目后启动一个计时器。当我们有更多上下文时,这将更有意义:
subject
.pipe(
buffer(
merge(count$, timeout$).pipe(
take(1),
repeat()
)
),
concatMap(buffer => forkJoin(buffer.map(doWork)))
)
.subscribe(/* console.warn */);
通过使用merge(count$, timeout$),这就是我们所说的:当主体发射时,开始向缓冲区添加项目,同时启动计时器。 计时器是也开始了,因为它用于确定缓冲区中是否会有更少的项目。
让我们看一下 StackBlitz 应用程序中提供的示例:
from([1, 2, 3, 4, 5])
.pipe(tap(i => subject.next(i)))
.subscribe();
// Then mimic some more items coming through a while later
setTimeout(() => {
subject.next(6);
subject.next(7);
subject.next(8);
}, 10000);
当1 被发出时,它会被添加到缓冲区中并且计时器会启动。然后2 和3 立即到达,因此会发出累积值。
因为我们还使用了take(1) 和repeat(),所以进程将重新启动。现在,当4 发出时,它将被添加到缓冲区中,并且计时器将再次启动。 5 立即到达,但是到目前为止收集的项目数小于给定的缓冲区长度,这意味着直到第 3 个值到达,计时器才有时间完成。当计时器结束时,将发出[4,5] 块。 [6, 7, 8] 发生的情况与 [1, 2, 3] 发生的情况相同。