【问题标题】:Can the RxJs operator groupBy leak memory?RxJs 运算符 groupBy 会泄漏内存吗?
【发布时间】:2018-04-05 00:03:26
【问题描述】:

我正试图围绕 RxJs 运算符 groupBy 的用例展开思考,我担心在某些情况下它可能会导致内存泄漏。

我熟悉传统意义上的 groupBy(例如同步列表处理)。我准备写一个groupBy函数来参考:

const groupBy = f => list =>
  list.reduce((grouped, item) => {
    const category = f(item);
    if (!(category in grouped)) {
      grouped[category] = [];
    }
    grouped[category].push(item);
    return grouped;
  }, {});

const oddsAndEvens = x => x % 2 === 0 ? 'EVEN' : 'ODD';

compose(
  console.log,
  groupBy(oddsAndEvens)
)([1,2,3,4,5,6,7,8])

// returns: { ODD: [ 1, 3, 5, 7 ], EVEN: [ 2, 4, 6, 8 ] }

请注意,这在更广泛的范围内是无状态的。我假设 RxJs 做了与此类似的事情,其中​​ EVEN 和 ODD 将返回 observables,并且它以类似集合的方式有状态地跟踪组。如果我错了,请纠正我,重点是我认为 RxJs 必须维护所有分组的有状态列表。

我的问题是,如果分组值的数量(在这个例子中只有 EVEN 和 ODD)不是有限的会发生什么?例如,为您提供唯一标识符以在流的整个生命周期内保持一致性的流。如果您要按此标识符进行分组,RxJs 的 groupBy 操作员是否会继续制作越来越多的组,即使旧的标识符将永远不会再被重新访问?

【问题讨论】:

  • 我认为这不是特定于groupBy。在无限列表上累积结果的所有操作都有可能在某个时候耗尽内存。

标签: functional-programming rxjs reactive-programming


【解决方案1】:

如果您的流是无限的并且您的 Key Selector 可以产生无限组,那么 - 是的,您有内存泄漏。

您可以为每个分组的 observable 设置一个持续时间选择器持续时间选择器是为每个组创建的,并在组到期时发出信号。

rxjs 5+:groupBy 第三个参数。

rxjs 4:改用 groupedByUntil 运算符。

这是一个无限流的示例,其中每个分组的 Observable 在 3 秒后关闭。

Rx.Observable.interval(200)
  .groupBy(
    x => Math.floor(x / 10),
    x => x,
    x$ => Rx.Observable.timer(3000).finally(() => console.log(`closing group ${x$.key}`))
  )
  .mergeMap(x$ => x$.map(x => `group ${x$.key}: ${x}`))
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>

【讨论】:

    【解决方案2】:

    我的问题是,如果分组值的数量(在这个例子中只有偶数和奇数)不是有限的会发生什么?

    这只能在无限流中发生(因为不能有比源流上的值更多的组)。答案很简单:您将不断创建新的 observables。

    每个GroupedObservable 的寿命与源完全相同(当源完成时,组完成),如您所见in the docs

    从技术上讲,这里没有内存泄漏,因为您正在积极地观察一个无限的 observable。一旦源 observable 完成,所有组也将完成:

    source$
      .takeUntil(stop$)
      .groupBy(…)
    

    但从技术层面来说:将一个无限的 observable 分组到一个独特的属性上而不从源头取消订阅不会对你的内存使用有很大的帮助,不。

    如果您按此标识符进行分组,RxJs 的 groupBy 操作员是否会继续创建越来越多的组,即使旧的标识符将永远不会被再次访问?

    这里要指出的是,rxjs 对此无能为力。它无法知道一个组是否已经完成,或者它是否会在稍后的某个时间收到另一个值。

    【讨论】:

      猜你喜欢
      • 2013-04-20
      • 1970-01-01
      • 1970-01-01
      • 2021-09-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-08
      • 2015-11-17
      相关资源
      最近更新 更多