【问题标题】:RxJS reduce a ReplaySubjectRxJS 减少一个 ReplaySubject
【发布时间】:2016-03-18 02:56:52
【问题描述】:

我正在使用 ReactiveX/RxJS 版本。

假设我有一个 Rx.ReplaySubject,它每 2 秒发出一个对象,该对象包含一个 id 和一个带有值的数组。我想减少这个值数组并得到它们的总和。

问题是 ReplaySubject 是一个热门的 observable 并且它永远不会完成,至少我不希望它完成,因为我想要每 2 秒该对象值的总和。但是为了使用 reduce 运算符,应该完成 observable。那么,我该怎么做呢?

E.G 不工作代码:

var subject = new Rx.ReplaySubject();

subject.
  map(x => x.transactions).
  // Reduce never concludes because ReplaySubject instance is not completed
  reduce((v1, v2) => v1+v2, 0).
  subscribe(function (value) {
    console.log(value)
  });

setInterval(injectData, 2000);

function injectData () {
  subject.next({id: Date.now(), transactions: [
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)},
    {value: Math.round(Math.random() * 5000)}
  ]});
}

【问题讨论】:

    标签: javascript rxjs reactivex


    【解决方案1】:

    考虑使用Observable.prototype.scan() (RxJS documentation)

    scan() 基本上聚合一个 observable 并发出每个连续的值,不像 reduce() 只在完成时发出结果。 (见scanreduce的Rx解释)

    使用 OP 代码的示例(这里是 fiddle):

    var subject = new Rx.ReplaySubject();
    
    subject
      // note: use "selectMany" to flatten observable of observables
      .selectMany(x => Rx.Observable.fromArray(x.transactions))
      // note: use "scan" to aggregate values
      .scan((agg, val) => agg+val.value, 0)
      .subscribe(function (value) {
        console.log(value)
      });
    
    setInterval(injectData, 2000);
    
    function injectData () {
      subject.onNext({id: Date.now(), transactions: [
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)},
        {value: Math.round(Math.random() * 5000)}
      ]});
    }
    

    另一个例子:

    由于selectMany(),上面的代码会为每个事务发出聚合。如果您只希望它每 2 秒发射一次,那么现在是使用 reduce() 的好时机(这是另一个 fiddle):

    subject
      // note: use "selectMany" to flatten observable of observables
      // note: using "reduce" inside here so that we only emit the aggregate
      .selectMany(x => 
        Rx.Observable
          .fromArray(x.transactions)
          .reduce((agg, val) => agg + val.value, 0)
      )
      // note: use "scan" to aggregate values
      .scan((agg, val) => agg+val, 0)
      .subscribe(function (value) {
        console.log(value)
      });
    

    补充说明:

    Rx Subjects 可以完成;准备好后,您只需致电onCompleted()。如果你完成了你的主题,你仍然可以使用reduce()。将此fiddle 与上面的进行比较。

    【讨论】:

    • 抱歉,我可能不够清楚。问题是按照你说的做,我只会得到一次价值。但我想每 2 秒获取一次值的总和。这就是我无法完成主题的原因。
    • @JoãoMosmann 使用scan(),您将每 2 秒获得一次值,尽管由于selectMany(),它也会每 2 秒为每笔交易发出一次。
    • 但是通过.scan() 我得到了所有接收到的对象的所有值的总和。但我想要的是分别得到每个对象的总和。
    • @JoãoMosmann,啊,听起来您想在没有 .scan() 的情况下执行我的第二个示例。试试看,让我知道它是否有效。
    • 完美!像魅力一样工作。谢谢!
    猜你喜欢
    • 2021-04-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多