实际上,使用bufferTime() 运算符及其三个参数有一种更简单的方法:
bufferTime(bufferTimeSpan, bufferCreationInterval, maxBufferSize)
这意味着我们可以使用bufferTime(1000, null, 10),这意味着我们将在最多 1 秒后发出最多 10 个项目或的缓冲区。 null 表示我们想在当前缓冲区发出后立即打开一个新缓冲区。
function mockRequest(val) {
return Observable
.of(val)
.delay(100)
.map(val => 'R' + val);
}
Observable
.range(0, 55)
.concatMap(val => Observable.of(val)
.delay(25) // async source of values
// .delay(175)
)
.bufferTime(1000, null, 10) // collect all items for 1s
.concatMap(buffer => Observable
.from(buffer) // make requests
.delay(1000) // delay this batch by 1s (rate-limit)
.mergeMap(value => mockRequest(value)) // collect results regardless their initial order
.toArray()
)
// .timestamp()
.subscribe(val => console.log(val));
观看现场演示:https://jsbin.com/mijepam/19/edit?js,console
您可以尝试不同的初始延迟。只有25ms的请求会在10之前分批发送:
[ 'R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9' ]
[ 'R10', 'R11', 'R12', 'R13', 'R14', 'R15', 'R16', 'R17', 'R18', 'R19' ]
[ 'R20', 'R21', 'R22', 'R23', 'R24', 'R25', 'R26', 'R27', 'R28', 'R29' ]
[ 'R30', 'R31', 'R32', 'R33', 'R34', 'R35', 'R36', 'R37', 'R38', 'R39' ]
[ 'R40', 'R41', 'R42', 'R43', 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]
但是对于.delay(175),我们将发出少于 10 个项目的批次,因为我们受到 1 秒延迟的限制。
[ 'R0', 'R1', 'R2', 'R3', 'R4' ]
[ 'R5', 'R6', 'R7', 'R8', 'R9', 'R10' ]
[ 'R11', 'R12', 'R13', 'R14', 'R15' ]
[ 'R16', 'R17', 'R18', 'R19', 'R20', 'R21' ]
[ 'R22', 'R23', 'R24', 'R25', 'R26', 'R27' ]
[ 'R28', 'R29', 'R30', 'R31', 'R32' ]
[ 'R33', 'R34', 'R35', 'R36', 'R37', 'R38' ]
[ 'R39', 'R40', 'R41', 'R42', 'R43' ]
[ 'R44', 'R45', 'R46', 'R47', 'R48', 'R49' ]
[ 'R50', 'R51', 'R52', 'R53', 'R54' ]
但是,您可能需要的内容有所不同。由于.bufferTime(1000, ...) 和delay(1000),此解决方案最初在 2 秒延迟后开始发出值。所有其他排放都在 1 秒后发生。
你最终可以使用:
.bufferTime(1000, null, 10)
.mergeAll()
.bufferCount(10)
这将始终收集 10 个项目,然后才会执行请求。这可能会更有效。