【问题标题】:RxJS: How to not subscribe to initial value and/or undefined?RxJS:如何不订阅初始值和/或未定义?
【发布时间】:2015-04-03 14:11:46
【问题描述】:

作为 RxJS 的新手,我经常创建一个具有未来价值的主题,但最初是 undefined。第一次只能是undefined。我目前使用filter 来跳过undefined 值,但这非常麻烦,因为我只需要一次无处不在。 (也许我在这里做错了什么?)我可以在mySubject 通过onNext 获得第一个值之后以某种方式订阅它吗?

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.filter(function(value) {
  return value !== undefined;
}).subscribe(function(value) {
  // do something with the value
});

【问题讨论】:

  • 还有一个 skip 方法,您可以调用 observable 跳过序列中任意数量的前导项。
  • @Will - 你能告诉我如何使用跳过订阅 BehaviorSubject 吗?

标签: javascript rxjs


【解决方案1】:

使用new Rx.ReplaySubject(1) 而不是BehaviorSubject

【讨论】:

  • 再次感谢您。第三次……?我确实阅读了 RxJS 文档和一些指南,但有时真的很难为某个问题找到解决方案。
  • 是的,要记住的东西很多
  • 这不再是强相关;从 RxJS 5 开始,ReplaySubject 在首次回答这个问题时失去了 BehaviorSubject 的一些方面。例如,如果您真的想要一个同步快照值,您将返回使用.skip(1) (stackoverflow.com/a/44884189/608220) 或订阅您自己的缓存。另请注意,完成时的功能已更改。
  • 如果您不想要过时的值,请使用主题。当你确实想要一个陈旧的值时,ReplaySubject os。而且与 BehaviorSubject 不同的是,它可以在没有起始值的情况下构造
  • ReplaySubject(1) 为我工作,无需跳过。我需要它是因为,我最终可能会在调用 .next(...) 之后订阅主题(通过可观察对象返回单个对象的准异步函数)。也许我应该只使用准系统 Promise 代替?
【解决方案2】:

正如Will 所提到的,您应该能够使用跳过运算符跳过第一个值:

var mySubject = new Rx.BehaviorSubject(undefined);

mySubject.pipe(skip(1)).subscribe(function(value) {
  // do something with the value
});

【讨论】:

  • 这种方法的问题是如果subscribe 调用是在第一个实际值发出后进行的,它将跳过一个有效值。
  • @JackA。但这也是这种方法的好处:-)
  • @Simon_Weaver 取决于您的用例。如果您只想在订阅时接收“未来”值,那么这种方法将起作用;如果您想在订阅时收到最新的“过去”值,那么它不会。很难说 OP 想要哪个用例,但当我遇到这个时,我肯定想要后者。
【解决方案3】:

mySubject.pipe( skipWhile( v => !v ) );

【讨论】:

  • 应用skipWhile后,它变成了一个Observable,所以它没有说明你需要一个BehaviorSubject的真实用例。
【解决方案4】:

目前我正在使用filter operator,但我不知道它是否是一个好的解决方案:

var mySubject = new Rx.BehaviorSubject().filter(x => !!x);

mySubject.subscribe(value => { /* will receive value from below */);

mySubject.next('value');

mySubject.subscribe(value => { /* also receives the value */ });

【讨论】:

  • 在应用filter之后,它变成了一个Observable,所以它并没有说明你需要一个BehaviorSubject的真实用例。
【解决方案5】:

我发现这在 RxJS 和 RxSwift 中都令人沮丧。 (想要一个价值主题并结合等待第一个价值的能力)。

对于 JS,我目前只是在主题中隐藏了一个过滤版本,如下所示:

    let mySubject = new Rx.BehaviorSubject();
    mySubject.wait = mySubject.pipe(filter(v=>v!==undefined));

因此,主题仍会公开以供发布,但客户不必重复过滤器。

    mySubject.wait.subscribe((v)=>{...}); 

【讨论】:

    【解决方案6】:

    有时会需要 behaviorSubject,其中初始值无关紧要,在流中工作时异步需要当前值, 在我们的例子中,多个链承诺在处理或从流中的任何位置获取数据期间通过用户取消来处理。

    这可以通过以下方式实现。

    // for user related commands
    this.commandSource = new BehaviorSubject(CONTINUE);
    // filtering over initial value which is continue to make it as a different pipe
    const stopPipe = commandSource.pipe(filter(val => val === STOP));
    const fetchStream = Observable.fromPromise(this.fetchDetails);
    
    merge(fetchStream, stopPipe).pipe(
     take(1),
     takeWhile(() => commandSource.value === CONTINUE),
     concatMap((response) => {
      // fetch Another response you can return promise directly in concatMap
      // return array of response [1 ,2 ,3];
      return this.fetchYetAnotherDetails;
     }),
     // we can add this to stop stream in multiple places while processing the response
     takeWhile(() => commandSource.value === CONTINUE),
     // triggers parallelly values from the concatMap that is 1, 2 , 3
     mergeMap(() => // massage the response parallelly using )
     finalize(() => thi
      commandSource.complete())
    ).subscribe(res => {
     // handle each response 1, 2, 3 mapped
    }, () => {
     // handle error
    }, () => {
     // handle complete of the stream
    });
    
    // when user, clicks cancel, this should stop the stream.
    commandSource.next(STOP)
    

    【讨论】:

      猜你喜欢
      • 2017-04-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多