【问题标题】:Composer does not see dataflow job succeededComposer 没有看到数据流作业成功
【发布时间】:2020-02-21 01:45:03
【问题描述】:

我正在使用 Gcloud Composer 启动 Dataflow 作业。

我的 DAG 包含两个 Dataflow 作业,它们应该一个接一个地运行。

import datetime

from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow import models


default_dag_args = {

    'start_date': datetime.datetime(2019, 10, 23),
    'dataflow_default_options': {
               'project': 'myproject',
               'region': 'europe-west1',
               'zone': 'europe-west1-c',
               'tempLocation': 'gs://somebucket/',
               }
}

with models.DAG(
        'some_name',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    parameters = {'params': "param1"}

    t1 = DataflowTemplateOperator(
        task_id='dataflow_example_01',
        template='gs://path/to/template/template_001',
        parameters=parameters,
        dag=dag)

    parameters2 = {'params':"param2"}

    t2 = DataflowTemplateOperator(
        task_id='dataflow_example_02',
        template='gs://path/to/templates/template_002',
        parameters=parameters2,
        dag=dag
    )

    t1 >> t2

当我签入数据流时,作业已成功,它应该创建的所有文件都已创建,但它似乎在美国地区运行,云作曲家环境在欧洲西部。

在气流中,我可以看到第一个作业仍在运行,所以第二个作业没有启动

我应该向 DAG 添加什么才能使其成功?如何在欧洲跑步?

任何关于如何进行的建议或解决方案将不胜感激。谢谢!

【问题讨论】:

    标签: python google-cloud-dataflow google-cloud-composer


    【解决方案1】:

    过去我不得不解决这个问题。在 Airflow 1.10.2(或更低)中,代码调用 service.projects().templates().launch() 端点。这已在 1.10.3 中得到修复,而使用区域性的:service.projects().locations().templates().launch()

    截至 2019 年 10 月,可用于 Composer 环境的最新 Airflow 版本是 1.10.2。如果您立即需要解决方案,可以将修复程序反向移植到 Composer。

    为此,我们可以为我们自己的名为RegionalDataflowTemplateOperator 的版本覆盖DataflowTemplateOperator

    class RegionalDataflowTemplateOperator(DataflowTemplateOperator):
      def execute(self, context):
        hook = RegionalDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
    
        hook.start_template_dataflow(self.task_id, self.dataflow_default_options,
                                     self.parameters, self.template)
    

    这将利用修改后的RegionalDataFlowHook 覆盖DataFlowHook 运算符的start_template_dataflow 方法来调用正确的端点:

    class RegionalDataFlowHook(DataFlowHook):
      def _start_template_dataflow(self, name, variables, parameters,
                                   dataflow_template):
          ...
          request = service.projects().locations().templates().launch(
              projectId=variables['project'],
              location=variables['region'],
              gcsPath=dataflow_template,
              body=body
          )
          ...
          return response
    

    然后,我们可以使用我们的新运算符和 Google 提供的模板(用于测试目的)创建一个任务:

    task = RegionalDataflowTemplateOperator(
        task_id=JOB_NAME,
        template=TEMPLATE_PATH,
        parameters={
            'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
            'output': 'gs://{}/europe/output'.format(BUCKET)
        },
        dag=dag,
    )
    

    完整的工作 DAG here。对于更简洁的版本,可以将操作符移到单独的模块中。

    【讨论】:

    • 感谢这项工作,但区域数据流中的请求应该是:request = service.projects().locations().templates().launch( projectId=PROJECT, location=REGION, gcsPath=dataflow_template , body=body ) 在我看来
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-15
    • 1970-01-01
    相关资源
    最近更新 更多