【问题标题】:rxJs callback with observable being asynchronous可观察到的 rxJs 回调是异步的
【发布时间】:2017-01-24 14:01:33
【问题描述】:

我的 rxjs 流程中有一个地图操作

streaming.map((data) => {
  //example async call
  methodCall.then((response) => {
    return data.test
  })
})
.filter((value) => ...);

问题是在 data.test 自然返回之前调用过滤器。所以我尝试将其转换为 switchMap 并返回一个 observable

streaming.switchMap((data) => {
  return Observable.create((observer) => {
    //example async call
    methodCall.then((response) => {
      observer.next(data.test);
      observer.complete();
    });
  });
})
.filter((value) => ......);

我假设在这种情况下,因为我们要返回一个带有明确计时的可观察对象,所以过滤器将仅在调用observer.complete 之后调用,但过滤器被过早调用且值变量未定义。

我通常如何使用 rxJs 完成此操作

【问题讨论】:

    标签: javascript asynchronous rxjs reactive-programming


    【解决方案1】:

    我认为您缺少退货声明或其他内容。这就是为什么它会给你意想不到的结果。 switchMap() 操作符订阅从其回调返回的 Observable 并重新发送其所有项目,直到返回另一个 Observable。它不会等待完成通知。

    这应该模拟您的示例:

    function methodCall() {
      return new Promise(resolve => setTimeout(() => {
        resolve(123);
      }, 1000));
    }
    
    Observable.of(42)
      .do(value => console.log('start: ' + value))
      .switchMap((data) => {
        return Observable.create((observer) => {
          //example async call
          methodCall().then((response) => {
            observer.next(response);
            observer.complete();
          });
        });
      })
      .filter(value => true)
      .subscribe(value => console.log('next: ' + value));
    

    观看现场演示:https://jsbin.com/focili/2/edit?js,console

    注意123 在 Promise 解决时发出。

    顺便说一句,也许你根本不需要使用switchMap()。 RxJS 5 以相同的方式对待 Observables、Promises、数组、类数组对象等。这意味着您可以将 Observable 替换为上述任何内容。例如,您可以只使用concatMap(),结果将是相同的:

    function methodCall() {
      return new Promise(resolve => setTimeout(() => {
        resolve(123);
      }, 1000));
    }
    
    Observable.of(42)
      .do(value => console.log('start: ' + value))
      .concatMap(value => methodCall())
      .filter(value => true)
      .subscribe(value => console.log('next: ' + value));
    

    观看现场演示:https://jsbin.com/wipofiv/2/edit?js,console

    请注意,concatMap() 收到了 Promise,但它仍然按预期工作,您甚至不需要将其转换为 Observable。

    【讨论】:

      【解决方案2】:

      通常您可以使用fromPromisePromise 转换为Observable

      streaming.switchMap((data) => Observable.fromPromise(methodCall))
               .filter((value) => ...);
      

      这一切都很糟糕,你只是喜欢 RxJS 编程。

      可能在filter 之前,您需要应用一些.map 来返回data.test 而不是data

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-07-27
        • 2018-04-18
        • 2019-11-05
        • 1970-01-01
        • 2020-06-11
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多