【问题标题】:Observables: Chaining promises & firing events on each promiseObservables:在每个 Promise 上链接 Promise 和触发事件
【发布时间】:2018-04-11 15:16:16
【问题描述】:

我刚刚阅读了这个 SO 问题 RxJS Promise Composition (passing data) 并有一个问题。

设置同前述问题:我有3个promise。

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
      const isEven = x => x % 2 === 0;
      return Promise.resolve(isEven(x));
 };

根据答案,我将它们链接起来,如下所示:

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

但是,chainedPromises$ 仅在所有 Promise 链的末尾触发一次,解析值为 true。即:

chainedPromises$.subscribe(function(x){console.log(x)}); // Logs `true`

如何更改 chainedPromises$ 使其在每个承诺结束时触发一次?即:

chainedPromises$.subscribe(function(x){console.log(x)});
// Logs `1`
// Logs `2`
// Logs `true`

【问题讨论】:

  • Observable 和 Promises 不是一回事,您在这里尝试做的事情没有意义。如果您要让 Oserable 返回多个结果,那么您可以使用 next 将它们关闭

标签: javascript promise rxjs observable


【解决方案1】:

看起来您需要发出生成的值在链的下一步中使用它。我不知道这是否有好处,但我通过明确区分来让它发挥作用。

Observable
  .fromPromise(p1())
  .flatMap(val => Observable.merge(
    Observable.fromPromise(p2(val))
      .flatMap(val => Observable.merge(
        Observable.fromPromise(p3(val)),
        Observable.of(val)
      )),
    Observable.of(val)
  ))
  .subscribe(console.log);

您可以看到it 工作。

【讨论】:

    【解决方案2】:

    不要使用mergeMap 将一个值“映射”到另一个值,而使用concat 仅在前一个 Observable 完成时订阅 Observable。这样,您将获得与一次处理一个 Observable 相同的效果,但每次发射都会传播:

    Observable.concat(p1, p2, p3)
      .subscribe(console.log);
    

    编辑:

    Observable.of(null)
      .expand((value, index) => {
        switch (index) {
          case 0:
            return p1();
          case 1:
            return p2(value);
          case 2:
            return p3(value);
          default:
            return Observable.empty();
        }
      })
      .filter((value, index) => index > 0) // Ignore the `null` value that was necessary to start the chain
      .subscribe(console.log);
    

    这打印:

    1
    2
    true
    

    查看现场演示(打开控制台):https://stackblitz.com/edit/rxjs5-fucqhq?file=index.ts

    【讨论】:

    • 我猜你的意思是Rx.Observable.concat(p1(),p2(),p3())
    • 当链接承诺时,前一个承诺的值被发送到下一个承诺。 Concat 没有做到这一点。
    • @amaurymartiny 这正是你想要的,对吧?
    • @martin 您的示例与 concat 日志 1Nanfalse。我想要一个记录 12true 的 Observable。
    • @amaurymartiny 我听说过 Observable 的一个主要规则,不要尝试将它们用于所有事情。 async / await 更适合您的要求。
    【解决方案3】:

    对我来说,你在这里浪费了 observable 的全部意义。

    如果您有一组 Promise 想要订阅者作为流接收,您只需将其传递给 next,那么您现在也可以订阅一个流。

    下面是一个例子。 ->

    const p1 = () => Promise.resolve(1);
    const p2 = x => { const val = x + 1; return  Promise.resolve(val); };
    const p3 = x => {
      const isEven = x => x % 2 === 0;
      return Promise.resolve(isEven(x));
    };
    
    const chainedProm = (observer) => {
      let params;
      return async (p) => 
        observer.next(params = await p(params));
    };
    
    var observable = Rx.Observable.create(async function (observer) {
      const doProm = chainedProm(observer);
      await doProm(p1);
      await doProm(p2);
      await doProm(p3);
      observer.complete();
    });
    
    
    observable.subscribe(function(x){console.log(x)});
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>

    【讨论】:

    • 有没有更多的声明方式来做到这一点?
    • 我现在还更新了一个名为 chainedProm 的实用程序函数,如果您期望一个 Promise 链,您可以将其用于任何 observable。
    猜你喜欢
    • 2015-07-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-27
    • 2016-09-10
    • 2015-01-19
    • 2017-08-22
    • 1970-01-01
    相关资源
    最近更新 更多