【问题标题】:Proper way to create dynamic workflows in Airflow在 Airflow 中创建动态工作流的正确方法
【发布时间】:2017-05-21 22:19:38
【问题描述】:

问题

Airflow 中是否有任何方法可以创建一个工作流,以便在任务 A 完成之前,任务 B.* 的数量是未知的?我查看了 subdags,但它似乎只能用于必须在创建 Dag 时确定的一组静态任务。

dag 触发器会起作用吗?如果可以,请提供一个例子。

我有一个问题,在任务 A 完成之前,无法知道计算任务 C 所需的任务 B 的数量。每个任务 B.* 都需要几个小时来计算,并且不能合并。

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

想法 #1

我不喜欢这个解决方案,因为我必须创建一个阻塞的 ExternalTask​​Sensor 并且所有任务 B.* 将需要 2-24 小时才能完成。所以我不认为这是一个可行的解决方案。当然有更简单的方法吗?还是 Airflow 不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

编辑 1:

到目前为止,这个问题仍然没有很好的答案。有几个人联系了我,寻求解决方案。

【问题讨论】:

  • 所有任务 B* 是否都相似,因为它们可以循环创建?
  • 是的,一旦任务 A 完成,所有 B.* 任务都可以在循环中快速创建。任务 A 大约需要 2 小时才能完成。
  • 您找到解决问题的方法了吗?你介意张贴吗?
  • 创意 #1 的有用资源:linkedin.com/pulse/…
  • 这是我写的一篇文章,解释了如何做到这一点linkedin.com/pulse/dynamic-workflows-airflow-kyle-bridenstine

标签: python workflow airflow


【解决方案1】:

此功能正在积极开发中,暂定用于 Airflow 2.3(2022 年第一季度)。在此处查看 Airflow Improvement Proposal 42(动态任务映射):

相关链接:

来自 AIP 的原型代码展示了如何通过 s3 中的动态文件列表彻底删除。注意部分(部分使用一些运算符参数)和映射函数的使用:

from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
 
@task
def get_files_from_s3():
    # S3 list operation
    ...
 
my_files = get_files_from_s3()
s3_delete_files = S3DeleteObjectsOperator.partial(
   aws_conn_id="my-aws-conn-id",
   bucket="my-bucket"
).map(key=my_files)

