【发布时间】:2015-09-08 18:00:38
【问题描述】:
我需要在具有以下特征的流上创建一个新的实例运算符
-
签名
Rx.Observable.prototype.scan_with_reset(accumulator, seed$)在哪里:
参数
accumulator (Function):要在每个元素上调用的累加器函数。seed$ (Observable):一个可观察对象,其值将用于重新启动累加器功能。累加器函数具有以下签名function accumulator_fn(accumulator_state, source_value)。我希望seed$中的值将accumulator_state重置为seed值并发出seed值。退货 (Observable) :一个可观察的序列,它是由 comonadic 绑定操作产生的(无论如何,我在这里复制
Rxjs文档)。比。正常的scan运算符,这里发生的是,当累加器函数从seed$observable 发出的种子值“重新启动”时,该种子值被发出,下一个值由scan_with_reset发出运营商将是accumulator_fn(seed, source_value) -
使用示例:
var seed$ = Rx.Observable.fromEvent(document, 'keydown') .map(function(ev){return ev.keyCode}) .startWith(0); var result$ = counter$.scan_with_reset(seed$, function accumulator_fn (acc, counter) {return acc+counter});以下图表应更详细地解释预期结果:
seed : 0---------13--------27------------ counter : -1--5--2----6---2-----4---1---3--- result : 0-1--6--8-13-19--21-27-31--32--35-
我最初的尝试是修改accumulator_fn 以让seed$ 修改一个变量,该变量将在accumulator_fn 的范围内,这样我就可以检测到函数本身的变化。
我在这里追求两个目标:
- 有一个尽可能无状态和无闭包的实现
- 了解定义自己的运算符背后的机制 流,希望这是一个简单的示例
我查看了scan 源代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/scan.js
但我不知道从那里去哪里。
有人有创建 Rxjs 流操作符的经验吗?有哪些要遵循的约定和要避免的陷阱?有没有我可以看的定制运营商的例子?您将如何实施这个特定的?
[更新] : 接受答案的一些测试代码
var seed$ = Rx.Observable.fromEvent(document, 'keydown')
.map(function(ev){return ev.keyCode})
.startWith(0);
var counter$ = Rx.Observable.fromEvent(document, 'mousemove')
.map(function(ev){return 1});
var result$ = counter$.scanWithReset(seed$,
function accumulator_fn (acc, counter) {return acc+counter});
var s = function (x) {console.log("value: ", x)};
var disposable = result$.subscribe(s)
移动鼠标应该显示一个值增加 1,并且按下一个键应该重新启动计数器,并按下键的值。
【问题讨论】:
标签: rxjs