【问题标题】:Execute dataflow job from App Engine从 App Engine 执行数据流作业
【发布时间】:2017-09-14 16:33:54
【问题描述】:

我在 GCP 技术方面相对较新。目前,我正在做 POC 来创建一个计划的数据流作业,该作业将数据从谷歌云存储摄取(插入)到 BigQuery。在阅读了一些教程和文档后,我想出了以下内容:

  1. 我首先创建了一个读取 avro 文件并将其加载到 BigQuery 的数据流作业。此数据流已经过测试并且运行良好。

    (self.pipeline
         | output_table + ': read table ' >> ReadFromAvro(storage_input_path)
         | output_table + ': filter columns' >> beam.Map(self.__filter_columns, columns=columns)
         | output_table + ': write to BigQuery' >> beam.Write(
            beam.io.BigQuerySink(output_table,               
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,                               
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
    
  2. 为了创建计划作业,我随后创建了一个简单的 Web 服务,如下所示:

    import logging
    from flask import Flask
    from common.tableLoader import TableLoader
    from ingestion import IngestionToBigQuery
    from common.configReader import ConfigReader
    app = Flask(__name__)
    @app.route('/')
    def hello():
         """Return a friendly HTTP greeting."""
        logging.getLogger().setLevel(logging.INFO)
        config = ConfigReader('columbus-config')  # TODO read from args
        tables = TableLoader('experience')
        ingestor = IngestionToBigQuery(config.configuration, tables.list_of_tables)
        ingestor.ingest_table()
        return 'Hello World!'```
    
  3. 我还创建了 app.yaml:

     runtime: python
     env: flex
     entrypoint: gunicorn -b :$PORT recsys_data_pipeline.main:app
     threadsafe: yes
     runtime_config:
        python_version: 2
        resources:
        memory_gb: 2.0
    

然后,我使用此命令gcloud app deploy 部署它,但出现以下错误:

default[20170417t173837]  ERROR:root:The gcloud tool was not found.
default[20170417t173837]  Traceback (most recent call last):    
File "/env/local/lib/python2.7/site-packages/apache_beam/internal/gcp/auth.py", line 109, in _refresh      ['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)    
File "/env/local/lib/python2.7/site-packages/apache_beam/utils/processes.py", line 52, in Popen      return subprocess.Popen(*args, **kwargs)    
File "/usr/lib/python2.7/subprocess.py", line 710, in __init__      errread, errwrite)    File "/usr/lib/python2.7/subprocess.py", line 1335, in _execute_child      raise child_exception  OSError: [Errno 2] No such file or directory

从上面的消息中,我发现错误来自apache_beam auth.py class,具体来说,它来自以下函数:

def _refresh(self, http_request):
   """Gets an access token using the gcloud client."""
   try:
     gcloud_process = processes.Popen(['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)
   except OSError as exn:
     logging.error('The gcloud tool was not found.', exc_info=True)
     raise AuthenticationException('The gcloud tool was not found: %s' % exn)
  output, _ = gcloud_process.communicate()
  self.access_token = output.strip()

当凭据(service_acount_nameservice_acount_key 未提供时调用:

if google_cloud_options.service_account_name:
      if not google_cloud_options.service_account_key_file:
        raise AuthenticationException(
            'key file not provided for service account.')
      if not os.path.exists(google_cloud_options.service_account_key_file):
        raise AuthenticationException(
            'Specified service account key file does not exist.')

else:
      try:
        credentials = _GCloudWrapperCredentials(user_agent)
        # Check if we are able to get an access token. If not fallback to
        # application default credentials.
        credentials.get_access_token()
        return credentials

所以我有两个问题:

  1. 有没有办法在我的代码或配置文件(例如:app.yaml)中的某处“附加”凭据(service_acount_nameservice_acount_key)?
  2. 从 Google 应用引擎触发数据流作业的最佳做法是什么?

非常感谢,任何建议和 cmets 都会非常有帮助!

【问题讨论】:

    标签: python google-app-engine google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    请查看https://github.com/amygdala/gae-dataflow 的官方示例。

    【讨论】:

    • hei @jkff 感谢您的回复。我尝试了上面链接中提供的分步操作,但是在部署 app.yaml 时仍然遇到相同的错误。 ERROR:root:The gcloud tool was not found.
    • 您是否在 app.yaml 中使用 custom 运行时,并且目录中旁边是否有来自 Amy 示例的 Dockerfile
    • 我确实在 app.yaml 中使用了 custom 运行时。我还创建了Dockerfile,与 Amy 的示例相同。
    • @bohr - 在上面的示例中,您使用的是runtime: python。仔细检查一下,您是否像 here 那样将其更改为 runtime: custom?您应该会在部署过程中看到 dockerfile 构建输出(包括 gcloud 安装)。
    • 嗨@AmyU。我终于设法提交了这份工作。感谢提供的链接。我决定创建一个新项目并开始采用您的示例。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-02-28
    • 2018-09-23
    • 2021-10-17
    • 2020-08-06
    • 2010-11-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多