【问题标题】:Handling a stream of events based on timing in rxjs根据 rxjs 中的时间处理事件流
【发布时间】:2017-11-05 10:58:42
【问题描述】:

我有一个进程会每隔一段时间向我发送数据包,我需要根据数据包到达的时间等来管理该流。在某些时候,我也会关闭流和进程。

现在,我正在使用一组计时器来执行此操作,但我希望我可以使用 rxjs 来执行此操作,因为它似乎非常适合这种事情。到目前为止,我还没有取得太大的成功。

问题

流应该定期向我发送数据包,但它通常会偏离很多,有时会卡住。

我想在以下情况下关闭流:

  1. 如果超过startDelay 将第一个数据包发送给我。
  2. 第一个数据包发送后,如果两个数据包之间有超过middleDelay的停顿。
  3. 经过一段固定时间maxChannelTime

当我由于上述任何原因即将关闭流时,我首先要求它礼貌地关闭,以便它可以进行一些清理。有时它还会在清理过程中向我发送最终数据包。但我希望在关闭流并忽略更多消息之前等待清理和最后一个数据到达的时间不超过cleanupTime

细化

我将通过使用 Observable 包装事件来创建“流”。我这样做没有问题。

通过“关闭”一个流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡)。

【问题讨论】:

    标签: javascript node.js asynchronous rxjs


    【解决方案1】:

    棘手的问题。

    我将其分为两个阶段 - “调节”(因为我们要检查定期间隔)和“清理”。

    向后工作,输出是

    const regulated = source.takeUntil(close)
    const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
    const output = regulated.merge(cleanup)
    

    'Closers' 是在需要关闭时发出的 observables(每个超时值关闭一个)。

    const startTimeout = 600
    const intervalTimeout = 200
    const maxtimeTimeout = 3000
    const cleanupTimeout = 300
    
    const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
      .takeUntil(source)                                // cancel after source emits
      .mapTo('startTimeoutMarker')
    
    const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
        Observable.timer(intervalTimeout)           // emit once after intervalTimeout
          .mapTo('intervalTimeoutMarker')
      )
    
    const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
      .takeUntil(startCloser)                               // cancel if startTimeout
      .takeUntil(intervalCloser)                            // cancel if intervalTimeout
      .mapTo('maxtimeTimeoutMarker')
    
    const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
    
    const cleanupCloser = close.switchMap(x =>      // start when close emits
         Observable.timer(cleanupTimeout)           // emit once after cleanup time
      ) 
      .mapTo('cleanupTimeoutMarker')
    

    这是一个工作示例CodePen(请一次运行一个测试)

    【讨论】:

    • 很好的解决方案!谢谢你。如果需要,我还可以查看将来如何添加其他关闭条件。
    【解决方案2】:

    如果不知道如何使用 RxJS 创建“流”或以后想如何使用它们,很难给出任何建议。

    一般而言,您只需使用takeUntil()switchMap()timeout() 即可实现您想要的。

    Observable.defer(...)
      .startWith(undefined) // Trigger the first `timeout`
      .switchMap((val, i) => {
        if (i === 0) { // waiting for the first value
          return Observable.of().timeout(startDelay);
        } else {
          return Observable.of(val).timeout(middleDelay);
        }
      })
      .takeUntil(Observable.timer(maxChannelTime));
    

    我不知道你说的“在某个时候关闭流”是什么意思。你期待errorcomplete 通知吗?此解决方案将在超时到期时发出 error 并在 takeUntil 发出时发出 complete

    【讨论】:

      【解决方案3】:

      最后,这就是我所做的。我的回答主要基于 Richard Matsen 的回答,所以我将他的回答保留为已接受。

      结果证明我需要做一些额外的更改。

      此代码是接收数据消息流并返回包含所有收集的数据和终止原因的单例 observable 的代码。

      let startCloser$ = Observable.timer(this.options.maxStartDelay).takeUntil(dataStream$).mapTo(TerminationReason.StartTimeout);
      
      let intervalCloser$ = dataStream$.switchMap(x => Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity));
      
      let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime).takeUntil(startCloser$).takeUntil(intervalCloser$).mapTo(TerminationReason.ChannelTimeout);
      
      //we need to publishReplay it so we can get the reason afterwards...
      let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1);
      //basically treating close$ like a promise
      close$.connect();
      
      //cleanupAction has side-effects so it must only be subscribed to once.
      let cleanupAction$ = Observable.defer(async () => {
          //it's just a promise that yields nothing and waits until requestTermination has terminated
          //requestTermination is an async function and it already has a timeout thing in promise-language
          await this.requestTermination();
      });
      
      let result$ = dataStream$.takeUntil(close$).concat(dataStream$.takeUntil(cleanupAction$)).toArray().switchMap(arrs => {
          //switchMap will only resolve once because the observable is a singleton
      
          return close$.map(reason => {
              //this should fire immediately because close is publishReplay(1) and has already happened
              let totalArr = _.flattenDeep(arrs);
              return {
                  reason : reason,
                  data : totalArr
              }
          })
      });
      
      return result$;
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-01-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-07-12
        • 2015-04-13
        • 1970-01-01
        相关资源
        最近更新 更多