【问题标题】:Cancelling expand if other event occurs如果发生其他事件,则取消展开
【发布时间】:2021-04-16 11:26:41
【问题描述】:

我是 ReactiveX 的新手,一直在尝试使用 RxJS 构建一个“CS:Go 炸弹爆炸通知器”程序。

到目前为止,我有以下内容:

// TimeSubject is the current heartbeat of the game (i.e. current client time),
// it is not emitted every 1 second (it will be emitted at least once every 30
// seconds and also whenever any other game event occurs)

// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
    .pipe(
        // detect when the bomb is planted
        filter(([_, { current }]) => current.bomb === 'planted'),
        // create a timer object to keep track of the time until explosion
        map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
        // ignore other "bomb planted" events for the next 50 seconds
        throttleTime(50 * 1000),
        // count down until bomb is exploded
        // problem: RoundSubject can emit an event indicating the bomb got defused
        //          or the round ended before the bomb got to explode,
        //          but how do I catch that since I am throttling the events from that channel?
        expand(({ plantTime, explosionTime, currentTime }) =>
            explosionTime > currentTime
                ? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
                    .pipe(delay(1000))
                : EMPTY)
    ).subscribe(({ plantTime, explosionTime, currentTime }) => {
        if (plantTime === currentTime)
            console.log('Bomb planted and will explode in 40 seconds!');
        else if (explosionTime >= currentTime) {
            const secondsToExplode = explosionTime - currentTime;
            console.log(`.. explodes in: ${secondsToExplode} seconds`);
        }
    });

这里的问题是RoundSubject 可以发出类似RoundEndedDefused 的事件,在这两种情况下都应该取消计时器。

目前我对可用的运营商了解得不够多,无法了解如何以一种好的方式解决此问题。此外,我觉得我的代码与expand 相当复杂,所以如果您知道更好的方法,请告诉我:-)。

谢谢。

