【问题标题】:Airflow parameters to Postgres OperatorPostgres Operator 的气流参数
【发布时间】:2019-12-16 23:23:04
【问题描述】:

我正在尝试将执行日期作为运行时参数传递给 postgres 运算符

class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql','parameters')

task = MyPostgresOperator(
  task_id='test_date',
  postgres_conn_id='redshift',
  sql="test_file.sql",
  parameters={'crunch_date':'{{ ds }}'},
  dag=dag
)

然后我尝试在sql查询中使用这个参数来接受dag传递的值

select 
{{ crunch_date }} as test1,

dag 正确发送参数,但是查询只是采用空值而不是传递的执行日期。有没有办法让带有 redshift 的 postgresql 接受这个参数的正确值?

【问题讨论】:

  • 它的红移@a_horse_with_no_name
  • 去掉{{ds}}周围的空格
  • @psychoCoder 我试过了,但它不起作用。虽然 dag 正确发送参数,但我只有 sql 文件无法识别参数的问题

标签: amazon-redshift airflow


【解决方案1】:

您必须按如下方式更新您的 sql 查询:

select 
{{ ds }} as test1,

您将无法在另一个中使用一个模板化字段。如果要在任务中传递参数并在 Jinja 模板中使用它,请使用 params 参数。

更新

但请注意params 不是模板字段。如果你模板化它,它不会渲染,因为嵌套模板不起作用。

task = MyPostgresOperator(
  task_id='test_date',
  postgres_conn_id='redshift',
  sql="test_file.sql",
  params={'textstring':'abc'},
  dag=dag
)

test_file.sql 在哪里:

select 
{{ params.textstring }} as test1,

查看https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f 中的第 4 点以了解有关params 的更多信息。

【讨论】:

  • 我不确定你是否理解了我的问题。您给出的解决方案已经是我尝试过但失败的方法。我再举一个简单的例子: task = MyPostgresOperator( task_id='test_date', postgres_conn_id='redshift', sql="test_file.sql", parameters={'textstring':'abc'}, dag=dag ) 这里的 textstring 是参数名称和abc是它的值现在我想在sql文件中获取参数值:select {{ textstring }} as test1 但是textstring只是null
  • 非常感谢,你是救世主。这完全符合预期。
  • 这里有一个问题,该解决方案在传递字符串值时完美运行,但不适用于 Airflow ds 参数 sql='ETL models/test_file.sql', params={'crunchdate':'{ { ds }}'},在测试文件中:'{{ params.crunchdate }}' as test1 它给出一个错误:psycopg2.DataError: invalid input syntax for type timestamp: "{{ ds }}"
【解决方案2】:

您可以在查询字符串中使用气流宏 - 需要将其传递给 redshift。

例子:

PostgresOperator(task_id="run_on_redshift",
                 dag=dag,
                 postgres_conn_id=REDSHIFT_CONN_ID,
                 sql="""
                        UNLOAD ('select * from abc.xyz') TO 's3://path/{{ds}}/' iam_role 's3_iam_role' DELIMITER AS '^' ALLOWOVERWRITE addquotes ESCAPE HEADER parallel off;
                     """
                 )

【讨论】:

  • 我知道我可以在 sql 查询中使用宏。但我想做的不是在 dag 中编写整个 SQL 查询,而是只在 dag 中调用 sql 文件,然后将 dag 中的参数传递给这个 sql 文件
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-05-02
  • 1970-01-01
  • 2018-04-25
  • 2011-02-17
  • 2019-07-10
  • 2022-12-10
  • 1970-01-01
相关资源
最近更新 更多