【问题标题】:RxJs Interval with takeUntil to publish last valueRxJs 间隔与 takeUntil 发布最后一个值
【发布时间】:2019-01-16 22:19:08
【问题描述】:

我有一些代码会轮询直到任务完成

见下文

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

我曾尝试使用 takeUntil 和 takeWhile,但问题是一旦任务完成,最后一个值就永远不会发布。

为了解决这个问题,我必须在 stopPoll 主题中包含 tap 方法,并增加 stopCount 以获取最后一个值。

所以上面的工作,但感觉有点乱,我敢肯定一定有更好的方法来实现这一点?

我本来希望 takeUntil 发布最后一个值或有一个覆盖来告诉它,例如 takeUntil(observable, {publishLast: true})

BTW 更新,可观察对象由 Angular 6 模板订阅 提前致谢

【问题讨论】:

    标签: angular6 rxjs6 rxjs-pipeable-operators


    【解决方案1】:

    这同时在 rxjs 中实现为 takeWhile(condition, ?inclusive):

    timer(0, 10).pipe(
        takeWhile((x) => x < 3, true)
    )
    

    发射 0、1、2、3

    【讨论】:

      【解决方案2】:

      您可以做的一件事是使用自定义的类似 takeWhile 的运算符,如下所示:

      const completeWith = <T>(predicate: (arg: T) => boolean) => (
        source: Observable<T>,
      ) =>
        new Observable<T>(observer =>
          source.subscribe(
            value => {
              observer.next(value);
              if (predicate(value)) {
                observer.complete();
              }
            },
            error => observer.error(error),
            () => observer.complete(),
          ),
        );
      

      将其视为 takeWhite 的变体似乎不是一个好主意,因为它不仅采用条件成立,而且还发出额外的价值。

      一个更优雅的解决方案可能是让模拟状态 observable 发出两种值:下一个通知和完成通知,类似于物化/非物化操作符的工作方式。

      【讨论】:

      • 看起来不错,我可能会用这个。我同意你的 cmets 关于 takeWhile 的名称。但是,我正在使用 takeUntil 并且我认为鉴于它的名称,它有一个重载选项来发出最后一个值是有道理的。看看 StackOverflow,其他人肯定遇到过这种情况,需要内置这个功能。
      • 我找到的最优雅的解决方案。谢谢!
      【解决方案3】:

      如果你想完成 observable,你也可以使用 next() 创建 subject 和 emit。

      this.stopPoll: Subject<any> = new Subject<any>();
      

      如果您想完成订阅。你可以调用 this.stopPoll.next(true);

      你可以在subscribe()中访问数据

      this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
      

      【讨论】:

      • 是的,这正是我正在做的。并在 tap 方法中调用 this.stopPoll.next(true),但我有 7 甚至 9 行额外的代码,包括 stopCount 和 stopPoll 的声明,只是因为默认情况下未发布最后一个值。
      • 可以在complete方法中获取最后一个值。即 subscribe() 的第三个选项
      • 另外,这是一个订阅 observable 的角度模板,可能也应该提到这一点,但是是的,我可以手动订阅,它可能更干净
      • 订阅完成 => {} 将起作用,但是我失去了与模板的异步绑定,因此我需要再次将其推送到模板绑定到的另一个主题上。所以还是有点乱:/!要么,要么失去效率低下的 OnPush changeDetectionStrategy
      • 用 takeWhile stackoverflow.com/questions/44641246/… 发现这个所有解决方案都是 hacky。我建议这是 rxjs 中缺少的功能
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-16
      • 2017-12-22
      • 1970-01-01
      • 2018-01-18
      • 1970-01-01
      相关资源
      最近更新 更多