【问题标题】:Custom Operator XCom during run time in Airflow在 Airflow 运行期间自定义 Operator XCom
【发布时间】:2021-01-29 22:33:00
【问题描述】:

我知道PythonOperator/BashOperator我们可以使用Xcom进行交流。

例如

def func(**context):
    context['task_instance'].xcom_pull()

但是,我想知道如何在运行时为自定义运算符访问 xcom

我的操作员是这样的:

class ECHOXOperator(BaseOperator):
    @apply_defaults
    def __init__(self, x, *args, **kwargs):
        self.x = x
        super(ECHOXOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print(self.x)

所以在我的 DAG 中:

我可以的

task2 = ECHOXOperator(x = 'Hello")

而且效果很好。但是如何从上游任务访问 x?

类似:

def task1(**context):
    task_instance = context['task_instance']
    task_instance.xcom_push(key="x", value="Hello")

generate_data = PythonOperator(
    task_id="task1",
    python_callable=task1,
    dag=dag,
)

task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)

task1 >> task2

这不起作用,因为 ECHOXOperator 中的 task_instance 未定义。

谢谢

【问题讨论】:

    标签: airflow


    【解决方案1】:

    您应该在自定义运算符中将 x 作为 templated_fields 传递。

    class ECHOXOperator(BaseOperator):
        template_fields = ['x']
    
        @apply_defaults
        def __init__(self, x, *args, **kwargs):
            self.x = x
            super(ECHOXOperator, self).__init__(*args, **kwargs)
    
        def execute(self, context):
            print(self.x)
    

    现在您可以执行以下操作来获取上一个任务中传递的 Xcom 的值:

    def task1(**context):
        task_instance = context['task_instance']
        task_instance.xcom_push(key="x", value="Hello")
    
    generate_data = PythonOperator(
        task_id="task1",
        python_callable=task1,
        dag=dag,
    )
    
    task2 = ECHOXOperator(x = "{{ ti.xcom_pull('task1', 'x') }}")
    
    task1 >> task2
    

    关于 templated_fields 和 Jinja 模板的更多信息:https://airflow.readthedocs.io/en/latest/concepts.html#id1

    【讨论】:

    • 谢谢。如果我想传递其他数据类型怎么办?例如,listdict。我应该使用pickle吗?
    • 您也可以传递其他数据类型,它们将被递归呈现。例如,您可以传递类似 task2 = ECHOXOperator(x = ["{{ ti.xcom_pull('task1', 'x') }}", "hello", "hi"]) 的内容
    • 阅读第一行,它起作用了。我希望我能在第一天降落在这里。非常感谢
    猜你喜欢
    • 1970-01-01
    • 2019-05-18
    • 2019-01-22
    • 1970-01-01
    • 1970-01-01
    • 2021-04-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多