【发布时间】:2021-01-29 22:33:00
【问题描述】:
我知道PythonOperator/BashOperator我们可以使用Xcom进行交流。
例如
def func(**context):
context['task_instance'].xcom_pull()
但是,我想知道如何在运行时为自定义运算符访问 xcom。
我的操作员是这样的:
class ECHOXOperator(BaseOperator):
@apply_defaults
def __init__(self, x, *args, **kwargs):
self.x = x
super(ECHOXOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print(self.x)
所以在我的 DAG 中:
我可以的
task2 = ECHOXOperator(x = 'Hello")
而且效果很好。但是如何从上游任务访问 x?
类似:
def task1(**context):
task_instance = context['task_instance']
task_instance.xcom_push(key="x", value="Hello")
generate_data = PythonOperator(
task_id="task1",
python_callable=task1,
dag=dag,
)
task2 = ECHOXOperator(x = task_instance.xcom_pull('task1', 'x'), provide_context=True)
task1 >> task2
这不起作用,因为 ECHOXOperator 中的 task_instance 未定义。
谢谢
【问题讨论】:
标签: airflow