【问题标题】:how to make synchronous http calls using promises in node.js如何使用 node.js 中的 Promise 进行同步 http 调用
【发布时间】:2016-07-30 20:09:50
【问题描述】:

我想遍历一组学生并为每个学生进行 http 调用并解析响应并插入到 mongodb 中,所以我想为每个学生一个接一个地执行此操作,直到插入所有数据然后继续下一个,这样对 CPU 和 RAM 内存会更好...

到目前为止,我正在这样做,但由于某种原因,这不是我想要的......

var startDate = new Date("February 20, 2016 00:00:00");  //Start from February
var from = new Date(startDate).getTime() / 1000;
startDate.setDate(startDate.getDate() + 30);
var to = new Date(startDate).getTime() / 1000;

iterateThruAllStudents(from, to);

function iterateThruAllStudents(from, to) {
    Student.find({status: 'student'})
        .populate('user')
        .exec(function (err, students) {
            if (err) {
                throw err;
            }

            async.eachSeries(students, function iteratee(student, callback) {
                if (student.worksnap.user != null) {
                    var worksnapOptions = {
                        hostname: 'worksnaps.com',
                        path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + '&from_timestamp=' + from + '&to_timestamp=' + to,
                        headers: {
                            'Authorization': 'Basic xxx='
                        },
                        method: 'GET'
                    };

                    promisedRequest(worksnapOptions)
                        .then(function (response) { //callback invoked on deferred.resolve
                            parser.parseString(response, function (err, results) {
                                var json_string = JSON.stringify(results.time_entries);
                                var timeEntries = JSON.parse(json_string);
                                _.forEach(timeEntries, function (timeEntry) {
                                    _.forEach(timeEntry, function (item) {
                                        saveTimeEntry(item);
                                    });
                                });
                                callback(null);
                            });
                        }, function (newsError) { //callback invoked on deferred.reject
                            console.log(newsError);
                        });
                }
            });
        });
}

function saveTimeEntry(item) {
    Student.findOne({
            'worksnap.user.user_id': item.user_id[0]
        })
        .populate('user')
        .exec(function (err, student) {
            if (err) {
                throw err;
            }
            student.timeEntries.push(item);
            student.save(function (err) {
                if (err) {
                    console.log(err);
                } else {
                    console.log(Math.random());
                }
            });

        });
}

function promisedRequest(requestOptions) {
    //create a deferred object from Q
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
    var deferred = Q.defer();
    var req = http.request(requestOptions, function (response) {
        //set the response encoding to parse json string
        response.setEncoding('utf8');
        var responseData = '';
        //append data to responseData variable on the 'data' event emission
        response.on('data', function (data) {
            responseData += data;
        });
        //listen to the 'end' event
        response.on('end', function () {
            //resolve the deferred object with the response
            console.log('http call finished');
            deferred.resolve(responseData);
        });
    });

    //listen to the 'error' event
    req.on('error', function (err) {
        //if an error occurs reject the deferred
        deferred.reject(err);
    });
    req.end();
    //we are returning a promise object
    //if we returned the deferred object
    //deferred object reject and resolve could potentially be modified
    //violating the expected behavior of this function
    return deferred.promise;
}

.then 中的 saveEntry() 似乎同时被所有学生调用,这似乎有问题。

我是 Javascript 新手,尤其是在涉及承诺、回调时... 任何人都知道实现这样的事情......

【问题讨论】:

  • 什么是parser.parseString()?这是同步操作还是异步操作?
  • 另外,你应该知道在异步回调中使用throw 绝对没有好处,除非它在promise .then() 处理程序中。它只是回到异步操作的内部,你永远无法在任何地方捕捉到它。在这种情况下不要使用 throw。
  • 另外,为什么_.forEach(timeEntry)?那时timeEntry 是什么?
  • @jfriend00 parser.parseString() 我不知道它是异步的还是同步的,我不得不用它来解析 xml...似乎异步...你知道我是否可以添加然后() 到那里,所以我可以在那里添加 saveTimeEntry() ?

标签: javascript node.js promise q


【解决方案1】:

