【问题标题】:Executing sequential and concurrent tasks within one DAG在一个 DAG 中执行顺序和并发任务
【发布时间】:2021-02-26 02:43:18
【问题描述】:

我是 Airflow 的新手,对于如何在一个 DAG 中正确地同时运行一些任务和其他顺序的任务有一些基本的问题。

在我的 DAG 中,基本步骤是:刷新数据、运行 3 个单独的脚本、部署。这些应用程序中的每一个都在单独的 Docker 容器中运行。

在下面的示例中,一切都是按顺序完成的,但是,我的目标是刷新数据,然后执行 this、that 和 the_other_thing并行,然后部署。

refresh >> [this, that, the_other_thing] >> deploy

我只想在[this, that, the_other_thing] 完成后进行部署,但目前还不清楚这三个中的哪一个会最后完成。在一个 DAG 中执行此序列的最佳实践是什么?设置concurrency=3 并在for loop 中执行[this, that, the_other_thing] 是否足够?任何建议表示赞赏

from builtins import range
from datetime import timedelta, datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook

image = 'myserver.com:8080/my_project:latest'

args = {
    'owner': 'Airflow',
    'start_date': datetime(2020,01,01),
    'depends_on_past': False,
    "retries": 2,
    'retry_delay': timedelta(minutes=5)
}

conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')

dag = DAG(
    dag_id='analysis',
    default_args=args,
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(minutes=180),
    max_active_runs=1,
    concurrency=1,
    tags=['daily']
)

refresh_data = BashOperator(
    task_id='refresh_data',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=refresh',
    dag=dag,
)

this = BashOperator(
    task_id='run_app_this',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_this ',
    dag=dag,
)

that = BashOperator(
    task_id='run_app_that',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_that',
    dag=dag,
)

the_other_thing = BashOperator(
    task_id='run_app_the_other_thing',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_the_other_thing ',
    dag=dag,
)

deploy = BashOperator(
    task_id='deploy',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=deploy ',
    dag=dag,
)

refresh_data >> run_app_this >> run_app_that >> run_app_the_other_thing >> deploy_to_dashboard

if __name__ == "__main__":
    dag.cli()

【问题讨论】:

    标签: python docker airflow airflow-scheduler directed-acyclic-graphs


    【解决方案1】:

    是的,你的假设是正确的。 可能的代码是:

    tasks_list = ["this", "that", "the_other_thing"]
    
    refresh_data = BashOperator(
        task_id='refresh_data_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                     f' { image }  '
                     'app=refresh',
        dag=dag,
    )
    
    deploy = BashOperator(
        task_id='deploy_task',
        bash_command='docker run '
                     '-i --rm '
                     f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                     f' { image }  '
                     'app=deploy ',
        dag=dag,
    )
    
    for task in tasks_list:
        task_op = BashOperator(
            task_id=f'run_{task}_task',
            bash_command='docker run '
                         '-i --rm '
                         f"-e DB_PASSWORD='{conn_foo_db.password}' "
                         f' {image}  '
                         f'app=do_{task}',
            dag=dag,
        )
        refresh_data >> task_op >> deploy
    

    由于默认触发规则是ALL_SUCESS,所以deploy只有在tasks_list中的所有任务都成功后才会开始运行。

    一些注意事项:

    1. 您多次使用相同的代码,您可能需要考虑创建某种配置文件,其中包含设置操作符所需的依赖项和所有信息,然后使用工厂方法从该文件构造您的操作符,从而避免重复DAG 文件中的代码。
    2. 避免访问存储在操作员范围之外的 Airflow Metastore 中的连接。这是一个不好的做法。 Airflow 会定期扫描您的 DAG 文件(根据 min_file_process_interval),这将导致数据库上的大量数据。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-12-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-25
      相关资源
      最近更新 更多