【问题标题】:RxJS merge when previous stream receives first value当前一个流接收到第一个值时,RxJS 合并
【发布时间】:2020-12-19 19:04:59
【问题描述】:

简要说明我想要什么:

我有一组可观察对象,我希望所有这些对象的结果都来自同一个流。最初我希望激活第一个 observable,一旦 observable 接收到它的第一个值,我希望激活下一个 observable。

详细解释我想要什么:

我正在寻找的解决方案介于 RxJS 创建运算符“merge”和“concat”之间。

我有一个可观察的数组。随着时间的推移,每个可观测数据都会有几个排放。我想将这些 observables 依次排列,这样一开始只有第一个 observables 被激活(这类似于“concat”的工作方式)

然后,一旦第一个 observable 收到它的第一个值,我希望第二个 observable 被激活。一旦第二个 observable 收到它的第一个值,我希望第三个 observable 被激活,依此类推。 (这与“concat”不同。“concat”等待前一个可观察对象完成,但在我的用例中,我想等待前一个可观察对象收到它的第一个值)

只有一个结果流会从所有激活的 observables 中发出值(这类似于“合并”的工作原理)

我不认为有一个特定的 RxJS 运算符可以解决这个问题,但我希望可以通过混合多个运算符找到解决方案。

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    一旦第一个 observable 收到它的第一个值

    Observables 不接收值,订阅者接收;这种区别在这里很重要,因为我们订阅每个 observable 的时间点很重要。例如,如果我们不考虑订阅者,那么

    merge(
      a$,
      a$.pipe(first(), switchMapTo(b$)),
      b$.pipe(first(), switchMapTo(c$)),
    )
    

    将是一个解决方案——但我在这里的假设是这不是你的意思。不过,它确实让我们更接近解决方案。我们需要的是这样的:

    merge(
      a$,
      a$.pipe(first(), switchMapTo(b$)),
      a$.pipe(first(), switchMapTo(b$.pipe(first(), switchMapTo(c$)))),
      // …
    )
    

    当然,我们需要为此找到一个通用的解决方案。一种天真的方法就是“递归”:

    const activatedMerge = (sources) => {
      const activatedSources = sources.reduce((result, source, idx) => {
        result.push(idx === 0 ? source : result[idx-1].pipe(first(), switchMapTo(source)));
        return result;
      }, []);
    
      return merge(...activatedSources);
    };
    
    activatedMerge([a$, b$, c$]).subscribe(console.log);
    

    可能有比这更好的解决方案,不幸的是我没时间了。无论如何,希望这会有所帮助。

    【讨论】:

    • 关于“一旦第一个 observable 收到它的第一个值”,你是对的,我描述得很糟糕。非常感谢您的回答,我会仔细研究一下,看看是否可以让它适用于我的用例。
    【解决方案2】:

    级联合并

    这应该按照您的描述进行,它只是在每次从前一个流中看到第一个值时递归地合并一个新流。

    function cascadeMerge<T>(...observables: Observable<T>[]): Observable<T>{
      if(observables.length < 1) return EMPTY;
      return observables[0].pipe(
        mergeMap((v,i) => {
          if(i === 0 && observables.length > 1){
            return concat(
              of(v),
              cascadeMerge(...observables.slice(1))
            )
          }
          return of(v);
        })
      );
    }
    

    这样做的好处是:

    merge(
      a$,
      a$.pipe(first(), switchMapTo(b$))
    )
    

    是你只subscribe 每个流一次。您不必担心热与冷 observables 或是否在多播。

    cascadeMerge(a$, b$, c$).subscribe(console.log);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-14
      相关资源
      最近更新 更多