【问题标题】:How do I implement something like "bufferWhile" in RxJs?如何在 RxJs 中实现类似“bufferWhile”的东西?
【发布时间】:2017-05-01 19:37:08
【问题描述】:

我使用 RxJs 处理从文件重放的数据。每个数据项都包含一个timereceived 属性。在重播时,我想创建缓冲区,其中包含最初在给定时间跨度 x 内收到的所有数据项。换句话说:我想将所有项目添加到当前缓冲区,而接收到的第一个缓冲区元素和接收到的当前元素之间的时间跨度小于时间跨度 x。

示例测试:

it('should be able to create buffers based on time received', function() {
    // given
    let source = require('rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120}
    ]);

    // when
    let actual = app.bufferWithTimeReceived(source, 105).toArray();

    // then
    console.log(actual);
    assert.equal(actual.length, 2); // first contains val 1-3, second val 4-5
})

如果我不重播文件中的所有数据,而只是实时接收它,我可以使用 bufferWithTime 来解决这个问题。

用另一个例子更新

    // given
    let source = require('rxjs/Rx').Observable.from([
        {val: 1, timereceived: 10},
        {val: 2, timereceived: 20},
        {val: 3, timereceived: 100},
        {val: 4, timereceived: 110},
        {val: 5, timereceived: 120},
        {val: 6, timereceived: 9920},
        {val: 7, timereceived: 9930}
    ]);

    // when
    app.bufferWithTimeReceived(source, 30).subscribe(console.log);


    // then
    // expected output would be [val 1-2][val 3-5][val 6-7] (empty arrays in between would be ok)

更新结束

现在我尝试了不同的方法。我的最后一个是:

exports.bufferWithTimeReceived = (source, timespan) => {
    return Rx.Observable.defer(() => Rx.Observable.create(function (observer) {
        let currBuffer = [];
        source.subscribe(x => {
            if (currBuffer.length == 0)
                currBuffer = [x];
            else {
                if (x.timereceived-currBuffer[0].timereceived < timespan)
                    currBuffer.push(x);
                else {
                    observer.onNext(currBuffer);
                    currBuffer = [x];
                }
            }
        },
        (err)=>observer.onError(err),
        ()=>observer.onCompleted());
    }));
};

不幸的是,这只会导致oArrayObservable { source: Defer { _f: [Function] } } 作为错误消息,这不是很有帮助。我还想知道Rx - Divide stream into segments (lists) by condition 可能对我有什么帮助?!

额外问题:任何提示我如何使这个缓冲区重叠?

【问题讨论】:

    标签: javascript rxjs reactive-programming rxjs5


    【解决方案1】:

    我会这样做,但请注意这是 RxJS 5。但是,mergeAll 也应该在 RxJS 4 中可用(可能使用不同的名称)。

    const THRESHOLD = 105;
    
    const data = [
      {val: 1, timereceived: 10},
      {val: 2, timereceived: 20},
      {val: 3, timereceived: 100},
      {val: 4, timereceived: 110},
      {val: 5, timereceived: 120}
    ];
    const source = Observable.from(data)
      .groupBy(item => parseInt(item.timereceived / THRESHOLD))
      .map(observable => observable.toArray())
      .mergeAll()
      .subscribe(console.log);
    

    groupBy 运算符为parseInt(item.timereceived / THRESHOLD) 返回的每个键创建一个新的 Observable。然后我用toArray() 链接它,因为我想在重新发送它们之前收集它的所有项目和mergeAll() 订阅所有Observables(在本例中为2)并重新发送它们的项目,这对于每个源Observable 始终是一个数组。

    这会产生以下输出:

    [ { val: 1, timereceived: 10 }, { val: 2, timereceived: 20 }, { val: 3, timereceived: 100 } ]
    [ { val: 4, timereceived: 110 }, { val: 5, timereceived: 120 } ]
    

    见:

    【讨论】:

    • 非常感谢。 rxjs5 没问题。不幸的是,这并不能解决我的问题,因为我需要一次计算一个缓冲区的候选项目之间的增量。我用另一个例子更新了这个问题来澄清。提前致谢!
    【解决方案2】:

    感谢马丁的回答,我再次想到了 groupBy,这似乎对我有用:

    exports.bufferWithTimeReceived = (source, timespan) => {
            let currTime;
            return source.groupBy(x => {
                if (!currTime)
                    currTime = x.timereceived;
                if (x.timereceived-currTime > timespan)
                    currTime = x.timereceived;
                return currTime;            
            })
            .map(observable => observable.toArray())
            .mergeAll()
    };
    

    但我想知道bufferWhen 是否可以提供更优雅的解决方案?!

    【讨论】:

      猜你喜欢
      • 2019-09-09
      • 2019-07-24
      • 1970-01-01
      • 2021-09-29
      • 2015-06-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多