首先,如果您对所有异步操作使用 Promise,多个嵌套操作将更容易编码和可靠地处理错误。这意味着学习如何使用内置在数据库中的 Promise(我假设您使用的是 mongoose),然后包装任何其他异步操作以使用 Promise。以下是一些关于在 mongoose 中使用 Promise 的链接:

Switching to use promises in Mongoose

Plugging in your own promise library into Mongoose

因此,查看 Mongoose 文档,您可以看到 .exec().save() 已经返回承诺,因此我们可以直接使用它们。

添加这行代码将告诉 Mongoose 使用 Q 承诺(因为这是您展示的承诺库):

// tell mongoose to use Q promises
mongoose.Promise = require('q').Promise;

然后,你需要promisify一些不使用promise的操作,比如解析步骤:

function parse(r) {
    var deferred = Q.defer();
    parser.parseString(r, function(err, results) {
        if (err) {
            deferred.reject(err);
        } else {
            deferred.resolve(results);
        }
    });
    return deferred.promise;
}

saveTimeEntry() 可以很容易地写成返回一个承诺,只需使用数据库中已有的承诺支持:

function saveTimeEntry(item) {
    return Student.findOne({'worksnap.user.user_id': item.user_id[0]}).populate('user').exec().then(function(student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

所以,现在您已经拥有了使用 Promise 重写主要逻辑的所有正确部分。这里要记住的关键是,如果您从 .then() 处理程序返回一个 Promise,它将将该 Promise 链接到父 Promise 上。我们将在您的处理中大量使用它。此外,对遍历数组的 Promise 进行排序的常见设计模式是使用 array.reduce(),如下所示:

return array.reduce(function(p, item) {
    return p.then(function() {
         return someAsyncPromiseOperation(item);
    });
}, Q());

我们将在几个地方使用该结构来使用 Promise 重写核心逻辑:

// tell mongoose to use Q promises
mongoose.Promise = require('q').Promise;

iterateThruAllStudents(from, to).then(function() {
    // done successfully here
}, function(err) {
    // error occurred here
});

function iterateThruAllStudents(from, to, callback) {
    return Student.find({status: 'student'}).populate('user').exec().then(function (students) {
        // iterate through the students array sequentially
        students.reduce(function(p, student) {
            return p.then(function() {
                if (student.worksnap.user != null) {
                    var worksnapOptions = {
                        hostname: 'worksnaps.com',
                        path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + 
                              '&from_timestamp=' + from + '&to_timestamp=' + to,
                        headers: {'Authorization': 'Basic xxx='},
                        method: 'GET'
                    };

                    return promisedRequest(worksnapOptions).then(function(response) {
                        return parse(response).then(function(results) {
                            // assuming results.time_entries is an array
                            return results.time_entries.reduce(function(p, item) {
                                return p.then(function() {
                                    return saveTimeEntry(item);
                                });
                            }, Q());
                        });
                    });
                }
            });
        }, Q())
    });
}

function parse(r) {
    var deferred = Q.defer();
    parser.parseString(r, function(err, results) {
        if (err) {
            deferred.reject(err);
        } else {
            deferred.resolve(results);
        }
    });
    return deferred.promise;
}

function saveTimeEntry(item) {
    return Student.findOne({'worksnap.user.user_id': item.user_id[0]}).populate('user').exec().then(function(student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

function promisedRequest(requestOptions) {
    //create a deferred object from Q
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
    var deferred = Q.defer();
    var req = http.request(requestOptions, function (response) {
        //set the response encoding to parse json string
        response.setEncoding('utf8');
        var responseData = '';
        //append data to responseData variable on the 'data' event emission
        response.on('data', function (data) {
            responseData += data;
        });
        //listen to the 'end' event
        response.on('end', function () {
            //resolve the deferred object with the response
            console.log('http call finished');
            deferred.resolve(responseData);
        });
    });

    //listen to the 'error' event
    req.on('error', function (err) {
        //if an error occurs reject the deferred
        deferred.reject(err);
    });
    req.end();
    return deferred.promise;
}

您的代码没有提供的一件事是,发生的任何错误都会一直渗透到从iterateThruAllStudents() 返回的承诺,因此不会隐藏任何错误。


然后,清理它一些以减少嵌套缩进并添加一个有用的实用函数,它看起来像这样:

// tell mongoose to use Q promises
mongoose.Promise = require('q').Promise;

// create utility function for promise sequencing through an array
function sequence(array, iterator) {
    return array.reduce(function(p, item) {
        return p.then(function() {
            return iterator(item);
        });
    }, Q());
}

iterateThruAllStudents(from, to).then(function() {
    // done successfully here
}, function(err) {
    // error occurred here
});

function iterateThruAllStudents(from, to, callback) {
    return Student.find({status: 'student'}).populate('user').exec().then(function (students) {
        // iterate through the students array sequentially
        return sequence(students, function(item) {
            if (student.worksnap.user != null) {
                var worksnapOptions = {
                    hostname: 'worksnaps.com',
                    path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + 
                          '&from_timestamp=' + from + '&to_timestamp=' + to,
                    headers: {'Authorization': 'Basic xxx='},
                    method: 'GET'
                };
                return promisedRequest(worksnapOptions).then(function(response) {
                    return parse(response);
                }).then(function(results) {
                    // assuming results.time_entries is an array
                    return sequence(results.time_entries, saveTimeEntry);
                });
            }
        });
    });
}

function parse(r) {
    var deferred = Q.defer();
    parser.parseString(r, function(err, results) {
        if (err) {
            deferred.reject(err);
        } else {
            deferred.resolve(results);
        }
    });
    return deferred.promise;
}

function saveTimeEntry(item) {
    return Student.findOne({'worksnap.user.user_id': item.user_id[0]}).populate('user').exec().then(function(student) {
        student.timeEntries.push(item);
        return student.save();
    });
}

function promisedRequest(requestOptions) {
    //create a deferred object from Q
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
    var deferred = Q.defer();
    var req = http.request(requestOptions, function (response) {
        //set the response encoding to parse json string
        response.setEncoding('utf8');
        var responseData = '';
        //append data to responseData variable on the 'data' event emission
        response.on('data', function (data) {
            responseData += data;
        });
        //listen to the 'end' event
        response.on('end', function () {
            //resolve the deferred object with the response
            console.log('http call finished');
            deferred.resolve(responseData);
        });
    });

    //listen to the 'error' event
    req.on('error', function (err) {
        //if an error occurs reject the deferred
        deferred.reject(err);
    });
    req.end();
    //we are returning a promise object
    //if we returned the deferred object
    //deferred object reject and resolve could potentially be modified
    //violating the expected behavior of this function
    return deferred.promise;
}

当然,这是很多代码,我无法测试它,而且我以前从未编写过 Mongoose 代码,所以这里很可能有一些错误,但希望你能看到大致的想法和解决我可能犯的任何错误。

【讨论】:

  • 嗯,这确实比我做的好太多了......我非常感谢您为研究这个问题付出的努力和时间。我希望它也能帮助除我之外的其他人......我确实像 urs 一样更改了我的代码,它似乎工作得很好:)
【解决方案2】:

好的,我确实解决了这个问题,如果将来有人遇到这个问题,这是我的解决方案。

    var startDate = new Date("February 20, 2016 00:00:00");  //Start from February
var from = new Date(startDate).getTime() / 1000;
startDate.setDate(startDate.getDate() + 30);
var to = new Date(startDate).getTime() / 1000;

iterateThruAllStudents(from, to);

function iterateThruAllStudents(from, to) {
    Student.find({status: 'student'})
        .populate('user')
        .exec(function (err, students) {
            if (err) {
                throw err;
            }

            var counter = 1;
            async.eachSeries(students, function iteratee(student, callback) {
                counter++;
                if (student.worksnap.user != null) {
                    console.log('');
                    console.log('--------------');
                    console.log(student.worksnap.user.user_id);
                    var worksnapOptions = {
                        hostname: 'worksnaps.com',
                        path: '/api/projects/' + project_id + '/time_entries.xml?user_ids=' + student.worksnap.user.user_id + '&from_timestamp=' + from + '&to_timestamp=' + to,
                        headers: {
                            'Authorization': 'Basic xxxxx'
                        },
                        method: 'GET'
                    };

                    promisedRequest(worksnapOptions)
                        .then(function (response) { //callback invoked on deferred.resolve
                            parser.parseString(response, function (err, results) {
                                var json_string = JSON.stringify(results.time_entries);
                                var timeEntries = JSON.parse(json_string);
                                var isEmpty = _.isEmpty(timeEntries); // true
                                if (isEmpty) {
                                    callback(null);
                                }
                                saveTimeEntry(timeEntries).then(function (response) {
                                    console.log('all timeEntries for one student finished....Student: ' + student.worksnap.user.user_id + ' Student Counter: ' + counter);
                                    callback(null);
                                });
                            });
                        }, function (newsError) { //callback invoked on deferred.reject
                            console.log(newsError);
                        });
                } else {
                    callback(null);
                }
            });
        });
}

function saveTimeEntry(timeEntries) {
    var deferred = Q.defer();
    _.forEach(timeEntries, function (timeEntry) {
        _.forEach(timeEntry, function (item) {
            Student.findOne({
                    'worksnap.user.user_id': item.user_id[0]
                })
                .populate('user')
                .exec(function (err, student) {
                    if (err) {
                        //throw err;
                        console.log(err);
                    }
                    student.timeEntries.push(item);
                    student.save(function (err) {
                        if (err) {
                            console.log(err);
                            deferred.reject(err);
                        } else {
                            //console.log(Math.random());
                        }
                    });

                });
        });
        deferred.resolve('finished saving timeEntries for one student...');
    });

    return deferred.promise;
}

function promisedRequest(requestOptions) {
    //create a deferred object from Q
    process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
    var deferred = Q.defer();
    var req = http.request(requestOptions, function (response) {
        //set the response encoding to parse json string
        response.setEncoding('utf8');
        var responseData = '';
        //append data to responseData variable on the 'data' event emission
        response.on('data', function (data) {
            responseData += data;
        });
        //listen to the 'end' event
        response.on('end', function () {
            //resolve the deferred object with the response
            console.log('http call finished');
            deferred.resolve(responseData);
        });
    });

    //listen to the 'error' event
    req.on('error', function (err) {
        //if an error occurs reject the deferred
        console.log('inside On error.');
        console.log(err);
        deferred.reject(err);
    });
    req.end();
    //we are returning a promise object
    //if we returned the deferred object
    //deferred object reject and resolve could potentially be modified
    //violating the expected behavior of this function
    return deferred.promise;
}

【讨论】:

  • 您缺少大量错误处理来正确检测和处理错误。混合常规回调和承诺几乎不是一个好主意。一个好的设计是将所有异步操作切换到 Promise,然后使用 Promise 编写所有异步逻辑。我本来打算为你做的,但你没有回答我在 cmets 中提出的问题,所以我不能。
  • @jfriend00,如果您有更好的解决方案,请发帖,以便我接受它作为最佳答案...这只是一个对我有用的解决方案,但不如应该做的那么干净....
  • 我留下了多个 cmet 向您询问我需要知道答案的问题,以便为您提供更好的答案。我尝试过,但为了创建答案而遇到了丢失的信息。我不明白你为什么不回答我的问题。
  • 我不知道该怎么回答这个为什么 _.forEach(timeEntry),你这是什么意思?我用它来遍历时间条目...事实上我可能添加了一个额外的 foreach,你是对的,我只需要添加它,它仅适用于第一个 foreach 和 Idk 为什么..
  • 在不知道timeEntry 是什么的情况下在黑暗中编程只会给您带来麻烦。如果你不知道它是什么,那么要么做一个console.log(timeEntry) 来看看它到底是什么,或者在调试器中打破它并检查它,这样你就可以为你拥有的数据编写适当的代码。猜测何时编程是一个可怕的想法。
猜你喜欢
  • 2016-07-30
  • 1970-01-01
  • 2016-02-09
  • 1970-01-01
  • 2020-02-08
  • 2016-12-06
  • 2015-12-31
  • 1970-01-01
  • 2017-04-10
相关资源
最近更新 更多