【问题标题】:How to do a sliding buffer stream?如何做一个滑动缓冲流?
【发布时间】:2016-09-02 02:27:55
【问题描述】:

鉴于以下代码,我使用bufferCount(不是我想要的)...

   var Rx = require('rxjs/Rx');
    var Observable = Rx.Observable;
    var Subject = Rx.Subject;

    var first = new Rx.Subject();

    var source = first.bufferCount(2).map(a => a.reduce((acc,x) => acc+x,0));

    var subscription = source.subscribe(console.log)

    first.next(1)
    first.next(2)
    first.next(2)
    first.next(3)

我明白了

3
5

我想得到的是

3
4
5

所以缓冲区正在缓冲最后 2 个项目。

有没有办法简单地做到这一点?

【问题讨论】:

    标签: javascript node.js rxjs


    【解决方案1】:

    使用scan 运算符来保存您的滑动缓冲区,并对其执行操作。例如:

    var source = first.scan((slidingBuffer, newInput) => {
        return addTo(slidingBuffer, newInput)
      }, [])
      .map(processBuffer)
    

    addToprocessBuffer 函数含义明显。

    【讨论】:

    • 对于滑动缓冲区未满的初始情况,有没有办法从扫描返回而不向流中发出任何内容?
    • 只返回 false,过滤掉 false 值,或者在 processBuffer 函数中处理这种情况
    • 是的,刚刚意识到我可以过滤它
    【解决方案2】:

    您是否尝试过使用bufferCountskip / startBufferEvery 参数?这允许您生成重叠缓冲区,然后您可以根据需要减少这些缓冲区。这种方式不涉及发出您必须随后过滤掉的值。如果您特别希望缓冲区大小为 2,则可以使用 pairwise 代替 bufferCount。

    Keith:- 使用这种方法的代码

    var Rx = require('rxjs/Rx');
    var Observable = Rx.Observable;
    var Subject = Rx.Subject;
    
    var first = new Rx.Subject();
    
    var source = first.bufferCount(2,1).map(a => a.reduce((acc,x) => acc+x,0));
    
    var subscription = source.subscribe(console.log)
    
    first.next(1)
    first.next(2)
    first.next(2)
    first.next(3)
    

    【讨论】:

    • 不错,我没注意到那个偷偷摸摸的参数
    【解决方案3】:

    您可以将数组操作与扫描结合使用。这是我使用的最大尺寸为 20 个项目。

    first.pipe(
       scan((buffer, newItem) => ([ newItem, ...buffer].slice(0, 20)), <string[]>[])
    )
    

    bufferCount() 不同,它会立即发出。

    【讨论】:

      猜你喜欢
      • 2022-08-18
      • 2013-09-13
      • 2022-12-21
      • 2020-09-10
      • 2013-05-12
      • 2013-05-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多