[Posted]: 2021-11-10 23:24:28
[Problem description]:
I have an Airflow DAG, and what I want to do is read variables stored in the Airflow UI (a username and password) and export their values as environment variables in the OS. The reason is that my dbt profiles.yml file needs to read the environment variable 'dbt_user'. (The only alternative is to hard-code the password in the YAML file, which is insecure.)
default:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxxx
      user: "{{ env_var('dbt_user') }}"
I tried writing a DAG that runs an export from a BashOperator, but it does not seem to set the environment variable.
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 1),
    'retries': 0
}

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    task_1 = BashOperator(
        task_id='get_variables',
        bash_command='export dbt_user={{ var.value.dbt_user }} ',
        env=os.environ.copy(),
        dag=dag
    )
    task_2 = BashOperator(
        task_id='load_seed_data_once',
        bash_command='echo $dbt_user',
        dag=dag
    )
    task_1 >> task_2
When I try to echo the variable, nothing is set. Does anyone know how to set an environment variable using the BashOperator?
[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO -
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0
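The empty output in the log is expected: each BashOperator task runs its command in its own child shell, so an `export` in task_1's shell is gone by the time task_2's shell starts. A minimal stand-alone sketch of that isolation (plain `subprocess` calls standing in for the two Airflow tasks; the variable value 'alice' is made up):

```python
import os
import subprocess
import sys

# Two independent child processes, like two Airflow tasks.
env = os.environ.copy()
env.pop('dbt_user', None)  # make sure the parent doesn't already have it

# "task_1": set the variable inside one child process.
subprocess.run(
    [sys.executable, '-c', "import os; os.environ['dbt_user'] = 'alice'"],
    env=env, check=True)

# "task_2": a separate child process -- the change from "task_1" never
# propagates back to the parent, so it is not inherited here.
result = subprocess.run(
    [sys.executable, '-c',
     "import os; print(os.environ.get('dbt_user', '<unset>'))"],
    env=env, capture_output=True, text=True, check=True)

print(result.stdout.strip())  # <unset>
```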
Update:
I also tried doing it through a PythonOperator, but that did not work either. It gives me `raise KeyError(key) from None  KeyError: 'variable_1'`.
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 1),
    'retries': 0
}

def set_env():
    os.environ["variable_1"] = "value_1"

def print_env_var():
    print(os.environ["variable_1"])

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    set_env_task = PythonOperator(
        task_id='python_task',
        python_callable=set_env,
        dag=dag
    )
    print_env_task = PythonOperator(
        task_id='load_seed_data_once',
        python_callable=print_env_var,
        dag=dag
    )
    set_env_task >> print_env_task
[Comments]:
Tags: airflow