【问题标题】:Apache Airflow How to xcom_pull() value into a DAG?Apache Airflow 如何将 xcom_pull() 值转换为 DAG?
【发布时间】:2017-12-26 09:07:22
【问题描述】:

我有一个自定义运算符,它按如下方式推送 XCOM 值:

...
task_instance = context['task_instance']
task_instance.xcom_push("list_of_files",file_list)
...

它工作正常。我有一个 dag 定义文件(my_dag.py),我在其中使用自己的运算符创建任务,它推送 XCOM 值,然后我想通过使用这个 xcom 值在循环中执行。怎么拉?

【问题讨论】:

    标签: python airflow directed-acyclic-graphs


    【解决方案1】:

    您无法访问 dag 中的 XCOM 变量,它只能在操作符中使用,方法是向操作符构造函数提供 provide_context=True 参数。

    如果您想在 DAG 结构本身中使用运算符的数据,则需要执行运算符在运算符之外执行的实际任务。

    def get_file_list():
        hook = SomeHook()
        hook.run('something to get file list')
    
    dag = DAG('tutorial', default_args=default_args)
    
    for file in get_file_list():
        task = SomeOperator(params={'file': file}) # Do something with the file passed as a parameter
    

    【讨论】:

      【解决方案2】:

      从 dag 本身而不是从 dag 中的任务访问 xcom 通常是不好的做法。也就是说,有时这是必要的。例如,在动态创建 dag 时可能需要这样做。

      这是我在 dag 中提取一些未运行的作业的示例。我在 subdag 的上下文中使用它,所以我可以放心,假设方法正在运行,xcom 将始终包含信息。

          xcom_unrun_jobs = None
          if len(parent_dag.get_active_runs()) > 0:
              tis = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
              xcom_unrun_jobs = tis.xcom_pull(dag_id=parent_dag._dag_id, task_ids=unrun_job_task_id)
      

      【讨论】:

      • 为什么这是不好的做法?我有一个想法,但解释清楚会对我有帮助!
      猜你喜欢
      • 1970-01-01
      • 2018-05-13
      • 1970-01-01
      • 2018-07-01
      • 1970-01-01
      • 1970-01-01
      • 2016-10-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多