【问题标题】:RXjs : How to create an operator on streams : scan operator where the accumulator state can be reset through an observableRXjs:如何在流上创建运算符:扫描运算符,其中累加器状态可以通过 observable 重置
【发布时间】:2015-09-08 18:00:38
【问题描述】:

我需要在具有以下特征的流上创建一个新的实例运算符

  • 签名 Rx.Observable.prototype.scan_with_reset(accumulator, seed$)

    在哪里:

    参数

    accumulator (Function):要在每个元素上调用的累加器函数。

    seed$ (Observable) :一个可观察对象,其值将用于重新启动累加器功能。累加器函数具有以下签名function accumulator_fn(accumulator_state, source_value)。我希望seed$ 中的值将accumulator_state 重置为seed 值并发出seed 值。

    退货 (Observable) :一个可观察的序列,它是由 comonadic 绑定操作产生的(无论如何,我在这里复制 Rxjs 文档)。比。正常的scan 运算符,这里发生的是,当累加器函数从seed$ observable 发出的种子值“重新启动”时,该种子值被发出,下一个值由scan_with_reset 发出运营商将是accumulator_fn(seed, source_value)

  • 使用示例:

    var seed$ = Rx.Observable.fromEvent(document, 'keydown') .map(function(ev){return ev.keyCode}) .startWith(0); var result$ = counter$.scan_with_reset(seed$, function accumulator_fn (acc, counter) {return acc+counter});

    以下图表应更详细地解释预期结果: seed : 0---------13--------27------------ counter : -1--5--2----6---2-----4---1---3--- result : 0-1--6--8-13-19--21-27-31--32--35-

我最初的尝试是修改accumulator_fn 以让seed$ 修改一个变量,该变量将在accumulator_fn 的范围内,这样我就可以检测到函数本身的变化。

我在这里追求两个目标:

  • 有一个尽可能无状态和无闭包的实现
  • 了解定义自己的运算符背后的机制 流,希望这是一个简单的示例

我查看了scan 源代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js 但我不知道从那里去哪里。

有人有创建 Rxjs 流操作符的经验吗?有哪些要遵循的约定和要避免的陷阱?有没有我可以看的定制运营商的例子?您将如何实施这个特定的?

[更新] : 接受答案的一些测试代码

 var seed$ = Rx.Observable.fromEvent(document, 'keydown')
                          .map(function(ev){return ev.keyCode})
                          .startWith(0);
 var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
                             .map(function(ev){return 1});
 var result$ = counter$.scanWithReset(seed$,
                                      function accumulator_fn (acc, counter) {return acc+counter});
 var s = function (x) {console.log("value: ", x)};
 var disposable = result$.subscribe(s)

移动鼠标应该显示一个值增加 1,并且按下一个键应该重新启动计数器,并按下键的值。

【问题讨论】:

    标签: rxjs


    【解决方案1】:

    作为创建运算符的一般情况,通常最容易使用Observable.create 方法,该方法本质上定义了Observable 在订阅或仅包装现有的一组运算符时应如何表现share

    当您更深入地了解性能时,还有其他一些考虑因素(Observable.create 在规模上并不是非常有效),您可以考虑创建一个自定义的Observable,例如map

    对于您的情况,我现在推荐前者。我会认为您的问题实际上是几个独立的流,我们希望将它们扁平化为单个流。触发重置时,每个新流都将开始。这对我来说真的很像flatMap

    Rx.Observable.prototype.scanWithReset = function ($reset, accum, seed) {
        var source = this;
        //Creates a new Observable
        return Rx.Observable.create(function (observer) {
            //We will be reusing this source so we want to make sure it is shared
            var p = source.publish();
    
    
            var r = $reset
                //Make sure the seed is added first
                .startWith(seed)
                //This will switch to a new sequence with the associated value
                //every time $reset fires
                .flatMapLatest(function (resetValue) {
                  //Perform the scan with the latest value
                  return source.scan(accum, resetValue);
            });
    
            //Make sure every thing gets cleaned up
            return new Rx.CompositeDisposable(
                r.subscribe(observer),
                //We are ready to start receiving from our source
                p.connect());
        });
    }
    

    【讨论】:

    • 谢谢!这很有效。我确实理解代码,但我很难考虑使用publishconnectCompositeDisposable,尽管现在看起来很明显。修改scan 源代码似乎是通过命令式编码和更少的Rxjs 内部知识来包含我的逻辑的最简单方法。但是,是的,这种方式更容易理解。
    • 只是想纠正一个我当时忘记提及的小错误/错别字,最近又困扰我:而不是source.scan(accum, resetValue),应该是p.scan(accum, resetValue)
    • @paulpdaniels 你能更新一下 RxJs 6+ 吗?
    猜你喜欢
    • 2021-06-11
    • 1970-01-01
    • 1970-01-01
    • 2019-10-13
    • 2016-12-02
    • 2012-12-09
    • 1970-01-01
    • 2017-04-18
    相关资源
    最近更新 更多