在 RxJS 5 中,我会这样做:
Observable.range(1, 25)
.bufferCount(5)
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(300))
.toArray();
})
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
})
.subscribe(val => {
// console.log('response');
console.log('response', val);
});
使用 bufferCount 运算符,我将输入数组拆分为 5 个项目的批次。然后首先使用第一个 concatMap() 处理每个批次(我故意使用 concat 是因为我想等到嵌套的 Observable 完成)。然后将处理后的数据发送到另一个concatMap(),然后再将其发送到您的服务器。
我正在使用两个delay() 运算符来模拟不同的任务需要不同的时间。在我们的例子中,处理图像非常快,所以第一个 concatMap 会比第二个 concatMap 更快地将它们发送到服务器,这没关系。处理后的图片会堆叠在concatMap里面,分批发送。
此演示的输出将如下所示:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
观看现场演示:https://jsbin.com/mileqa/edit?js,console
但是,如果您想始终先处理一个批次而不是发送它,并且在发送它而不是继续另一个批次时,您必须将第二个内部 Observable 从 concatMap 移动到第一个 toArray() 的末尾concatMap() 来电。
.concatMap(batch => { // process images
console.log('process', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('p' + val).delay(100))
.toArray()
.concatMap(batch => { // send images
console.log('send batch', batch);
return Observable.from(batch)
.mergeMap(val => Observable.of('s' + val).delay(500))
.toArray();
});
})
观看现场演示:https://jsbin.com/sabena/2/edit?js,console
这会产生如下输出:
process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]
可以看到“进程”、“发送批处理”和“响应”日志是按顺序排列的。
在 RxJS 4 中的实现应该几乎相同(只是操作符名称可能略有不同)。
在 RxJS 4 中还有 controlled() operator 在 RxJS 5 中不存在(还没有?)。我可能会做一些与您需要的非常相似的事情。