【问题标题】:How to get similar behavior to bufferCount whilst emitting if there are less items than the buffer count如果项目少于缓冲区计数,如何在发射时获得与 bufferCount 类似的行为
【发布时间】:2026-01-08 16:50:02
【问题描述】:

我正在尝试实现与缓冲区计数非常相似的东西。当值通过管道时,bufferCount 当然会缓冲它们并将它们分批发送。如果当前流中的缓冲区大小小于缓冲区大小,我想要类似的东西,它将发出所有剩余的项目。

这个词有点令人困惑,所以我将提供一个示例来说明我想要实现的目标。

我有一些东西将项目单独添加到一个主题。有时它会在一分钟内添加1 项目,有时它会在1 秒内添加1000 项目。我希望对这些项目的批次进行长时间运行(2 seconds~),以免服务器过载。

因此,例如,考虑 P 正在处理的时间线

---A-----------B----------C---D--EFGHI------------------
   |_( P(A) )  |_(P(B))   |_(  P(C)  ) |_(P([D, E, F, G, H, I]))

通过这种方式,我可以根据发生的事件数量以小批量或大批量处理事件,但我确保批量保持小于 X。

我基本上需要将所有单个发射映射到包含 5 个或更少块的发射。当我将事件传送到concatMap 时,事件将开始堆积。我想分批挑选这些堆积起来的事件。我怎样才能做到这一点?

这是我目前所获得的堆栈闪电战:https://stackblitz.com/edit/rxjs-iqwcbh?file=index.ts

注意第 4 项和第 5 项是如何处理的,直到有更多项进入并填充缓冲区。理想情况下,在处理完 1,2,3 之后,它会从队列中取出 4,5。然后当 6,7,8 进来时,它会处理那些。

【问题讨论】:

  • 能否请您包括您到现在为止尝试过的内容?如果你stackblitz它会更容易帮助你。
  • 当然让我把它扔进stackblitz
  • @AmerYousuf 已添加,希望足够

标签: rxjs


【解决方案1】:

由于我注意到在您的示例代码中使用了forkJoin,我可以看到您正在向服务器发送每次发射的请求(我最初的印象是您每批只进行 1 次调用结合数据)。

在每个项目发送一个请求的情况下,解决方案要简单得多!

无需批量发送,只需使用mergeMap 并指定其并发参数即可。这将限制当前正在执行的请求数:

const stream$ = subject$.pipe(
  mergeMap(val => doWork(val), 3),  // 3 max concurrent requests
);

这是当对象快速发射时输出的视觉效果:

请注意,最初仅开始前 3 个项目的工作。之后的排放将排队并在之前的飞行项目完成时处理。

这是此行为的 StackBlitz 示例。

【讨论】:

  • 太棒了。 3 天的时间尝试了我能想到的所有解决方案和 hack,我需要做的就是阅读文档。吸取的教训(直到下一次)。这正是我需要的。非常感谢。
【解决方案2】:

听起来您想要bufferCountbufferTime 的组合。换句话说:“当缓冲区达到大小 X 或经过 Y 时间后释放缓冲区”。

我们可以使用race 运算符以及其他两个运算符来创建一个可观察对象,该可观察对象在缓冲区达到所需大小或持续时间过去后发出。我们还需要来自takerepeat 的一点帮助:

const chunk$ = subject$.pipe(bufferCount(3));

const partial$ = subject$.pipe(
  bufferTime(2000),
  filter(arr => !!arr.length) // don't emit empty array
);

const stream$ = race([chunk$, partial$]).pipe(
  take(1),
  repeat()
);

这里我们将stream$ 定义为chunk$partial$ 之间的第一个发射。然而,race 只会使用第一个发射的源,所以我们使用take(1)repeat 来“reset the race”。

然后您可以像这样使用concatMap 完成您的工作:

stream$.pipe(
  concatMap(chunk => this.doWorkWithChunk(chunk))
);

这是一个有效的 StackBlitz 演示。


您可能希望将其滚动到自定义运算符中,因此您可以简单地执行以下操作:

const stream$ = subject$.pipe(
  bufferCountTime(5, 2000)
);

bufferCountTime() 的定义可能如下所示:

