【问题标题】:trying to create dynamic subdags from parent dag based on array of filenames尝试根据文件名数组从父 dag 创建动态 subdag
【发布时间】:2020-06-01 20:16:36
【问题描述】:

我正在尝试使用气流将 s3 文件从“非删除”存储桶(意味着我无法删除文件)移动到 GCS。我不能保证每天都会有新文件,但我必须每天检查新文件。

我的问题是 subdags 的动态创建。如果有文件,我需要 subdags。如果没有文件,我不需要 subdags。我的问题是上游/下游设置。在我的代码中,它确实会检测文件,但不会像应有的那样启动 subdags。我错过了一些东西。

这是我的代码:

from airflow import models
from  airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging

args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['sinistersparrow1701@gmail.com'],
    'email_on_failure': True,
    'email_on_success': True,
}

bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []

parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)

def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ##proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'

def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)

    # dag
    subdag = models.DAG(dag_id=dag_id_child,
              default_args=args,
              schedule_interval=None)

    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)


    return subdag

def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags

check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)

if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished


check_for_files >> finished

【问题讨论】:

  • 在这些 DAGS 的后端运行什么样的作业是这些 spark 作业或一些 python 脚本,您使用什么来运行它,例如 livy 或其他一些方法
  • 对不起,我不明白这个问题。你能重述一下吗?
  • 我的意思是你只使用简单的 python 脚本而不使用任何 spark 作业对吗?
  • 是的。气流中默认的简单运算符。我想根据 S3 中的标记文件以动态速率添加现有操作符,我想摄取到 GCS 中。
  • 为什么files 是一个空列表?

标签: python airflow directed-acyclic-graphs


【解决方案1】:

以下是在气流中创建动态 DAG 或子 DAG 的推荐方法,尽管还有其他方法,但我想这在很大程度上适用于您的问题。

首先,创建一个文件(yaml/csv),其中包括所有s3 文件和位置的列表,在您的情况下,您已经编写了一个将它们存储在列表中的函数,我会说将它们存储在一个单独的yaml 文件中并在运行时在气流环境中加载它,然后创建 DAG。

下面是一个示例yaml 文件: dynamicDagConfigFile.yaml

job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'

您可以修改 Check_For_Files 函数以将它们存储在 yaml 文件中。

现在我们可以继续创建动态 dag:

首先使用虚拟运算符定义两个任务,即开始任务和结束任务。这些任务是我们将在 DAG 的基础上通过在它们之间动态创建任务来构建的任务:

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

动态 DAG: 我们将在气流中使用PythonOperators。该函数应接收任务 ID 作为参数;要执行的 python 函数,即 Python 运算符的 python_callable;以及在执行期间使用的一组参数。

包含一个参数task id。因此,我们可以在动态生成的任务之间交换数据,例如通过XCOM

您可以在此动态 dag 中指定您的操作函数,例如 s3_to_gcs_op

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

最后根据 yaml 文件中的位置可以创建动态 dag,首先读取 yaml 文件如下并创建动态 dag:

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.

最终 DAG 定义:

想法是这样的

#once tasks are generated they should linked with the
#dummy operators generated in the start and end tasks. 
start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

按顺序完整气流代码:

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)



with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

    #Extract file list
    S3Files = configFile['S3Files']

    #In this loop tasks are created for each table defined in the YAML file
    for S3File in S3Files:
        for S3File, fieldName in S3File.items():

            #Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_s3_files = createDynamicDAG('{}-getS3Data'.format(S3File), 
                                            'getS3Data', 
                                            {}) #your configs here.

            #Second step is upload S3 to GCS
            upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(S3File), 'uploadDataS3ToGCS', {'previous_task_id':'{}-'})

#write your configs again here like S3 bucket name prefix extra or read from yaml file, and other GCS config.


start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end

【讨论】:

  • 非常感谢。所以我遇到的问题之一是如果没有新文件会发生什么?我面临的一个问题是,这个地方总会有文件,但不能保证有新文件可以提取,这意味着upload_s3_toGCS 部分将不存在,并且气流错误。
  • 您可以通过从yaml 文件中删除这些文件来解决问题,一旦所有这些文件都上传到GCS,这样yaml 文件中只会出现新文件。如果没有新文件,yaml 文件将为空并且不会创建动态 dag。这就是为什么 yaml 文件比将文件存储在列表中要好得多的原因。
  • yaml 文件在某种程度上也有助于维护 s3 文件的日志记录,如果假设某些 s3 文件无法上传到 GCS,那么您还可以维护一个对应于的标志该文件,然后在下一次 DAG 运行时重试。
  • 如果没有新文件,您可以在 DAG 之前放置一个if 条件,如果有新文件,它将检查yaml 文件中的新文件,否则执行它,否则跳过它。跨度>
  • 这里的问题是设置了下游。如果下游设置没有实际作业(因为不存在文件),则会出错。
猜你喜欢
  • 1970-01-01
  • 2017-01-15
  • 1970-01-01
  • 2021-09-27
  • 1970-01-01
  • 2017-06-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多