【发布时间】:2018-10-17 15:10:09
【问题描述】:
我有一个小管道正在尝试执行:
- 文件放入 GCS Bucket > 2. 当文件放入 GCS Bucket 时,Cloud Function 触发 Dataflow 作业(不工作)> 3. 写入 Big Query 表(这部分工作)
我通过 Dataprep 创建了一个 Dataflow 作业,因为它有很好的 UI 可以在写入 BigQuery 表之前完成我的所有转换(写入 BigQuery 工作正常),并且当文件上传到 GCS 存储桶时会触发 Cloud 函数.但是,Cloud Function 不会触发 Dataflow 作业(我在 Dataprep 中编写)。
请看一下我的云函数下面的示例代码,如果我能得到任何关于为什么 Dataflow 作业没有触发的指示。
/**
* Triggered from a message on a Cloud Storage bucket.
*
* @param {!Object} event The Cloud Functions event.
* @param {!Function} The callback function.
*/
exports.processFile = (event, callback) => {
console.log('Processing file: ' + event.data.name);
callback();
const google = require('googleapis');
exports.CF_GCStoDataFlow_v2 = function(event, callback) {
const file = event.data;
if (file.resourceState === 'exists' && file.name) {
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped([
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/userinfo.email'
]);
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
dataflow.projects.templates.create({
projectId: projectId,
resource: {
parameters: {
inputFile: `gs://${file.bucket}/${file.name}`,
outputFile: `gs://${file.bucket}/${file.name}`
},
jobName: 'cloud-dataprep-csvtobq-v2-281345',
gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
}
}, function(err, response) {
if (err) {
console.error("problem running dataflow template, error was: ", err);
}
console.log("Dataflow template response: ", response);
callback();
});
});
}
};
};
【问题讨论】:
-
您已附上 Dtatproc 作业提交 UI 屏幕截图。这是一个错误,还是您以某种方式在工作流程中使用了 Dataproc?
-
这是给之前的评论者的,他建议激活 dataproc 作业,(见下文)
-
对于这一行,
console.log('Processing file: ' + event.data.name);我收到错误“无法读取未定义的属性‘名称’”
标签: google-cloud-functions google-cloud-dataflow