【讨论】:

    【解决方案2】:

    一个很好的答案

    太多了?无论如何。

    许多其他答案有点方钉圆孔。添加复杂的新运算符,滥用内置变量,或者有些无法回答问题。我对它们中的任何一个都不是特别满意,因为它们要么在通过 Web UI 查看时隐藏其行为,要么容易出错,要么需要大量自定义代码(这也容易出错)。

    此解决方案使用内置功能,不需要新的运算符和有限的附加代码,DAG 无需任何技巧即可通过 UI 可见,并遵循气流最佳实践(请参阅 idempotency)。

    这个问题的解决方案相当复杂,所以我把它分成了几个部分。它们是:

    • 如何安全地触发动态数量的任务
    • 如何等待所有这些任务完成然后调用最终任务
    • 如何将其集成到您的任务管道中
    • 限制(没有什么是完美的)

    一个任务可以触发动态数量的其他任务吗?

    是的。有点。无需编写任何新的运算符,就可以让一个 DAG 触发动态数量的其他 DAG,只使用内置运算符。然后可以将其扩展为使 DAG 依赖于动态数量的其他 DAG(请参阅等待任务完成)。这类似于flinz's solution,但更健壮且自定义代码更少。

    这是使用一个 BranchPythonOperator 来完成的,它选择性地触发 2 个其他 TriggerDagRunOperator。其中一个递归地重新调用当前 DAG,另一个调用外部 dag,即目标函数。

    可用于触发 dag 的示例配置在 recursive_dag.py 的顶部给出。

    print_conf.py(要触发的示例 DAG)

    from datetime import timedelta
    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    def print_output(dag_run):
        dag_conf = dag_run.conf
        if 'output' in dag_conf:
            output = dag_conf['output']
        else:
            output = 'no output found'
        print(output)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    with DAG(
        'print_output',
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as dag:
        print_output = PythonOperator(
            task_id='print_output_task',
            python_callable=print_output
        )
    

    recursive_dag.py(魔法发生的地方)

    """
    DAG that can be used to trigger multiple other dags.
    For example, trigger with the following config:
    {
        "task_list": ["print_output","print_output"],
        "conf_list": [
            {
                "output": "Hello"
            },
            {
                "output": "world!"
            }
        ]
    }
    """
    
    from datetime import timedelta
    import json
    
    from airflow import DAG
    from airflow.operators.python import BranchPythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag_id = 'branch_recursive'
    branch_id = 'branch_operator'
    repeat_task_id = 'repeat_dag_operator'
    repeat_task_conf = repeat_task_id + '_conf'
    next_task_id = 'next_dag_operator'
    next_task_conf = next_task_id + '_conf'
    
    def choose_branch(task_instance, dag_run):
        dag_conf = dag_run.conf
        task_list = dag_conf['task_list']
        next_task = task_list[0]
        later_tasks = task_list[1:]
        conf_list = dag_conf['conf_list']
        # dump to string because value is stringified into
        # template string, is then parsed.
        next_conf = json.dumps(conf_list[0])
        later_confs = conf_list[1:]
    
        task_instance.xcom_push(key=next_task_id, value=next_task)
        task_instance.xcom_push(key=next_task_conf, value=next_conf)
    
        if later_tasks:
            repeat_conf = json.dumps({
                'task_list': later_tasks,
                'conf_list': later_confs
            })
    
            task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
            return [next_task_id, repeat_task_id]
    
        return next_task_id
    
    def add_braces(in_string):
        return '{{' + in_string + '}}'
    
    def make_templated_pull(key):
        pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
        return add_braces(pull)
    
    with DAG(
        dag_id,
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as dag:
        branch = BranchPythonOperator(
            task_id=branch_id,
            python_callable=choose_branch
        )
    
        trigger_next = TriggerDagRunOperator(
            task_id=next_task_id,
            trigger_dag_id=make_templated_pull(next_task_id),
            conf=make_templated_pull(next_task_conf)
        )
    
        trigger_repeat = TriggerDagRunOperator(
            task_id=repeat_task_id,
            trigger_dag_id=dag_id,
            conf=make_templated_pull(repeat_task_conf)
        )
    
        branch >> [trigger_next, trigger_repeat]
    

    此解决方案的优点是使用非常有限的自定义代码。 flinz 的解决方案可能会中途失败,导致一些计划任务和其他任务不会。然后在重试时,DAGS 可能被安排运行两次,或者在第一个 dag 上失败,导致失败的任务完成部分工作。这种方法会告诉您哪些 DAG 未能触发,并仅重试未能触发的 DAG。因此,这种方法是幂等的,而另一种则不是。

    一个 DAG 能否依赖于动态数量的其他 DAGS?

    是的,但是...如果任务不并行运行,这很容易做到。并行运行更复杂。

    要按顺序运行,重要的更改是在trigger_next 中使用wait_for_completion=True,使用python 运算符在“trigger_next”之前设置xcom 值,并添加一个启用或禁用重复任务的分支运算符,然后具有线性相关性

    setup_xcom >> trigger_next >> branch >> trigger_repeat
    

    要并行运行,您可以类似地递归链接多个使用模板化 external_dag_id 值的 ExternalTask​​Sensor,以及与触发的 dag 运行相关的时间戳。要获取触发的 dag 时间戳,您可以使用触发 dag 的时间戳来触发 dag。然后这些传感器一个一个地等待所有创建的 DAG 完成,然后触发最终的 DAG。下面的代码,这次我在打印输出 DAG 中添加了一个随机睡眠,以便等待 dags 实际上做一些等待。

    注意:recurse_wait_dag.py 现在定义了 2 个 dag,这两个都需要启用才能正常工作。

    可用于触发 dag 的示例配置在 recurse_wait_dag.py 的顶部给出

    print_conf.py(修改为添加随机睡眠)

    """
    Simple dag that prints the output in DAG config
    Used to demo TriggerDagRunOperator (see recursive_dag.py)
    """
    
    from datetime import timedelta
    from time import sleep
    from random import randint
    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    def print_output(dag_run):
        sleep_time = randint(15,30)
        print(f'sleeping for time: {sleep_time}')
        sleep(sleep_time)
        dag_conf = dag_run.conf
        if 'output' in dag_conf:
            output = dag_conf['output']
        else:
            output = 'no output found'
        print(output)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    with DAG(
        'print_output',
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as dag:
        print_output = PythonOperator(
            task_id='print_output_task',
            python_callable=print_output
        )
    

    recurse_wait_dag.py(更神奇的地方)

    """
    DAG that can be used to trigger multiple other dags,
    waits for all dags to execute, then triggers a final dag.
    For example, trigger the DAG 'recurse_then_wait' with the following config:
    {
        "final_task": "print_output",
        "task_list": ["print_output","print_output"],
        "conf_list": [
            {
                "output": "Hello"
            },
            {
                "output": "world!"
            }
        ]
    }
    """
    
    
    from datetime import timedelta
    import json
    
    from airflow import DAG
    from airflow.operators.python import BranchPythonOperator, PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.dates import days_ago
    from airflow.sensors.external_task import ExternalTaskSensor
    from airflow.utils import timezone
    
    from common import make_templated_pull
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    def to_conf(id):
        return f'{id}_conf'
    
    def to_execution_date(id):
        return f'{id}_execution_date'
    
    def to_ts(id):
        return f'{id}_ts'
    
    recurse_dag_id = 'recurse_then_wait'
    branch_id = 'recursive_branch'
    repeat_task_id = 'repeat_dag_operator'
    repeat_task_conf = to_conf(repeat_task_id)
    next_task_id = 'next_dag_operator'
    next_task_conf = to_conf(next_task_id)
    next_task_execution_date = to_execution_date(next_task_id)
    end_task_id = 'end_task'
    end_task_conf = to_conf(end_task_id)
    
    wait_dag_id = 'wait_after_recurse'
    choose_wait_id = 'choose_wait'
    next_wait_id = 'next_wait'
    next_wait_ts = to_ts(next_wait_id)
    
    def choose_branch(task_instance, dag_run, ts):
        dag_conf = dag_run.conf
        task_list = dag_conf['task_list']
        next_task = task_list[0]
        # can't have multiple dag runs of same DAG with same timestamp 
        assert next_task != recurse_dag_id
        later_tasks = task_list[1:]
        conf_list = dag_conf['conf_list']
        next_conf = json.dumps(conf_list[0])
        later_confs = conf_list[1:]
        triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]
    
        task_instance.xcom_push(key=next_task_id, value=next_task)
        task_instance.xcom_push(key=next_task_conf, value=next_conf)
        task_instance.xcom_push(key=next_task_execution_date, value=ts)
    
        if later_tasks:
            repeat_conf = json.dumps({
                'task_list': later_tasks,
                'conf_list': later_confs,
                'triggered_tasks': triggered_tasks,
                'final_task': dag_conf['final_task']
            })
    
            task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
            return [next_task_id, repeat_task_id]
        
        end_conf = json.dumps({
            'tasks_to_wait': triggered_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=end_task_conf, value=end_conf)
    
        return [next_task_id, end_task_id]
    
    def choose_wait_target(task_instance, dag_run):
        dag_conf = dag_run.conf
        tasks_to_wait = dag_conf['tasks_to_wait']
        next_task, next_ts = tasks_to_wait[0]
        later_tasks = tasks_to_wait[1:]
        task_instance.xcom_push(key=next_wait_id, value=next_task)
        task_instance.xcom_push(key=next_wait_ts, value=next_ts)
        
        if later_tasks:
            repeat_conf = json.dumps({
                'tasks_to_wait': later_tasks,
                'final_task': dag_conf['final_task']
            })
            task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
            
    def execution_date_fn(_, task_instance):
        date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
        return timezone.parse(date_str)
    
    def choose_wait_branch(task_instance, dag_run):
        dag_conf = dag_run.conf
        tasks_to_wait = dag_conf['tasks_to_wait']
    
        if len(tasks_to_wait) == 1:
            return end_task_id
    
        return repeat_task_id
    
    with DAG(
        recurse_dag_id,
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as recursive_dag:
        branch = BranchPythonOperator(
            task_id=branch_id,
            python_callable=choose_branch
        )
    
        trigger_next = TriggerDagRunOperator(
            task_id=next_task_id,
            trigger_dag_id=make_templated_pull(next_task_id, branch_id),
            execution_date=make_templated_pull(next_task_execution_date, branch_id),
            conf=make_templated_pull(next_task_conf, branch_id)
        )
    
        trigger_repeat = TriggerDagRunOperator(
            task_id=repeat_task_id,
            trigger_dag_id=recurse_dag_id,
            conf=make_templated_pull(repeat_task_conf, branch_id)
        )
    
        trigger_end = TriggerDagRunOperator(
            task_id=end_task_id,
            trigger_dag_id=wait_dag_id,
            conf=make_templated_pull(end_task_conf, branch_id)
        )
    
        branch >> [trigger_next, trigger_repeat, trigger_end]
    
    with DAG(
        wait_dag_id,
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as wait_dag:
        py_operator = PythonOperator(
            task_id=choose_wait_id,
            python_callable=choose_wait_target
        )
    
        sensor = ExternalTaskSensor(
            task_id='do_wait',
            external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
            execution_date_fn=execution_date_fn
        )
    
        branch = BranchPythonOperator(
            task_id=branch_id,
            python_callable=choose_wait_branch
        )
    
        trigger_repeat = TriggerDagRunOperator(
            task_id=repeat_task_id,
            trigger_dag_id=wait_dag_id,
            conf=make_templated_pull(repeat_task_conf, choose_wait_id)
        )
    
        trigger_end = TriggerDagRunOperator(
            task_id=end_task_id,
            trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
        )
    
        py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
    

    与您的代码集成

    这很好,但你想实际使用它。那么,你需要做什么?该问题包括一个尝试执行以下操作的示例:

                 |---> Task B.1 --|
                 |---> Task B.2 --|
    Task A ------|---> Task B.3 --|-----> Task C
                 |       ....     |
                 |---> Task B.N --|
    

    要实现问题目标(下面的示例实现),您需要将任务 A、B 和 C 分离到各自的 DAG 中。然后,在 DAG A 的末尾添加一个新的运算符来触发上述 DAG 'recurse_then_wait'。向这个 dag 传递一个配置,其中包括每个 B DAG 所需的配置,以及 B dag id(这可以很容易地更改为使用不同的 dag,发疯)。然后包含 DAG C 的名称,即最终的 DAG,将在最后运行。此配置应如下所示:

    {
        "final_task": "C_DAG",
        "task_list": ["B_DAG","B_DAG"],
        "conf_list": [
            {
                "b_number": 1,
                "more_stuff": "goes_here"
            },
            {
                "b_number": 2,
                "foo": "bar"
            }
        ]
    }
    

    实现后应该是这样的:

    trigger_recurse.py

    from datetime import timedelta
    import json
    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.dates import days_ago
    
    from recurse_wait_dag import recurse_dag_id
    
    def add_braces(in_string):
        return '{{' + in_string + '}}'
    
    def make_templated_pull(key, task_id):
        pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
        return add_braces(pull)
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
    }
    
    setup_trigger_conf_id = 'setup_trigger_conf'
    trigger_conf_key = 'trigger_conf'
    
    def setup_trigger_conf(task_instance):
        trigger_conf = {
            'final_task': 'print_output',
            'task_list': ['print_output','print_output'],
            'conf_list': [
                {
                    'output': 'Hello'
                },
                {
                    'output': 'world!'
                }
            ]
        }
    
        print('Triggering the following tasks')
        for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
            print(f'    task: {task} with config {json.dumps(conf)}')
        print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')
    
        task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))
    
    with DAG(
        'trigger_recurse_example',
        start_date=days_ago(2),
        tags=['my_test'],
        default_args=default_args,
        description='A simple test DAG',
        schedule_interval=None
    ) as dag:
        py_operator = PythonOperator(
            task_id=setup_trigger_conf_id,
            python_callable=setup_trigger_conf
        )
    
        trigger_operator = TriggerDagRunOperator(
            task_id='trigger_call_and_wait',
            trigger_dag_id=recurse_dag_id,
            conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
        )
    
        py_operator >> trigger_operator
    

    所有这些最终看起来像下面这样,垂直和水平线显示一个 DAG 触发另一个 DAG 的位置:

    A
    |
    Recurse - B.1
    |
    Recurse - B.2
    |
    ...
    |
    Recurse - B.N
    |
    Wait for B.1
    |
    Wait for B.2
    |
    ...
    |
    Wait for B.N
    |
    C
    

    限制

    任务不再在单个图表上可见。这可能是这种方法的最大问题。通过将标签添加到所有关联的 DAG,至少可以一起查看 DAG。然而,将 DAG B 的多个并行运行与 DAG A 的运行联系起来是混乱的。但是,由于单个 DAG 运行显示其输入配置,这意味着每个 DAG B 运行不依赖于 DAG A,仅依赖于它的输入配置。因此,这种关系至少可以部分忽略。

    任务不能再使用 xcom 进行通信。 B 任务可以通过 DAG 配置从任务 A 接收输入,但是任务 C 无法从 B 任务获得输出。所有 B 任务的结果应该放在一个已知位置,然后由任务 C 读取。

    'recurse_and_wait' 的配置参数可能会被改进以结合 task_list 和 conf_list,但这解决了上述问题。

    最终 DAG 没有配置。这应该很容易解决。

    【讨论】:

      【解决方案3】:

      范式转换

      根据这里的所有答案,在我看来,最好的方法不是将动态“工作列表”生成代码视为初始任务,而是将其视为 DAG 定义之前的计算。

      当然,这假设在每次 DAG 运行开始时只进行一次初始计算(如 OP 所述)。如果某些中途任务必须重新定义 DAG,这种方法将不起作用,气流似乎不是为这种模式而构建的。但是,请考虑链接控制器/目标 DAG(见下文)。

      代码示例:

      from airflow.decorators import dag, task
      from airflow.operators.dummy import DummyOperator
      from airflow.providers.postgres.hooks.postgres import PostgresHook
      from airflow.utils.dates import days_ago
      
      DEFAULT_ARGS = {"owner": "airflow"}
      
      
      def get_list_of_things(connection_id):
          list_all_the_things_sql = """
          SELECT * FROM things 
          """
          pg_hook = PostgresHook(postgres_conn_id=connection_id)
          connection = pg_hook.get_conn()
          cursor = connection.cursor()
          cursor.execute(list_all_the_things_sql)  # NOTE: this will execute to build the DAG, so if you grock the code, expect the DAG not to load, unless you have a valid postgres DB with a table named "things" and with things in it.
          res = cursor.fetchall()
          return res
      
      
      @dag(default_args=DEFAULT_ARGS, schedule_interval="@once", start_date=days_ago(2), dag_id='test_joey_dag')
      def dynamicly_generated_dag():
          connection_id = "ProdDB"
      
          @task
          def do_a_thing(row):
              print(row)
              return row
      
          start = DummyOperator(task_id='start')
          end = DummyOperator(task_id='end')
          
      
          data_list = get_list_of_things(connection_id)
          for row in data_list:
              start >> do_a_thing(row) >> end
      
      
      dag = dynamicly_generated_dag()
      

      如果get_list_of_things() 计算时间很长,那么谨慎的做法是预先计算它并使用控制器/目标模式在外部触发此 DAG:
      trigger_controller_dag
      trigger_target_dag

      【讨论】:

        【解决方案4】:

        根据上下文,这可以以异步批处理工作方式实现。 “动态任务”可以被视为要完成的工作项列表,并拆分为异步消息发布到外部消息代理队列以供工作节点拾取。

        一个任务动态生成“工作”并将所有项目(我们事先不知道有多少,甚至不知道具体是哪一个)发布到一个主题/队列中。

        工人从队列中消费“工作任务”。如果使用 Airflow 技术的外部实现,或者作为 Airflow Sensor 任务(可能在单独的 DAG 中)直接实现。当他们完成任务处理后,气流传感器被触发并继续执行流程。

        要恢复单个工作项的流程,请考虑使用 EIP 声明检查模式。

        【讨论】:

          【解决方案5】:

          不明白问题出在哪里?

          Here 是一个标准示例。 现在,如果在函数subdag 中将for i in range(5): 替换为for i in range(random.randint(0, 10)):,那么一切都会正常工作。 现在想象一下,操作符 'start' 将数据放入一个文件中,而不是一个随机值,该函数将读取这些数据。那么操作符'start'会影响任务的数量。

          问题只会出现在 UI 的显示中,因为当进入 subdag 时,任务的数量将等于当前从文件/数据库/XCom 中读取的最后一次。 这会自动限制一次多次启动一个 dag。

          【讨论】:

          • 看似基本的东西在网上很难找到答案。这应该是我所做的所有研究中动态工作流的答案。虽然重要的是要提到 subdag 的一些限制,如死锁、性能等。
          • 我想最初的问题是只用任务而不是 subdags 来实现这个(尽管我同意在这里强调 subdag 可能是一个更合适的工具是很自然的)。
          【解决方案6】:

          作业图不是在运行时生成的。相反,当 Airflow 从您的 dags 文件夹中获取该图形时,它会构建该图形。因此,每次运行时都不可能为作业创建不同的图表。您可以将作业配置为在加载时基于查询构建图表。此后每次运行该图表都将保持不变,这可能不是很有用。

          您可以使用分支运算符设计一个图表,在每次运行时根据查询结果执行不同的任务。

          我所做的是预先配置一组任务,然后获取查询结果并将它们分布到各个任务中。无论如何,这可能会更好,因为如果您的查询返回大量结果,您可能不希望调度程序充斥着大量并发任务。为了更安全,我还使用了一个池来确保我的并发不会因意外的大查询而失控。

          """
           - This is an idea for how to invoke multiple tasks based on the query results
          """
          import logging
          from datetime import datetime
          
          from airflow import DAG
          from airflow.hooks.postgres_hook import PostgresHook
          from airflow.operators.mysql_operator import MySqlOperator
          from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
          from include.run_celery_task import runCeleryTask
          
          ########################################################################
          
          default_args = {
              'owner': 'airflow',
              'catchup': False,
              'depends_on_past': False,
              'start_date': datetime(2019, 7, 2, 19, 50, 00),
              'email': ['rotten@stackoverflow'],
              'email_on_failure': True,
              'email_on_retry': False,
              'retries': 0,
              'max_active_runs': 1
          }
          
          dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)
          
          totalBuckets = 5
          
          get_orders_query = """
          select 
              o.id,
              o.customer
          from 
              orders o
          where
              o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
              and
              o.is_test = false
              and
              o.is_processed = false
          """
          
          ###########################################################################################################
          
          # Generate a set of tasks so we can parallelize the results
          def createOrderProcessingTask(bucket_number):
              return PythonOperator( 
                                     task_id=f'order_processing_task_{bucket_number}',
                                     python_callable=runOrderProcessing,
                                     pool='order_processing_pool',
                                     op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                                     provide_context=True,
                                     dag=dag
                                    )
          
          
          # Fetch the order arguments from xcom and doStuff() to them
          def runOrderProcessing(task_bucket, **context):
              orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)
          
              if orderList is not None:
                  for order in orderList:
                      logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
                      doStuff(**op_kwargs)
          
          
          # Discover the orders we need to run and group them into buckets for processing
          def getOpenOrders(**context):
              myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')
          
              # initialize the task list buckets
              tasks = {}
              for task_number in range(0, totalBuckets):
                  tasks[f'order_processing_task_{task_number}'] = []
          
              # populate the task list buckets
              # distribute them evenly across the set of buckets
              resultCounter = 0
              for record in myDatabaseHook.get_records(get_orders_query):
          
                  resultCounter += 1
                  bucket = (resultCounter % totalBuckets)
          
                  tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})
          
              # push the order lists into xcom
              for task in tasks:
                  if len(tasks[task]) > 0:
                      logging.info(f'Task {task} has {len(tasks[task])} orders.')
                      context['ti'].xcom_push(key=task, value=tasks[task])
                  else:
                      # if we didn't have enough tasks for every bucket
                      # don't bother running that task - remove it from the list
                      logging.info(f"Task {task} doesn't have any orders.")
                      del(tasks[task])
          
              return list(tasks.keys())
          
          ###################################################################################################
          
          
          # this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
          clean_xcoms = MySqlOperator(
              task_id='clean_xcoms',
              mysql_conn_id='airflow_db',
              sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
              dag=dag)
          
          
          # Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
          # query returns fewer results than we have buckets, we don't try to run them all.
          # Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
          # documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
          get_orders_task = PythonOperator(
                                           task_id='get_orders',
                                           python_callable=getOpenOrders,
                                           provide_context=True,
                                           dag=dag
                                          )
          get_orders_task.set_upstream(clean_xcoms)
          
          # set up the parallel tasks -- these are configured at compile time, not at run time:
          for bucketNumber in range(0, totalBuckets):
              taskBucket = createOrderProcessingTask(bucketNumber)
              taskBucket.set_upstream(get_orders_task)
          
          
          ###################################################################################################
          

          【讨论】:

          • 请注意,似乎有可能作为任务的结果动态创建 subdags,但是,我发现的大多数关于 subdags 的文档强烈建议远离该功能因为在大多数情况下,它导致的问题多于解决的问题。我看到了一些建议,即 subdags 可能会在不久的将来作为内置功能被删除。
          • 另请注意,在我的示例中的for tasks in tasks 循环中,我删除了正在迭代的对象。这是个坏主意。取而代之的是获取键列表并对其进行迭代-或跳过删除。同样,如果 xcom_pull 返回 None (而不是列表或空列表),则 for 循环也会失败。有人可能想在“for”之前运行 xcom_pull,然后检查它是否为 None - 或者确保那里至少有一个空列表。 YMMV。祝你好运!
          • open_order_task 中有什么内容?
          • 你是对的,这是我的例子中的一个错字。它应该是 get_orders_task.set_upstream()。我会解决的。
          • @rotten 您能否详细说明为什么我们不应该使用此功能或在哪里提到它以避免这种情况?我只是在深入研究文档,听起来这种设计模式会积极推荐 subdags?
          【解决方案7】:

          我认为您正在寻找的是动态创建 DAG 几天前我遇到了这种情况,经过一番搜索,我找到了这个blog

          动态任务生成

          start = DummyOperator(
              task_id='start',
              dag=dag
          )
          
          end = DummyOperator(
              task_id='end',
              dag=dag)
          
          def createDynamicETL(task_id, callableFunction, args):
              task = PythonOperator(
                  task_id = task_id,
                  provide_context=True,
                  #Eval is used since the callableFunction var is of type string
                  #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
                  python_callable = eval(callableFunction),
                  op_kwargs = args,
                  xcom_push = True,
                  dag = dag,
              )
              return task
          

          设置 DAG 工作流程

          with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
              # Use safe_load instead to load the YAML file
              configFile = yaml.safe_load(f)
          
              # Extract table names and fields to be processed
              tables = configFile['tables']
          
              # In this loop tasks are created for each table defined in the YAML file
              for table in tables:
                  for table, fieldName in table.items():
                      # In our example, first step in the workflow for each table is to get SQL data from db.
                      # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
                      get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                           'getSQLData',
                                                           {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                            'dbname': configFile['dbname']})
          
                      # Second step is upload data to s3
                      upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                           'uploadDataToS3',
                                                           {'previous_task_id': '{}-getSQLData'.format(table),
                                                            'bucket_name': configFile['bucket_name'],
                                                            'prefix': configFile['prefix']})
          
                      # This is where the magic lies. The idea is that
                      # once tasks are generated they should linked with the
                      # dummy operators generated in the start and end tasks. 
                      # Then you are done!
                      start >> get_sql_data_task
                      get_sql_data_task >> upload_to_s3_task
                      upload_to_s3_task >> end
          

          这是我们的 DAG 将代码放在一起后的样子

          import yaml
          import airflow
          from airflow import DAG
          from datetime import datetime, timedelta, time
          from airflow.operators.python_operator import PythonOperator
          from airflow.operators.dummy_operator import DummyOperator
          
          start = DummyOperator(
              task_id='start',
              dag=dag
          )
          
          
          def createDynamicETL(task_id, callableFunction, args):
              task = PythonOperator(
                  task_id=task_id,
                  provide_context=True,
                  # Eval is used since the callableFunction var is of type string
                  # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
                  python_callable=eval(callableFunction),
                  op_kwargs=args,
                  xcom_push=True,
                  dag=dag,
              )
              return task
          
          
          end = DummyOperator(
              task_id='end',
              dag=dag)
          
          with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
              # use safe_load instead to load the YAML file
              configFile = yaml.safe_load(f)
          
              # Extract table names and fields to be processed
              tables = configFile['tables']
          
              # In this loop tasks are created for each table defined in the YAML file
              for table in tables:
                  for table, fieldName in table.items():
                      # In our example, first step in the workflow for each table is to get SQL data from db.
                      # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
                      get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                           'getSQLData',
                                                           {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                            'dbname': configFile['dbname']})
          
                      # Second step is upload data to s3
                      upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                           'uploadDataToS3',
                                                           {'previous_task_id': '{}-getSQLData'.format(table),
                                                            'bucket_name': configFile['bucket_name'],
                                                            'prefix': configFile['prefix']})
          
                      # This is where the magic lies. The idea is that
                      # once tasks are generated they should linked with the
                      # dummy operators generated in the start and end tasks. 
                      # Then you are done!
                      start >> get_sql_data_task
                      get_sql_data_task >> upload_to_s3_task
                      upload_to_s3_task >> end
          

          非常有帮助 希望它也能帮助别人

          【讨论】:

          • 你自己实现了吗?我累了。但我失败了。
          • 我明白了。我的问题已经解决了。谢谢。我只是没有得到正确的方法来读取 docker 图像中的环境变量。
          • 如果表项可能发生变化,我们不能将它们放在静态 yaml 文件中怎么办?
          • 这真的取决于你在哪里使用它。虽然我会对你的建议感兴趣。 @FrankZhu 应该如何正确完成?
          • 只有在 for 循环的 range 方法中为变量预定义值时,所有这些技术才有效。在这种情况下,tables 变量。如果您需要动态分配此变量(可以说与上一个任务的结果),那么不容易找到稳定的解决方案
          【解决方案8】:

          是的,这是可能的。我创建了一个示例 DAG 来演示这一点。

          import airflow
          from airflow.operators.python_operator import PythonOperator
          import os
          from airflow.models import Variable
          import logging
          from airflow import configuration as conf
          from airflow.models import DagBag, TaskInstance
          from airflow import DAG, settings
          from airflow.operators.bash_operator import BashOperator
          
          main_dag_id = 'DynamicWorkflow2'
          
          args = {
              'owner': 'airflow',
              'start_date': airflow.utils.dates.days_ago(2),
              'provide_context': True
          }
          
          dag = DAG(
              main_dag_id,
              schedule_interval="@once",
              default_args=args)
          
          
          def start(*args, **kwargs):
          
              value = Variable.get("DynamicWorkflow_Group1")
              logging.info("Current DynamicWorkflow_Group1 value is " + str(value))
          
          
          def resetTasksStatus(task_id, execution_date):
              logging.info("Resetting: " + task_id + " " + execution_date)
          
              dag_folder = conf.get('core', 'DAGS_FOLDER')
              dagbag = DagBag(dag_folder)
              check_dag = dagbag.dags[main_dag_id]
              session = settings.Session()
          
              my_task = check_dag.get_task(task_id)
              ti = TaskInstance(my_task, execution_date)
              state = ti.current_state()
              logging.info("Current state of " + task_id + " is " + str(state))
              ti.set_state(None, session)
              state = ti.current_state()
              logging.info("Updated state of " + task_id + " is " + str(state))
          
          
          def bridge1(*args, **kwargs):
          
              # You can set this value dynamically e.g., from a database or a calculation
              dynamicValue = 2
          
              variableValue = Variable.get("DynamicWorkflow_Group2")
              logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
          
              logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
              os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))
          
              variableValue = Variable.get("DynamicWorkflow_Group2")
              logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))
          
              # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
              for i in range(dynamicValue):
                  resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))
          
          
          def bridge2(*args, **kwargs):
          
              # You can set this value dynamically e.g., from a database or a calculation
              dynamicValue = 3
          
              variableValue = Variable.get("DynamicWorkflow_Group3")
              logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
          
              logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
              os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))
          
              variableValue = Variable.get("DynamicWorkflow_Group3")
              logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))
          
              # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
              for i in range(dynamicValue):
                  resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))
          
          
          def end(*args, **kwargs):
              logging.info("Ending")
          
          
          def doSomeWork(name, index, *args, **kwargs):
              # Do whatever work you need to do
              # Here I will just create a new file
              os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')
          
          
          starting_task = PythonOperator(
              task_id='start',
              dag=dag,
              provide_context=True,
              python_callable=start,
              op_args=[])
          
          # Used to connect the stream in the event that the range is zero
          bridge1_task = PythonOperator(
              task_id='bridge1',
              dag=dag,
              provide_context=True,
              python_callable=bridge1,
              op_args=[])
          
          DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
          logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))
          
          for index in range(int(DynamicWorkflow_Group1)):
              dynamicTask = PythonOperator(
                  task_id='firstGroup_' + str(index),
                  dag=dag,
                  provide_context=True,
                  python_callable=doSomeWork,
                  op_args=['firstGroup', index])
          
              starting_task.set_downstream(dynamicTask)
              dynamicTask.set_downstream(bridge1_task)
          
          # Used to connect the stream in the event that the range is zero
          bridge2_task = PythonOperator(
              task_id='bridge2',
              dag=dag,
              provide_context=True,
              python_callable=bridge2,
              op_args=[])
          
          DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
          logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))
          
          for index in range(int(DynamicWorkflow_Group2)):
              dynamicTask = PythonOperator(
                  task_id='secondGroup_' + str(index),
                  dag=dag,
                  provide_context=True,
                  python_callable=doSomeWork,
                  op_args=['secondGroup', index])
          
              bridge1_task.set_downstream(dynamicTask)
              dynamicTask.set_downstream(bridge2_task)
          
          ending_task = PythonOperator(
              task_id='end',
              dag=dag,
              provide_context=True,
              python_callable=end,
              op_args=[])
          
          DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
          logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))
          
          for index in range(int(DynamicWorkflow_Group3)):
          
              # You can make this logic anything you'd like
              # I chose to use the PythonOperator for all tasks
              # except the last task will use the BashOperator
              if index < (int(DynamicWorkflow_Group3) - 1):
                  dynamicTask = PythonOperator(
                      task_id='thirdGroup_' + str(index),
                      dag=dag,
                      provide_context=True,
                      python_callable=doSomeWork,
                      op_args=['thirdGroup', index])
              else:
                  dynamicTask = BashOperator(
                      task_id='thirdGroup_' + str(index),
                      bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
                      dag=dag)
          
              bridge2_task.set_downstream(dynamicTask)
              dynamicTask.set_downstream(ending_task)
          
          # If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
          # and your tasks will run simultaneously instead of in your desired stream order.
          starting_task.set_downstream(bridge1_task)
          bridge1_task.set_downstream(bridge2_task)
          bridge2_task.set_downstream(ending_task)
          

          在运行 DAG 之前,请创建这三个 Airflow 变量

          airflow variables --set DynamicWorkflow_Group1 1
          
          airflow variables --set DynamicWorkflow_Group2 0
          
          airflow variables --set DynamicWorkflow_Group3 0
          

          你会看到 DAG 从这里开始

          运行后到此

          您可以在我关于创建Dynamic Workflows On Airflow 的文章中查看有关此 DAG 的更多信息。

          【讨论】:

          • 但是如果你有多个这个 DAG 的 DagRun 会发生什么。它们是否都共享相同的变量?
          • 是的,他们会使用相同的变量;我在最后的文章中谈到了这一点。您需要动态创建变量并在变量名中使用 dag 运行 ID。我的示例很简单,只是为了演示动态可能性,但您需要使其具有生产质量:)
          • 创建动态任务时是否需要桥梁?一会儿会完整阅读您的文章,但想问一下。我现在正在努力创建一个基于上游任务的动态任务,并且开始弄清楚我哪里出错了。我当前的问题是,由于某种原因,我无法让 DAG 同步到 DAG-Bag。当我在模块中使用静态列表时,我的 DAG 同步,但当我将该静态列表切换为从上游任务构建时停止。
          • @jvans 感谢它很聪明,但可能不是生产质量
          • 好主意!我发现该框架很有用,但我从您的评论中受益,凯尔。因此,当我需要根据本地未保存的信息动态创建任务时,我首先使用运算符从(在我的情况下)S3 获取该信息并设置一个 Airflow 变量。然后我可以使用该变量来设置动态 dags,并且如果远程存储发生更改,仍然依赖它进行更新。这非常好,因为它消除了每次调度程序刷新 DAG 列表时运行更复杂的顶级代码的开销。感谢您在这里的有用讨论!
          【解决方案9】:

          我想我在https://github.com/mastak/airflow_multi_dagrun 找到了一个更好的解决方案,它通过触发多个 dagruns 使用简单的 DagRuns 排队,类似于TriggerDagRuns。大部分功劳归于https://github.com/mastak,尽管我不得不修补some details 以使其与最新的气流配合使用。

          解决方案使用custom operator that triggers several DagRuns:

          from airflow import settings
          from airflow.models import DagBag
          from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
          from airflow.utils.decorators import apply_defaults
          from airflow.utils.state import State
          from airflow.utils import timezone
          
          
          class TriggerMultiDagRunOperator(TriggerDagRunOperator):
              CREATED_DAGRUN_KEY = 'created_dagrun_key'
          
              @apply_defaults
              def __init__(self, op_args=None, op_kwargs=None,
                           *args, **kwargs):
                  super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
                  self.op_args = op_args or []
                  self.op_kwargs = op_kwargs or {}
          
              def execute(self, context):
          
                  context.update(self.op_kwargs)
                  session = settings.Session()
                  created_dr_ids = []
                  for dro in self.python_callable(*self.op_args, **context):
                      if not dro:
                          break
                      if not isinstance(dro, DagRunOrder):
                          dro = DagRunOrder(payload=dro)
          
                      now = timezone.utcnow()
                      if dro.run_id is None:
                          dro.run_id = 'trig__' + now.isoformat()
          
                      dbag = DagBag(settings.DAGS_FOLDER)
                      trigger_dag = dbag.get_dag(self.trigger_dag_id)
                      dr = trigger_dag.create_dagrun(
                          run_id=dro.run_id,
                          execution_date=now,
                          state=State.RUNNING,
                          conf=dro.payload,
                          external_trigger=True,
                      )
                      created_dr_ids.append(dr.id)
                      self.log.info("Created DagRun %s, %s", dr, now)
          
                  if created_dr_ids:
                      session.commit()
                      context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
                  else:
                      self.log.info("No DagRun created")
                  session.close()
          

          然后,您可以从 PythonOperator 中的可调用函数提交几个 dagrun,例如:

          from airflow.operators.dagrun_operator import DagRunOrder
          from airflow.models import DAG
          from airflow.operators import TriggerMultiDagRunOperator
          from airflow.utils.dates import days_ago
          
          
          def generate_dag_run(**kwargs):
              for i in range(10):
                  order = DagRunOrder(payload={'my_variable': i})
                  yield order
          
          args = {
              'start_date': days_ago(1),
              'owner': 'airflow',
          }
          
          dag = DAG(
              dag_id='simple_trigger',
              max_active_runs=1,
              schedule_interval='@hourly',
              default_args=args,
          )
          
          gen_target_dag_run = TriggerMultiDagRunOperator(
              task_id='gen_target_dag_run',
              dag=dag,
              trigger_dag_id='common_target',
              python_callable=generate_dag_run
          )
          

          我用https://github.com/flinz/airflow_multi_dagrun的代码创建了一个fork

          【讨论】:

            【解决方案10】:

            OA:“在 Airflow 中是否有任何方法可以创建一个工作流,使得任务 B.* 的数量在任务 A 完成之前是未知的?”

            简短的回答是否定的。 Airflow 将在开始运行之前构建 DAG 流。

            也就是说我们得出了一个简单的结论,那就是我们没有这种需要。 当您想并行处理某些工作时,您应该评估可用的资源,而不是要处理的项目数。

            我们是这样做的:我们动态生成固定数量的任务,比如 10 个,这将拆分工作。例如,如果我们需要处理 100 个文件,每个任务将处理其中的 10 个。我将在今天晚些时候发布代码。

            更新

            这是代码,抱歉耽搁了。

            from datetime import datetime, timedelta
            
            import airflow
            from airflow.operators.dummy_operator import DummyOperator
            
            args = {
                'owner': 'airflow',
                'depends_on_past': False,
                'start_date': datetime(2018, 1, 8),
                'email': ['myemail@gmail.com'],
                'email_on_failure': True,
                'email_on_retry': True,
                'retries': 1,
                'retry_delay': timedelta(seconds=5)
            }
            
            dag = airflow.DAG(
                'parallel_tasks_v1',
                schedule_interval="@daily",
                catchup=False,
                default_args=args)
            
            # You can read this from variables
            parallel_tasks_total_number = 10
            
            start_task = DummyOperator(
                task_id='start_task',
                dag=dag
            )
            
            
            # Creates the tasks dynamically.
            # Each one will elaborate one chunk of data.
            def create_dynamic_task(current_task_number):
                return DummyOperator(
                    provide_context=True,
                    task_id='parallel_task_' + str(current_task_number),
                    python_callable=parallelTask,
                    # your task will take as input the total number and the current number to elaborate a chunk of total elements
                    op_args=[current_task_number, int(parallel_tasks_total_number)],
                    dag=dag)
            
            
            end = DummyOperator(
                task_id='end',
                dag=dag)
            
            for page in range(int(parallel_tasks_total_number)):
                created_task = create_dynamic_task(page)
                start_task >> created_task
                created_task >> end
            

            代码说明:

            这里我们有一个单一的开始任务和一个单一的结束任务(都是虚拟的)。

            然后从带有 for 循环的 start 任务开始,我们使用相同的 python 可调用对象创建 10 个任务。任务在函数 create_dynamic_task 中创建。

            我们将并行任务总数和当前任务索引作为参数传递给每个 python 可调用对象。

            假设您有 1000 个项目要详细说明:第一个任务将收到输入,它应该详细说明 10 个块中的第一个块。它将 1000 个项目分成 10 个块并详细说明第一个。

            【讨论】:

            • 这是一个很好的解决方案,只要您不需要每个项目的特定任务(如进度、结果、成功/失败、重试等)
            • @Ena parallelTask 未定义:我错过了什么吗?
            • @AnthonyKeane 这是你应该调用的python函数来实际做一些事情。正如代码中所注释的那样,它将把总数和当前数字作为输入来详细说明一大块总元素。
            • 我很好奇这会同时执行 10 次开始和结束任务吗?
            • 不,不会的。它将简单地创建 10 个名为 parallel_task_1、parallel_task_2... 的任务,这些任务将在启动任务之后并行执行
            【解决方案11】:

            我已经找到了一种基于先前任务的结果创建工作流的方法。
            基本上你想要做的是有两个 subdags:

            1. Xcom 在首先执行的 subdag 中推送一个列表(或稍后创建动态工作流所需的任何内容)(参见 test1.py def return_list()
            2. 将主 dag 对象作为参数传递给您的第二个子 dag
            3. 现在,如果您有主 dag 对象,您可以使用它来获取其任务实例的列表。从该任务实例列表中,您可以使用parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] 过滤掉当前运行的任务,您可以在此处添加更多过滤器。
            4. 使用该任务实例,您可以使用 xcom pull 通过将 dag_id 指定为第一个 subdag 来获取所需的值:dag_id='%s.%s' % (parent_dag_name, 'test1')
            5. 使用列表/值动态创建任务

            现在我已经在本地气流安装中对此进行了测试,并且效果很好。如果同时运行多个 dag 实例,我不知道 xcom pull 部分是否会出现任何问题,但是您可能会使用唯一键或类似的东西来唯一标识 xcom你想要的价值。 可以优化 3. 步骤以 100% 确保获得当前主要 dag 的特定任务,但对于我的使用而言,这表现得足够好,我认为只需要一个 task_instance 对象即可使用 xcom_pull。

            我还在每次执行前清理第一个 subdag 的 xcoms,以确保我不会意外得到任何错误的值。

            我不太会解释,所以我希望下面的代码能让一切清楚:

            test1.py

            from airflow.models import DAG
            import logging
            from airflow.operators.python_operator import PythonOperator
            from airflow.operators.postgres_operator import PostgresOperator
            
            log = logging.getLogger(__name__)
            
            
            def test1(parent_dag_name, start_date, schedule_interval):
                dag = DAG(
                    '%s.test1' % parent_dag_name,
                    schedule_interval=schedule_interval,
                    start_date=start_date,
                )
            
                def return_list():
                    return ['test1', 'test2']
            
                list_extract_folder = PythonOperator(
                    task_id='list',
                    dag=dag,
                    python_callable=return_list
                )
            
                clean_xcoms = PostgresOperator(
                    task_id='clean_xcoms',
                    postgres_conn_id='airflow_db',
                    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
                    dag=dag)
            
                clean_xcoms >> list_extract_folder
            
                return dag
            

            test2.py

            from airflow.models import DAG, settings
            import logging
            from airflow.operators.dummy_operator import DummyOperator
            
            log = logging.getLogger(__name__)
            
            
            def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
                dag = DAG(
                    '%s.test2' % parent_dag_name,
                    schedule_interval=schedule_interval,
                    start_date=start_date
                )
            
                if len(parent_dag.get_active_runs()) > 0:
                    test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
                        dag_id='%s.%s' % (parent_dag_name, 'test1'),
                        task_ids='list')
                    if test_list:
                        for i in test_list:
                            test = DummyOperator(
                                task_id=i,
                                dag=dag
                            )
            
                return dag
            

            和主要工作流程:

            test.py

            from datetime import datetime
            from airflow import DAG
            from airflow.operators.subdag_operator import SubDagOperator
            from subdags.test1 import test1
            from subdags.test2 import test2
            
            DAG_NAME = 'test-dag'
            
            dag = DAG(DAG_NAME,
                      description='Test workflow',
                      catchup=False,
                      schedule_interval='0 0 * * *',
                      start_date=datetime(2018, 8, 24))
            
            test1 = SubDagOperator(
                subdag=test1(DAG_NAME,
                             dag.start_date,
                             dag.schedule_interval),
                task_id='test1',
                dag=dag
            )
            
            test2 = SubDagOperator(
                subdag=test2(DAG_NAME,
                             dag.start_date,
                             dag.schedule_interval,
                             parent_dag=dag),
                task_id='test2',
                dag=dag
            )
            
            test1 >> test2
            

            【讨论】:

            • 在 Airflow 1.9 上,这些在添加到 DAG 文件夹时没有加载,我错过了什么?
            • @AnthonyKeane 你把 test1.py 和 test2.py 放到你的 dag 文件夹中的 subdags 文件夹了吗?
            • 我做到了。将这两个文件复制到 subdags 并将 test.py 放在 dag 文件夹中,仍然出现此错误。损坏的 DAG:[/home/airflow/gcs/dags/test.py] 没有名为 subdags.test1 的模块注意我正在使用 Google Cloud Composer(Google 的托管 Airflow 1.9.0)
            • 嗨@Christopher Beck 我发现我的错误我需要将_ _init_ _.py 添加到subdags 文件夹。菜鸟错误
            • 为什么这些需要放在单独的文件中?这是必要的还是可以在一个文件中创建相同的 DAG?
            【解决方案12】:

            我发现这个Medium post 与这个问题非常相似。但是它充满了拼写错误,当我尝试实现它时不起作用。

            我对上面的回答如下:

            如果您要动态创建任务,则必须通过迭代不是由上游任务创建的内容,或者可以独立于该任务定义的内容。我了解到您不能通过正如许多其他人之前指出的那样,执行日期或其他气流变量到模板之外的东西(例如,任务)。另见this post

            【讨论】:

            • 如果你看看我的评论,你会发现实际上可以根据上游任务的结果创建任务。
            【解决方案13】:

            这是我在没有任何子标签的情况下使用类似请求的方法:

            首先创建一个返回任何你想要的值的方法

            def values_function():
                 return values
            

            下一个创建动态生成作业的方法:

            def group(number, **kwargs):
                    #load the values if needed in the command you plan to execute
                    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
                    return BashOperator(
                            task_id='JOB_NAME_{}'.format(number),
                            bash_command='script.sh {} {}'.format(dyn_value, number),
                            dag=dag)
            

            然后将它们组合起来:

            push_func = PythonOperator(
                    task_id='push_func',
                    provide_context=True,
                    python_callable=values_function,
                    dag=dag)
            
            complete = DummyOperator(
                    task_id='All_jobs_completed',
                    dag=dag)
            
            for i in values_function():
                    push_func >> group(i) >> complete
            

            【讨论】:

            • 值在哪里定义?
            • 而不是for i in values_function(),我希望得到类似for i in push_func_output的东西。问题是我找不到动态获取该输出的方法。 PythonOperator 的输出在执行后会在 Xcom 中,但我不知道我是否可以从 DAG 定义中引用它。
            • @eldos 在下面看到我的回答
            • 如果我们必须在循环中执行一系列与步骤相关的步骤怎么办? group 函数中是否存在第二个依赖链?
            • 在我的values_function 我有:id_list = kwargs['dag_run'].conf.get('param_id_list') 然后返回 id_list。它将在Broken DAG: [my_dag.py] 'dag_run' 中出现错误。但是,如果我像id_list = [1,2,3] 那样对其进行硬编码,那就没问题了。我可以从参数值中设置id_list 吗?
            猜你喜欢
            • 1970-01-01
            • 2015-03-25
            • 2023-02-03
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2018-07-10
            • 2015-05-22
            • 2014-03-17
            相关资源
            最近更新 更多