【问题标题】:Creating an Observable using startWith from an Observable从 Observable 中使用 startWith 创建 Observable
【发布时间】:2017-05-14 08:50:17
【问题描述】:

我正在使用另一个 Observable 的输入过滤 Observable - 过滤的输入来自用户。

过滤是使用 RxJS 运算符 combineLatest 完成的。使用这意味着在订阅此流时,在两个源 Observables 都发出发射之前不会发射任何值 - 我希望创建的流在创建时发射(没有任何过滤),在任何用户输入发生之前。

我认为我应该使用startWith 运算符,以便流在创建时产生发射,但我不知道如何从 Observable 中播种。使用 Observable,因为数据来自 Firebase,并由 FirebaseListObservable's 处理。

下面是我目前正在做的拼凑版本。

let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
  .fromEvent(tagInput, 'input')
  .map((e: any) => createsArrayFromInput(e.target.value));

let allTags: Observable<any[]> = getAllTags();

let filteredTags = allTags
  .combineLatest(tagExclusionStream, (tags, tagExclusions) => {
     return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
  });

// I want this to print out without needing the tagExclusionStream to emit first 
filteredTags.subscribe(tags => console.log("Tags:", tags))

如果我在这里的方法完全关闭/有更好的方法,因为我是 RxJS 的新手,请告诉我。

【问题讨论】:

    标签: angular rxjs rxjs5


    【解决方案1】:

    startWith 只是使用concat internally。 也一样

    const startValue$ = of('start with this')
    
    concat(startValue$, source$)
    

    或者编写你自己的操作符

    function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
      return (source: Observable<T>) => concat(start, source)
    }
    
    source$.pipe(
      startFrom(startValue$)
    )
    

    确保 startValue$ 完成

    一旦startValue$ 完成,新的 Observable 将切换到 source$。 因此,如果 Observable 一开始是一个长期存在的 Observable,例如ReplaySubject 你只关心你必须确保你传入的 Observable 完成的最新值,例如take(1)

    const { ReplaySubject, concat, of } = rxjs;
    const { take } = rxjs.operators;
    
    const source$ = of('source-1', 'source-2')
    const replay = new ReplaySubject(1)
    replay.next('subject-1')
    replay.next('subject-2')
    
    const startWith$ = replay.pipe(take(1)) // <-- take one item and complete
    const o$ = concat(startWith$, source$)
    o$.subscribe(console.log)
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"&gt;&lt;/script&gt;

    【讨论】:

      【解决方案2】:

      我认为这样可以解决问题:

      let filteredTags = allTags
        .combineLatest(tagExclusionStream.startWith(''), (tags, tagExclusions) => {
           return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
        });
      

      或者,如果您在不同的地方使用tagExclusionStream,您可以这样做:

      let tagExclusionStream = Observable
        .fromEvent(tagInput, 'input')
        .map((e: any) => createsArrayFromInput(e.target.value))
        .startWith('');
      

      【讨论】:

      • 我喜欢这种方法 - 在流上创建一个发射,否则在用户输入之前不会发射。这行得通,所以感谢您指出这一点!我必须将 startWith 的参数更改为以 &lt;any&gt; 开头以避免 TypeScript 错误。我还将参数更改为[] vs '',因为这就是我的tagExclusionStream 发出的。
      猜你喜欢
      • 1970-01-01
      • 2019-06-13
      • 2017-06-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-01-13
      相关资源
      最近更新 更多