【问题标题】:RxJS Observable fire onCompleted after a number of async actionsRxJS Observable 在许多异步操作后触发 onCompleted
【发布时间】:2017-02-15 05:06:21
【问题描述】:

我正在尝试创建一个 observable,它从多个异步操作(来自 Jenkins 服务器的 http 请求)中生成值,一旦所有操作完成,它将让订阅者知道。我觉得我一定是误解了什么,因为这没有达到我的预期。

'use strict';

let Rx = require('rx');
let _ = require('lodash');
let values = [
    {'id': 1, 'status': true},
    {'id': 2, 'status': true},
    {'id': 3, 'status': true}
];

function valuesObservable() {

    return Rx.Observable.create(function(observer) {
        _.map(values, function(value) {
            var millisecondsToWait = 1000;
            setTimeout(function() { // just using setTimeout here to construct the example
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
        console.log("valuesObservable Sending onCompleted");
        observer.onCompleted()
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // do something with the info
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // do something else once done
});

valuesObservable().subscribe(observer);

运行这个,我得到输出:

valuesObservable Sending onCompleted
DONE!
Sending value:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Sending value:  { id: 3, status: true }

虽然我希望看到的更像是:

Sending value:  { id: 1, status: true }
Received Data:  { id: 1, status: true }
Sending value:  { id: 2, status: true }
Received Data:  { id: 2, status: true }
Sending value:  { id: 3, status: true }
Received Data:  { id: 3, status: true }
valuesObservable Sending onCompleted
DONE!

我实际上并不关心列表中项目的顺序,我只想让观察者接收它们。

我相信发生的事情是 Javascript 异步触发超时功能,并立即进入observer.onCompleted() 行。一旦订阅观察者接收到 onCompleted 事件(这是正确的词吗?),它决定它已经完成并自行处理。然后,当异步操作完成并且 observable 触发 onNext 时,观察者将不再存在以对其执行任何操作。

如果我在这点上是对的,我仍然不知道如何让它以我想要的方式运行。我是否在没有意识到的情况下偶然发现了反模式?有没有更好的方法来处理整个事情?


编辑:

自从我使用 setTimeout 构建示例后,我意识到我可以通过给 observable 一个超时时间来使用它来部分解决我的问题。

function valuesObservable() {

    return Rx.Observable.create(function(observer) {
        let observableTimeout = 10000;
        setTimeout(function() {
            console.log("valuesObservable Sending onCompleted");
            observer.onCompleted();
        }, observableTimeout);
        _.map(values, function(value) {
            let millisecondsToWait = 1000;
            setTimeout(function() {
                console.log("Sending value: ", value);
                observer.onNext(value)
            }, millisecondsToWait);
        });
    });
}

这会按照我想要的顺序(数据,然后完成)从可观察对象中获取所有信息,但是根据超时的选择,我可能会错过一些数据,或者必须等待很长时间才能完成事件.这只是我必须忍受的异步编程的固有问题吗?

【问题讨论】:

  • setTimeout 是异步的,为什么您希望您的订阅在完成之前等待它?
  • @Baumi 我想我解释说我不希望它等待,但我问这个问题是为了获得一些会导致它等待的构造的帮助。这还不清楚吗?您能否建议我如何改写这句话以更好地理解自己?
  • 对不起,伙计,我的错误 - 不知怎的,我直到最后才读到这篇文章......
  • 朋友别担心。

标签: javascript asynchronous rxjs observable


【解决方案1】:

感谢@paulpdaniels,这是完成我想要的最终代码,包括对 Jenkins 的调用:

'use strict';

let Rx = require('rx');
let jenkinsapi = require('jenkins'); // https://github.com/silas/node-jenkins/issues
let jenkinsOpts = {
    "baseUrl": "http://localhost:8080",
    "options": {"strictSSL": false},
    "job": "my-jenkins-job",
    "username": "jenkins",
    "apiToken": "f4abcdef012345678917a"
};
let jenkins = jenkinsapi(JSON.parse(JSON.stringify(jenkinsOpts)));

function jobInfoObservable(jenkins, jobName) {
    // returns an observable with a containing a single list of builds for a given job
    let selector = {tree: 'builds[number,url]'};

    return Rx.Observable.fromNodeCallback(function(callback) {
        jenkins.job.get(jobName, selector, callback);
    })();
}

function buildIDObservable(jenkins, jobName) {
    // returns an observable containing a stream of individual build IDs for a given job
    return jobInfoObservable(jenkins, jobName).flatMap(function(jobInfo) {
        return Rx.Observable.from(jobInfo.builds)
    });
}

function buildInfoObservable(jenkins, jobName) {
    // returns an observable containing a stream of http response for each build in the history for this job
    let buildIDStream = buildIDObservable(jenkins, jobName);
    let selector = {'tree': 'actions[parameters[name,value]],building,description,displayName,duration,estimatedDuration,executor,id,number,result,timestamp,url'};

    return buildIDStream.flatMap(function(buildID) {
        return Rx.Observable.fromNodeCallback(function(callback) {
            jenkins.build.get(jobName, buildID.number, selector, callback);
        })();
    });
}

let observer = Rx.Observer.create((data) => {
    console.log("Received Data: ", data);
    // do something with the info
}, (error) => {
    console.log("Error: ", error);
}, () => {
    console.log("DONE!");
    // do something else once done
});

buildInfoObservable(jenkins, jenkinsOpts.job).subscribe(observer);

通过依赖 Rx 内置运算符,我设法避免完全搞乱时序逻辑。这也比嵌套多个 Rx.Observable.create 语句要干净得多。

【讨论】:

    【解决方案2】:

    是的,有更好的方法。现在的问题是您依赖于时间延迟进行同步,而实际上您可以使用 Observable 运算符来代替。

    第一步是不要直接使用setTimeout。而是使用timer

    Rx.Observable.timer(waitTime);
    

    接下来,您可以将值数组提升到 Observable 中,这样每个值都会作为事件发出:

    Rx.Observable.from(values);
    

    最后,您将使用 flatMap 将这些值转换为 Observables 并将它们展平为最终序列。结果是一个Observable,它在每次源timers 之一发出时发出,并在所有源Observables 完成时完成。

    Rx.Observable.from(values)
      .flatMap(
        // Map the value into a stream
        value => Rx.Observable.timer(waitTime),
        // This function maps the value returned from the timer Observable
        // back into the original value you wanted to emit
        value => value
      )
    

    因此完整的valuesObservable 函数如下所示:

    function valuesObservable(values) {
      return Rx.Observable.from(values)
        .flatMap(
          value => Rx.Observable.timer(waitTime),
          value => value
        )
        .do(
          x => console.log(`Sending value: ${value}`),
          null,
          () => console.log('Sending values completed')
        );
    }
    

    请注意,如果您不使用演示流,上述方法也可以使用,即如果您有真正的 http 流,您甚至可以使用 merge(或 concat 以保持顺序)进行简化

    Rx.Observable.from(streams)
        .flatMap(stream => stream);
    
    // OR
    Rx.Observable.from(streams).merge();
    
    // Or simply
    Rx.Observable.mergeAll(streams);
    

    【讨论】:

    • 这很棒,正是我正在寻找的。顺便说一句,您的代码中有一个小错误:您的 do 块接受参数 x 但记录 value
    【解决方案3】:

    构造 observable 的最佳方法是使用现有的原语,然后使用现有运算符的组合。这避免了一些令人头疼的问题(取消订阅、错误管理等)。那么Rx.Observable.create 在没有其他东西适合您的用例时肯定很有用。我想知道generateWithAbsoluteTime 是否适合。

    无论如何,您遇到的问题是您在向观察者发送数据之前完成了观察者。所以基本上你需要想出一个更好的完成信号。也许:

    • 如果没有发出新值,则在发出最后一个值后完成 x 秒
    • 当一个值等于某个“结束”值时完成

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-08-15
      相关资源
      最近更新 更多