【发布时间】:2020-06-02 02:15:25
【问题描述】:
我尝试使用xcomm_pull 插入由python_operator 计算的data_key_param 并将其传递给bigquery_operator。 python 运算符将输出作为字符串返回,例如“2020-05-31”。
运行 BigqueryOperator 时出现错误:“依赖项阻止任务从获取计划” - 无法转换文字“{xcom_pull(task_ids[\'set_date_key_param\'])[0] }”
任务执行后从Airflow GUI返回的sql属性值:
选择 DATE_KEY,计数 (*) 作为 COUNT
FROM my-project.my_datasets.source_table
WHERE DATE_KEY = {{ task_instance.xcom_pull(task_ids='set_date_key_param') }}
GROUP BY DATE_KEY
下面的代码(我已经尝试使用'{{'和'}}'来包围task_instance.xcom...):
def set_date_key_param():
# a business logic here
return "2020-05-31" # example results
# task 1
set_date_key_param = PythonOperator(
task_id='set_date_key_param',
provide_context=True,
python_callable=set_date_key_param,
dag=dag
)
# taks 2
load_data_to_bq_table = BigQueryOperator(
task_id='load_data_to_bq_table',
sql="""SELECT DATE_KEY, count(*) as COUNT
FROM `{project}.{dataset}.source_table`
WHERE DATE_KEY = {{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}
GROUP BY DATE_KEY""".format(
project=PROJECT_ID,
env=ENV
),
use_legacy_sql=False,
destination_dataset_table="{project}.{dataset}.target_table".format(
project=PROJECT_ID,
dataset=BQ_TARGET_DATASET,
),
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_NEVER",
trigger_rule='all_success',
dag=dag
)
set_date_key_param >> load_data_to_bq_table
【问题讨论】:
-
你能试试
\"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\"和@SergiyKolesnikov 说了什么吗? -
\"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\" - 不,不起作用:(
-
你好@TomaszKubat,请介意分享代码。我也面临着类似的问题。我正在使用 python 运算符,我在其中调用上下文为 TRUE 的大查询运算符。还是不行。
标签: airflow