【问题讨论】:

    标签: rxjs reactivex


    【解决方案1】:

    如有疑问,命名

    到目前为止,用 RxJS 在单个管道中编写带有 5 个以上运算符的 observables。很容易迷失剧情。

    不要害怕创建多个命名流;事情会读得更多 自然。

    // This creates a stream that emits every time the bomb's status changes to the
    // provided value.
    const bombChangedStatusTo = (status) =>
      RoundSubject.pipe(
        pluck('current'),
        distinctUntilKeyChanged('bomb'),
        filter((bombStatus) => bombStatus === status)
      );
    
    const bombPlanted$ = bombChangedStatusTo('planted');
    const bombDefused$ = bombChangedStatusTo('defused');
    

    另一个答案是正确的,expand 在这里是矫枉过正。假设我们知道开始时间,倒计时可以像映射某个间隔发出的值一样简单(请参阅最后一节,了解为什么我们实际上不需要plantTime)。

    // we use share() since we'll subscribe to this more than once, it
    // ensures that we're subscribing to the exact same interval each time
    const clockInterval$ = interval(1000).pipe(
      startWith(null), // emit immediately instead of after 1s
      map(() => Math.floor(Date.now()/1000)),
      share()
    );
    
    const countDown = (startTime) =>
      clockInterval$.pipe(
        map((currentTime) => ({
          explosionTime: startTime + 40,
          currentTime
        })),
        takeWhile(
          ({ currentTime, explosionTime }) => currentTime < explosionTime,
          true // include the emission that triggered completion
        )
      );
    

    这里我们使用exhaustMap 来确保每个“炸弹”只运行一个计时器 种植”事件(见 docs)。没有必要 使用throttleTime,这将给我们两个计时器计数到40,而不是仅仅 一个。

    const bombClock$ = bombPlanted$.pipe(
      withLatestFrom(clockInterval$), // <-- reusing the shared clock
      exhaustMap(([_, plantTime]) =>
        countDown(plantTime).pipe(
          takeUntil(bombDefused$) // stop the timer if the bomb is defused
        )
      )
    );
    

    如果我们使用bombPlanted$ 触发“炸弹被植入”副作用,我们不会 不再需要将plantTime 作为bombClock$ 值上的属性传递

    bombPlanted$.subscribe(() => {
      console.log('Bomb planted and will explode in 40 seconds!');
    });
    
    bombClock$.subscribe(({ explosionTime, currentTime }) => {
      if (explosionTime >= currentTime) {
        const secondsToExplode = explosionTime - currentTime;
        console.log(`.. explodes in: ${secondsToExplode} seconds`);
      } else {
        console.log('The bomb has exploded');
      }
    });
    

    【讨论】:

    • 非常感谢@backtick,多么棒的解释:-)。一个问题(也是我最终使用expand 的原因):TimeSubject 不会每秒发出一个事件。它会在其他事件进入时发出(即,当炸弹被放置/拆除或回合结束时)并且至少每 30 秒发出一次。对此有什么提示吗?我编辑了问题以明确这一点。
    • 啊,好的!它应该像用interval(1000).pipe(map(() =&gt; Date.now()/1000)) 替换countdown 实现中的TimeSubject 一样简单。假设您想要以秒为单位的当前时间。您甚至可以将其命名为clock$!我不会在这里讨论一两个微妙之处,但当我回到电脑前时,我会更新答案。
    • 我已根据您的反馈更新了答案。
    【解决方案2】:

    如果您想捕获RoundEndedDefused 事件,您可以创建一个新的流来侦听这些事件并在需要时取消计时器。

    let cancelStream$ = RoundedSubject.pipe(
      filter(({current}) => current.bomb === 'RoundEnded' || current.bomb === 'Defused'),
      
    )
    
    // TimeSubject is the current heartbeat of the game (i.e. current client time)
    // RoundSubject stores information about the current and previous round state
    combineLatest([TimeSubject, RoundSubject])
        .pipe(
            takeUntil(cancelStream$),
            // detect when the bomb is planted
            filter(([_, { current }]) => current.bomb === 'planted'),
            // create a timer object to keep track of the time until explosion
            map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
            // ignore other "bomb planted" events for the next 50 seconds
            throttleTime(50 * 1000),
            // count down until bomb is exploded
            // problem: RoundSubject can emit an event indicating the bomb got defused
            //          or the round ended before the bomb got to explode,
            //          but how do I catch that since I am throttling the events from that channel?
            expand(({ plantTime, explosionTime, currentTime }) =>
                explosionTime > currentTime
                    ? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
                        .pipe(delay(1000))
                    : EMPTY)
        ).subscribe(({ plantTime, explosionTime, currentTime }) => {
            if (plantTime === currentTime)
                console.log('Bomb planted and will explode in 40 seconds!');
            else if (explosionTime >= currentTime) {
                const secondsToExplode = explosionTime - currentTime;
                console.log(`.. explodes in: ${secondsToExplode} seconds`);
            }
        });

    【讨论】:

      【解决方案3】:

      首先:寻求解决方案

      这里是你可能做的一个快速模型:

      /**********
       * Custom Operator to throttle only specific emissions
       *********/
      function priorityThrottleTime<T>(
        thrTime: number,
        priorityStr = "priority"
      ): MonoTypeOperatorFunction<T> {
        return s => defer(() => {
          const priorityTimeStamp = new Map<string, number>();
          return s.pipe(
            filter(v => Date.now() - (
              priorityTimeStamp.get(v[priorityStr]) ||
              0) >= thrTime
            ),
            tap(v => {
              if(v[priorityStr] != null){
                priorityTimeStamp.set(
                  v[priorityStr],
                  Date.now()
                )
              }
            })
          );
        });
      }
      
      // TimeSubject is the current heartbeat of the game (i.e. current client time)
      // RoundSubject stores information about the current and previous round state
      roundSubject.pipe(
        // detect when the bomb is planted, map to priority: 1, 
        // otherwise map without priority
        map(round => 
          round.current.bomb === 'planted' ?
          ({priority: 1, payload: round}) :
          ({payload: round})
        ),
        // same prioroty events ("bomb planted" events) 
        // ignored for the next 50 seconds
        priorityThrottleTime(50 * 1000),
        // Throttling is done, get our payload back
        map(({payload}) => payload),
        // create a new observable depending on what the round is doing
        switchMap(({current}) => 
          current.bomb !== 'planted' ? 
          EMPTY :
          timeSubject.pipe(
            // Grab the next heartbeat
            take(1),
            // create a timer object to keep track of the time until explosion
            map(time => ({ 
              plantTime: time, 
              explosionTime: time + 40, 
              currentTime: time 
            })),
            // count down until bomb is exploded
            expand(({ plantTime, explosionTime, currentTime }) =>
              currentTime > explosionTime ?
              EMPTY :
              of(({ 
                plantTime, 
                explosionTime, 
                currentTime: currentTime + 1 
              })).pipe(delay(1000))
            )
          )
        )
      ).subscribe(({ plantTime, explosionTime, currentTime }) => {
        if (plantTime === currentTime)
          console.log('Bomb planted and will explode in 40 seconds!');
        else if (explosionTime >= currentTime) {
          const secondsToExplode = explosionTime - currentTime;
          console.log(`.. explodes in: ${secondsToExplode} seconds`);
        }
      });
      

      一些解释

      '我想取消基于某些上游事件的正在进行的 observable' 模式应该始终将您指向switchMap

      在上面的 sn-p 中,任何没有被限制的事件要么启动一个新的炸弹,要么什么都不做。无论哪种方式,switchMap 都会取消任何正在进行的炸弹(如果有的话)。

      您可能会发现有很大的空间可以改变这种行为,但我不确定您想要什么,所以我把它留给您。

      priorityThrottleTime

      priorityThrottleTime 对于您需要的东西来说确实有点过头了,但我很久以前就写过它,没有时间简化。您可以重新编写它以获取谓词,并且仅在谓词返回 true 时进行节流。这样您就可以避免映射进出上面看到的 ({priority, payload}) 对象的麻烦。

      展开

      Expand 是一个相当臃肿的计时器工具。这不是一个真正的问题,但我可以通过计算您已经维护的 timeSubject 的 40 秒来简化它。

      这样您就不需要检查每次迭代。

      timeSubject.pipe(
        // first emissions is time 0,  
        // then take 40 seconds, then stop
        take(41),
        // create the timer object
        map((currentTime, timePassed) => ({
          plantTime: currentTime - timePassed,
          explosionTime: 40 - timePassed,
          currentTime
        }))
      )
      

      这确实假设你的 timeSubject 每秒抽动一次,否则你可以改变的是有点像这样:

      timeSubject.pipe(
        take(1),
        // create a timer that tics every second
        switchMap(plantTime => timer(0, 1000).pipe(
          // take 40 seconds, then stop
          take(41),
          // create a timer object
          map(timePassed => ({
            plantTime,
            explosionTime: plantTime + 40,
            currentTime: plantTime + timePassed,
          }))
        ))
      )
      

      更新

      好的,这是一种新的节流方法,尽管我还没有真正测试过它。虽然它会简化你的代码,所以也许值得一试。

      function throttleTimeOn<T>(
        thrTime: number,
          pred: (x:T) => boolean
      ): MonoTypeOperatorFunction<T> {
        return s => defer(() => {
          let throttleTimeStamp = 0;
          return s.pipe(
            filter(v => {
              const isThrot = pred(v);
              if(!isThrot) return true;
              else return Date.now() - 
                throttleTimeStamp >= thrTime;
            }),
            tap(v => { if(pred(v)) {
              throttleTimeStamp = Date.now();
            }})
          );
        });
      }
      

      这里正在使用:

      roundSubject.pipe(
        // events that meet predicate ("bomb planted" events) 
        // ignored for the next 50 seconds
        throttleTimeOn(
          50 * 1000,
          ({current}) => current.bomb === 'planted'
        ),
        // create a new observable depending on what the round is doing
        switchMap(({current}) => 
          current.bomb !== 'planted' ? 
          EMPTY :
          timeSubject.pipe(
            // first emissions is time 0,  
            // then take 40 seconds, then stop
            take(41),
            // create the timer object
            map((currentTime, timePassed) => ({
              plantTime: currentTime - timePassed,
              explosionTime: 40 - timePassed,
              currentTime
            }))
          )
        )
      ).subscribe(({ plantTime, explosionTime, currentTime }) => {
        if (plantTime === currentTime)
          console.log('Bomb planted and will explode in 40 seconds!');
        else if (explosionTime >= currentTime) {
          const secondsToExplode = explosionTime - currentTime;
          console.log(`.. explodes in: ${secondsToExplode} seconds`);
        }
      });
      

      【讨论】:

        猜你喜欢
        • 2010-12-09
        • 1970-01-01
        • 1970-01-01
        • 2010-12-15
        • 1970-01-01
        • 2014-01-26
        • 2012-09-20
        • 2014-08-14
        • 2017-05-02
        相关资源
        最近更新 更多