【问题标题】:Airflow - xcom_pull in the bigquery operatorAirflow - bigquery 运算符中的 xcom_pull
【发布时间】:2020-06-02 02:15:25
【问题描述】:

我尝试使用xcomm_pull 插入由python_operator 计算的data_key_param 并将其传递给bigquery_operator。 python 运算符将输出作为字符串返回,例如“2020-05-31”。

运行 BigqueryOperator 时出现错误:“依赖项阻止任务从获取计划” - 无法转换文字“{xcom_pull(task_ids[\'set_date_key_param\'])[0] }”

任务执行后从Airflow GUI返回的sql属性值:
选择 DATE_KEY,计数 (*) 作为 COUNT
FROM my-project.my_datasets.source_table
WHERE DATE_KEY = {{ task_instance.xcom_pull(task_ids='set_date_key_param') }}
GROUP BY DATE_KEY

下面的代码(我已经尝试使用'{{'和'}}'来包围task_instance.xcom...):

def set_date_key_param():

    # a business logic here
    return "2020-05-31" # example results

# task 1

set_date_key_param = PythonOperator(
    task_id='set_date_key_param',
    provide_context=True,
    python_callable=set_date_key_param,
    dag=dag
)

# taks 2

load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
    FROM `{project}.{dataset}.source_table` 
    WHERE DATE_KEY = {{{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}}}
    GROUP BY DATE_KEY""".format(
        project=PROJECT_ID,
        env=ENV
),
use_legacy_sql=False,
destination_dataset_table="{project}.{dataset}.target_table".format(
    project=PROJECT_ID,
    dataset=BQ_TARGET_DATASET,
),
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_NEVER",
trigger_rule='all_success',
dag=dag

)

set_date_key_param >> load_data_to_bq_table

【问题讨论】:

  • 你能试试\"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\" 和@SergiyKolesnikov 说了什么吗?
  • \"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\" - 不,不起作用:(
  • 你好@TomaszKubat,请介意分享代码。我也面临着类似的问题。我正在使用 python 运算符,我在其中调用上下文为 TRUE 的大查询运算符。还是不行。

标签: airflow


【解决方案1】:

我认为字符串格式和 jinja 模板相互冲突。

在您使用 xcom 的用例中,我认为使用 jinja 模板是有意义的。

load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT DATE_KEY, count(*) as COUNT
        FROM `{{ params.project }}.{{ params.dataset }}.source_table` 
        WHERE DATE_KEY = \"{{ task_instance.xcom_pull(task_ids='set_date_key_param') }}\"
        GROUP BY DATE_KEY""",
    params={
        'project': PROJECT_ID,
        'env': ENV   # env or dataset??, match this name to the params key in sql
    }
)

【讨论】:

  • 是否应该将参数作为 Python 字典传递,并用“:”分隔键值?像“项目”:PROJECT_ID?我遇到了“=”的语法错误?
  • 或者你的意思是'query_params'而不是'params'?
  • 但你是对的!当我排除字符串格式并仅保留 jinja 模板时,参数替换有效!
  • params 没用? params 应该是 jinja 模板 afaik 的关键。
  • 似乎在将 sql_params 传递给 bigquery_operator 时存在 Airflow 1.10.2 已知错误:stackoverflow.com/questions/56287061/… 我已经更改了 python_operator 的 bigquery_operator。
【解决方案2】:

您将 Python 可调用对象和保存第一个 Python 运算符的变量命名为相同:set_date_key_param。重命名 Python 可调用对象(例如 set_date)并相应地更改 Python 运算符的参数。

【讨论】:

  • 代码如上?保留任务名称并将函数/调用名称更改为 set_date?不起作用。
猜你喜欢
  • 2020-11-08
  • 2021-02-10
  • 2020-12-29
  • 2019-07-07
  • 2021-03-17
  • 2022-11-17
  • 2019-12-02
  • 2021-02-26
  • 2017-12-26
相关资源
最近更新 更多