【问题标题】:Replay series of events with timestamps using RxJS使用 RxJS 重放带有时间戳的一系列事件
【发布时间】:2018-10-01 21:39:13
【问题描述】:

如果我有一个包含 UTC 时间戳和事件数据的事件数组,如下所示: [{utcts: , 数据: , ... ];

您将如何使用 RxJS 以数组中每个项目之间的正确时间差“重播”这些事件?假设数组按 utcts 字段排序,因此第一项具有最低值。

这里有一组非常基本的入门数据:

var testdata = [
  {utcts: 1, data: 'a'},
  {utcts: 4, data: 'b'},
  {utcts: 6, data: 'c'},
  {utcts: 10, data: 'd'}
];

假设 utcts 只是从 0 秒开始重播事件开始的秒数。

【问题讨论】:

    标签: rxjs rxjs5


    【解决方案1】:

    使用delayWhen 为您提供定时回放。

    由于给定的utcts是相对(非绝对)时间,不需要刷新数据对象内的时间戳。

    我在控制台日志中添加了一个时间戳,这样我们就可以看到经过的输出时间。

    请注意,额外的几毫秒是典型的 rxjs 处理时间。

    console.clear()
    
    const testdata = [
      {utcts: 1, data: 'a'},
      {utcts: 4, data: 'b'},
      {utcts: 6, data: 'c'},
      {utcts: 10, data: 'd'}
    ];
    
    const replayData = (data) => Rx.Observable.from(data)
      .delayWhen(event => Rx.Observable.of(event).delay(event.utcts * 1000))
    
    // Show replay items with output time (in milliseconds)
    const start = new Date()
    replayData(testdata)
      .timestamp()
      .subscribe(x => console.log(x.value, 'at', x.timestamp - start, 'ms'))
      
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

    参考delayWhen, timestamp


    这也有效,可以说更简单,不确定哪个是最好的。

    mergeMap() 将内部 observable 展平,这是应用延迟所必需的。

    const replayData = (data) => Rx.Observable.from(data)
      .mergeMap(event => Rx.Observable.of(event).delay(event.utcts * 1000))
    

    【讨论】:

    • 我一直在摆弄你的方法,但无法让它发挥作用。我用 4 个事件的示例数据更新了问题,我希望在开始后 1、4、6 和 10 秒重播这些事件。
    【解决方案2】:

    粗略的伪(直接从我的脑海中冒出来而没有运行验证)可能类似于

    Observable.scan((acc, value) => ({
       delay: delay === NaN ? value.utcts - delay,
       value
     }), { delay: NaN, value: null })
    .mergeMap(({delay, value}) => Observable.from(value).delay(delay))
    

    scan 运算符类似于reduce,但发出中间值。使用时间之间的计算差异来延迟这些值,然后在每个给定的延迟时间发出值。还有其他几种方法可以以相同的方式工作。

    【讨论】:

    • 这似乎很接近,但我不能让它工作。我在问题中添加了一些示例数据。
    【解决方案3】:

    这应该在https://rxviz.com 中工作(复制粘贴在那里):

    const { delay, mergeMap } = RxOperators;
    const { from, Observable, of } = Rx;
    
    const testdata = [
      {utcts: 0.2, data: 'a'},
      {utcts: 2.0, data: 'b'},
      {utcts: 2.8, data: 'c'},
      {utcts: 4.0, data: 'd'}
    ];
    from(testdata).pipe(
      mergeMap(event => of(event).pipe(
        delay(event.utcts * 1000)
      ))
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-02-07
      • 1970-01-01
      • 2014-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多