【问题标题】:Cloud Function to Trigger DataPrep Dataflow Job触发 DataPrep 数据流作业的云函数
【发布时间】:2018-10-17 15:10:09
【问题描述】:

我有一个小管道正在尝试执行:

  1. 文件放入 GCS Bucket > 2. 当文件放入 GCS Bucket 时,Cloud Function 触发 Dataflow 作业(不工作)> 3. 写入 Big Query 表(这部分工作)

我通过 Dataprep 创建了一个 Dataflow 作业,因为它有很好的 UI 可以在写入 BigQuery 表之前完成我的所有转换(写入 BigQuery 工作正常),并且当文件上传到 GCS 存储桶时会触发 Cloud 函数.但是,Cloud Function 不会触发 Dataflow 作业(我在 Dataprep 中编写)。

请看一下我的云函数下面的示例代码,如果我能得到任何关于为什么 Dataflow 作业没有触发的指示。

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} The callback function.
 */
exports.processFile = (event, callback) => {
  console.log('Processing file: ' + event.data.name);
  callback();

  const google = require('googleapis');

 exports.CF_GCStoDataFlow_v2 = function(event, callback) {
  const file = event.data;
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: 'cloud-dataprep-csvtobq-v2-281345',
          gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  }
 };
};

【问题讨论】:

  • 您已附上 Dtatproc 作业提交 UI 屏幕截图。这是一个错误,还是您以某种方式在工作流程中使用了 Dataproc?
  • 这是给之前的评论者的,他建议激活 dataproc 作业,(见下文)
  • 对于这一行,console.log('Processing file: ' + event.data.name); 我收到错误“无法读取未定义的属性‘名称’”

标签: google-cloud-functions google-cloud-dataflow


【解决方案1】:

这个 sn-p 可能会有所帮助,它使用数据流 api(启动)的不同方法,它对我有用,请注意您需要指定模板的 url 并检查元数据文件(您可以在同一个文件中找到它通过dataprep接口执行时作为模板的目录)文件包含正确的参数

dataflow.projects.templates.launch({
   projectId: projectId,
   location: location,
   gcsPath: jobTemplateUrl,
   resource: {
     parameters: {
       inputLocations : `{"location1" :"gs://${file.bucket}/${file.name}"}`,
       outputLocations: `{"location1" : "gs://${destination.bucket}/${destination.name}"}"}`,
     },
      environment: {
        tempLocation: `gs://${destination.bucket}/${destination.tempFolder}`,
        zone: "us-central1-f"
     },
     jobName: 'my-job-name',

   }
 }

【讨论】:

    【解决方案2】:

    您是否提交了 Dataproc 作业?它开始运行了吗? 下面的文档可以提供一些入门的想法!

    https://cloud.google.com/dataproc/docs/concepts/jobs/life-of-a-job

    【讨论】:

    • 但是我使用 DataPrep 创建 Dataflow 作业:cloud.google.com/dataprep(不是 DataProc)
    • 如何创建 Dataflow 作业并不重要。为了触发触发,Dataproc 必须正在运行,以便它可以根据触发条件启动您的数据流作业。
    • 您好,如果我理解您正确创建数据处理作业需要触发云功能?如果是这样,我浏览了 dataproc 中的选项(提交作业),但我不清楚它是如何与云功能集成的?以及需要创建什么样的工作才能触发云功能?..(我在上面添加了截图)请告诉我我错过了什么?
    • 这里发生了两件事。 1)您正在尝试创建一个简单的数据流。 2)您正在尝试创建一个触发器,该触发器每次都会触发此作业,满足触发条件。您的屏幕截图适用于 1),对于 2),您必须创建一个触发器(作业)并运行它。我相信您错过了触发作业的运行部分。本质上是两份工作。
    • 关注这个话题。这位用户谈到了他是如何运行他的触发器作业的。 stackoverflow.com/questions/49348220/…
    【解决方案3】:

    看起来您将CF_GCStoDataFlow_v2 放入processFile,因此代码的数据流部分没有执行。

    你的函数应该是这样的:

    /**
     * Triggered from a message on a Cloud Storage bucket.
     *
     * @param {!Object} event The Cloud Functions event.
     * @param {!Function} The callback function.
     */
    exports.CF_GCStoDataFlow_v2 = (event, callback) => {
    
      const google = require('googleapis');
    
      if (file.resourceState === 'exists' && file.name) {
        google.auth.getApplicationDefault(function (err, authClient, projectId) {
          if (err) {
            throw err;
          }
    
          if (authClient.createScopedRequired && authClient.createScopedRequired()) {
            authClient = authClient.createScoped([
              'https://www.googleapis.com/auth/cloud-platform',
              'https://www.googleapis.com/auth/userinfo.email'
            ]);
          }
    
          const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
    
          dataflow.projects.templates.create({
            projectId: projectId,
            resource: {
              parameters: {
                inputFile: `gs://${file.bucket}/${file.name}`,
                outputFile: `gs://${file.bucket}/${file.name}`
              },
              jobName: '<JOB_NAME>',
              gcsPath: '<BUCKET_NAME>'
            }
          }, function(err, response) {
            if (err) {
              console.error("problem running dataflow template, error was: ", err);
            }
            console.log("Dataflow template response: ", response);
            callback();
          });
    
        });
      }
    
      callback();
    };
    

    确保将“Function to execute”下的值更改为 CF_GCStoDataFlow_v2

    【讨论】:

    • 进行了更改并将“要执行的函数”更改为 CF_GCStoDataFlow_v5,但是在日志中出现错误:“错误:在 Function.Module._resolveFilename 中找不到模块“googleapis”。我错过了什么吗?我必须对我的 packages.json 文件进行任何更改吗?日志 sn-p 在下面,谢谢
    • '错误:在 Function.Module._resolveFilename (module.js:469) at Function.Module._load (module.js:417) at Module.require (module. js:497) 在需要 (internal/module.js:20) 在 exports.CF_GCStoDataFlow_v5 (index.js:9) 在 (/var/tmp/worker/worker.js:705) 在 (/var/tmp/worker/ worker.js:670) 在 _combinedTickCallback (internal/process/next_tick.js:73) 在 process._tickDomainCallback (next_tick.js:128)'
    • 需要在package.json中添加googleapi依赖。示例:{ "name": "sample-cloud-storage", "version": "0.0.1", "dependencies":{ "googleapis": "^21.3.0" } }
    猜你喜欢
    • 2022-07-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-10
    相关资源
    最近更新 更多