【发布时间】:2017-05-01 19:37:08
【问题描述】:
我使用 RxJs 处理从文件重放的数据。每个数据项都包含一个timereceived 属性。在重播时,我想创建缓冲区,其中包含最初在给定时间跨度 x 内收到的所有数据项。换句话说:我想将所有项目添加到当前缓冲区,而接收到的第一个缓冲区元素和接收到的当前元素之间的时间跨度小于时间跨度 x。
示例测试:
it('should be able to create buffers based on time received', function() {
// given
let source = require('rx').Observable.from([
{val: 1, timereceived: 10},
{val: 2, timereceived: 20},
{val: 3, timereceived: 100},
{val: 4, timereceived: 110},
{val: 5, timereceived: 120}
]);
// when
let actual = app.bufferWithTimeReceived(source, 105).toArray();
// then
console.log(actual);
assert.equal(actual.length, 2); // first contains val 1-3, second val 4-5
})
如果我不重播文件中的所有数据,而只是实时接收它,我可以使用 bufferWithTime 来解决这个问题。
用另一个例子更新
// given
let source = require('rxjs/Rx').Observable.from([
{val: 1, timereceived: 10},
{val: 2, timereceived: 20},
{val: 3, timereceived: 100},
{val: 4, timereceived: 110},
{val: 5, timereceived: 120},
{val: 6, timereceived: 9920},
{val: 7, timereceived: 9930}
]);
// when
app.bufferWithTimeReceived(source, 30).subscribe(console.log);
// then
// expected output would be [val 1-2][val 3-5][val 6-7] (empty arrays in between would be ok)
更新结束
现在我尝试了不同的方法。我的最后一个是:
exports.bufferWithTimeReceived = (source, timespan) => {
return Rx.Observable.defer(() => Rx.Observable.create(function (observer) {
let currBuffer = [];
source.subscribe(x => {
if (currBuffer.length == 0)
currBuffer = [x];
else {
if (x.timereceived-currBuffer[0].timereceived < timespan)
currBuffer.push(x);
else {
observer.onNext(currBuffer);
currBuffer = [x];
}
}
},
(err)=>observer.onError(err),
()=>observer.onCompleted());
}));
};
不幸的是,这只会导致oArrayObservable { source: Defer { _f: [Function] } } 作为错误消息,这不是很有帮助。我还想知道Rx - Divide stream into segments (lists) by condition 可能对我有什么帮助?!
额外问题:任何提示我如何使这个缓冲区重叠?
【问题讨论】:
标签: javascript rxjs reactive-programming rxjs5