function bufferCountTime<T>(count: number, time: number) {
  
  return (source$: Observable<T>) => {
    const chunk$ = source$.pipe(bufferCount(count));
    const partial$ = source$.pipe(
      bufferTime(time),
      filter((arr: T[]) => !!arr.length)
    );

    return race([chunk$, partial$]).pipe(
      take(1),
      repeat()
    );
  }
}

另一个StackBlitz 示例。

【讨论】:

  • 谢谢,这就像一个魅力。我唯一的问题是关于take(1), repeat() 期间错过事件的可能性。它从源取消订阅,然后重新订阅。这段时间我有可能错过一个活动吗?捕获每一个事件是非常关键的,所以我不想要这种可能性,但不确定 rx 的基本工作原理 - 是否会在事件循环中正确排队。
  • 老实说,我不是 100% 确定。这将需要一些测试和/或窥视 rxjs source code。但是...我刚刚注意到您的示例中使用了forkJoin。是不是这样,即使您正在“批量”排放,您仍然为每个排放向服务器发送单独的请求?如果是这样......有一个更简单的解决方案!
【解决方案3】:

TLDR;

可以在 here 找到包含该解决方案的 StackBlitz 应用程序。


说明

这是一种方法:

const bufferLen = 3;
const count$ = subject.pipe(filter((_, idx) => (idx + 1) % bufferLen === 0));
const timeout$ = subject.pipe(
  filter((_, idx) => idx === 0),
  switchMapTo(timer(0))
);

subject
  .pipe(
    buffer(
      merge(count$, timeout$).pipe(
        take(1),
        repeat()
      )
    ),
    concatMap(buffer => forkJoin(buffer.map(doWork)))
  )
  .subscribe(/* console.warn */);

/* Output:
Processing 1
Processing 2
Processing 3
Processed 1
Processed 2
Processed 3
Processing 4
Processing 5
Processed 4
Processed 5
Processing 6 <- after the `setTimeout`'s timer expires
Processing 7
Processing 8
Processed 6
Processed 7
Processed 8
*/

这个想法是当项目同步进入时仍然使用bufferCount 的行为,但同时检测缓冲区中的项目何时少于所选的bufferLen。我认为可以使用timer(0) 来完成这种检测,因为它在内部调度了一个宏任务,因此可以确保首先考虑同步发出的项目。

但是,没有一个运算符可以完全结合上面描述的逻辑。但重要的是要记住,我们当然想要一种类似于buffer 运算符提供的行为。例如,我们肯定会有 subject.pipe(buffer(...)) 之类的东西。

让我们看看我们如何在不使用bufferTime 的情况下实现类似于bufferTime 所做的事情:

const bufferLen = 3;
const count$ = subject.pipe(filter((_, idx) => (idx + 1) % bufferLen === 0));

鉴于上述 sn-p,使用 buffer(count$)bufferTime(3),我们应该得到相同的行为。

现在让我们进入检测部分

const timeout$ = subject.pipe(
  filter((_, idx) => idx === 0),
  switchMapTo(timer(0))
);

它本质上是在主体发出第一个项目后启动一个计时器。当我们有更多上下文时,这将更有意义:

subject
  .pipe(
    buffer(
      merge(count$, timeout$).pipe(
        take(1),
        repeat()
      )
    ),
    concatMap(buffer => forkJoin(buffer.map(doWork)))
  )
  .subscribe(/* console.warn */);

通过使用merge(count$, timeout$),这就是我们所说的:当主体发射时,开始向缓冲区添加项目,同时启动计时器。 计时器是也开始了,因为它用于确定缓冲区中是否会有更少的项目。

让我们看一下 StackBlitz 应用程序中提供的示例:

from([1, 2, 3, 4, 5])
  .pipe(tap(i => subject.next(i)))
  .subscribe();

// Then mimic some more items coming through a while later
setTimeout(() => {
  subject.next(6);
  subject.next(7);
  subject.next(8);
}, 10000);

1 被发出时,它会被添加到缓冲区中并且计时器会启动。然后23 立即到达,因此会发出累积值。
因为我们还使用了take(1)repeat(),所以进程将重新启动。现在,当4 发出时,它将被添加到缓冲区中,并且计时器将再次启动。 5 立即到达,但是到目前为止收集的项目数小于给定的缓冲区长度,这意味着直到第 3 个值到达,计时器才有时间完成。当计时器结束时,将发出[4,5] 块。 [6, 7, 8] 发生的情况与 [1, 2, 3] 发生的情况相同。

【讨论】:

    最近更新 更多