【问题标题】:Wait for an async operation in onNext of RxJS Observable在 RxJS Observable 的 onNext 中等待异步操作
【发布时间】:2014-02-19 09:32:59
【问题描述】:

我有一个以正常方式使用的 RxJS 序列...

但是,在可观察的“onNext”处理程序中,一些操作将同步完成,但其他操作需要异步回调,在处理输入序列中的下一项之前需要等待。

...有点困惑如何做到这一点。有任何想法吗?谢谢!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);

【问题讨论】:

    标签: javascript asynchronous rxjs


    【解决方案1】:

    您要执行的每个操作都可以建模为可观察对象。甚至同步操作也可以这样建模。然后你可以使用map将你的序列转换为序列序列,然后使用concatAll将序列展平。

    someObservable
        .map(function (item) {
            if (item === "do-something-async") {
                // create an Observable that will do the async action when it is subscribed
                // return Rx.Observable.timer(5000);
    
                // or maybe an ajax call?  Use `defer` so that the call does not
                // start until concatAll() actually subscribes.
                return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
            }
            else {
                // do something synchronous but model it as an async operation (using Observable.return)
                // Use defer so that the sync operation is not carried out until
                // concatAll() reaches this item.
                return Rx.Observable.defer(function () {
                    return Rx.Observable.return(someSyncAction(item));
                });
            }
        })
        .concatAll() // consume each inner observable in sequence
        .subscribe(function (result) {
        }, function (error) {
            console.log("error", error);
        }, function () {
            console.log("complete");
        });
    

    要回复您的某些 cmets...在某些时候您需要对函数流施加一些期望。在大多数语言中,当处理可能是异步的函数时,函数签名是异步的,函数的实际异步与同步性质被隐藏为函数的实现细节。无论您使用的是 javaScript promises、Rx observables、c# Tasks、c++ Futures 等,都是如此。函数最终返回一个 promise/observable/task/future/etc,如果函数实际上是同步的,那么它返回的对象是刚刚完成。

    话虽如此,既然这是 JavaScript,你可以作弊:

    var makeObservable = function (func) {
        return Rx.Observable.defer(function () {
            // execute the function and then examine the returned value.
            // if the returned value is *not* an Rx.Observable, then
            // wrap it using Observable.return
            var result = func();
            return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
        });
    }
    
    someObservable
        .map(makeObservable)
        .concatAll()
        .subscribe(function (result) {
        }, function (error) {
            console.log("error", error);
        }, function () {
            console.log("complete");
        });
    

    【讨论】:

    • 看起来不错,但不幸的是,我们不知道哪些项目将同步运行,哪些项目将异步运行。
    • 直观地说,(在非 rx 世界中),解决方案是暂停处理输入序列,直到事件发生......这让我想到使用你的解决方案加上某种混合一个可观察的 fromEvent 来“勾选”输入队列?
    • 我真的很想在 onNext 中执行该操作,以某种方式告诉 Rx 继续...这样我们的开发人员可以调用一个函数说...'可以继续现在'。如果有人添加异步操作,我担心以后的维护,深入执行曾经是同步操作的操作,如何让他们以最少的麻烦来完成它...进入队列的项目,是要调用的函数()..我不知道哪些是同步与异步。他们确实将上下文参数作为参数......所以他们能够将自己标记为完成。
    • 我想我必须将每个函数包装在一个返回适当可观察对象的代理中......似乎有点矫枉过正,但你是对的,这是可组合模式:/
    • 如果我理解正确的话,我添加了一种处理您的用例的方法。
    【解决方案2】:

    首先,将您的异步操作移出subscribe,它不适合异步操作。

    您可以使用mergeMap(别名flatMap)或concatMap。 (我提到了他们两个,但concatMap 实际上是mergeMapconcurrent 参数设置为 1。)设置不同的并发参数很有用,因为有时您希望限制并发查询的数量,但是仍然运行几个并发。

    source.concatMap(item => {
      if (item == 'do-something-async-and-wait-for-completion') {
        return Rx.Observable.timer(5000)
          .mapTo(item)
          .do(e => console.log('okay, we can continue'));
        } else {
          // do something synchronously and keep on going immediately
          return Rx.Observable.of(item)
            .do(e => console.log('ready to go!!!'));
        }
    }).subscribe();
    

    我还将展示如何对通话进行速率限制。 忠告: 仅在您实际需要时限制速率,例如在调用每秒或每分钟只允许一定数量请求的外部 API 时。否则最好只限制并发操作的数量,让系统以最大速度移动。

    我们从下面的sn-p开始:

    const concurrent;
    const delay;
    source.mergeMap(item =>
      selector(item, delay)
    , concurrent)
    

    接下来,我们需要为concurrentdelay 选择值并实现selectorconcurrentdelay 密切相关。例如,如果我们想每秒运行 10 个项目,我们可以使用 concurrent = 10delay = 1000(毫秒),也可以使用 concurrent = 5delay = 500concurrent = 4delay = 400。每秒的项目数将始终为concurrent / (delay / 1000)

    现在让我们实现selector。我们有几个选择。我们可以为selector 设置一个最小的执行时间,我们可以给它添加一个恒定的延迟,我们可以在结果可用时立即发出结果,我们只能在最小延迟过去后发出结果等等。它是甚至可以使用 timeout 运算符添加超时。方便。

    设置最短时间,尽早发送结果:

    function selector(item, delay) {
       return Rx.Observable.of(item)
         .delay(1000) // replace this with your actual call.
         .merge(Rx.Observable.timer(delay).ignoreElements())
    }
    

    设置最短时间,延迟发送结果:

    function selector(item, delay) {
       return Rx.Observable.of(item)
         .delay(1000) // replace this with your actual call.
         .zip(Rx.Observable.timer(delay), (item, _))
    }
    

    添加时间,提前发送结果:

    function selector(item, delay) {
       return Rx.Observable.of(item)
         .delay(1000) // replace this with your actual call.
         .concat(Rx.Observable.timer(delay).ignoreElements())
    }
    

    添加时间,延迟发送结果:

    function selector(item, delay) {
       return Rx.Observable.of(item)
         .delay(1000) // replace this with your actual call.
         .delay(delay)
    }
    

    【讨论】:

      【解决方案3】:

      另一个执行手动异步操作的简单示例。

      请注意,这不是一个好的反应式做法!如果只想等待 1000ms,请使用 Rx.Observable.timer 或延迟运算符。

      someObservable.flatMap(response => {
        return Rx.Observable.create(observer => {
          setTimeout(() => {
            observer.next('the returned value')
            observer.complete()
          }, 1000)
        })
      }).subscribe()
      

      现在,将 setTimeout 替换为您的异步函数,例如 Image.onload 或 fileReader.onload ...

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-12-03
        • 2018-06-03
        • 1970-01-01
        • 2017-02-15
        • 1970-01-01
        • 2021-12-01
        • 1970-01-01
        相关资源
        最近更新 更多