【问题标题】:How do we set OS environment variables in Airflow我们如何在 Airflow 中设置操作系统环境变量
【发布时间】:2021-11-10 23:24:28
【问题描述】:

我有一个气流 dag,我想要做的是读取存储在气流 UI 中的变量(用户名和密码),并将这些变量值作为导出值传递到操作系统中。原因是因为我使用的是 dbt yml 文件,它需要我读取环境变量 'dbt_user'。 (唯一的另一种方法是在不安全的yaml文件中设置密码。

default:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxxx

      user: "{{ env_var('dbt_user') }}"

我尝试编写一个 dag 来执行 bashoperator 导出,但它似乎没有设置环境变量。

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}


with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    task_1 = BashOperator(
        task_id='get_variables',
        bash_command='export dbt_user={{ var.value.dbt_user }} ',
        env = os.environ.copy(),
        dag=dag
    )

    task_2 = BashOperator(
        task_id='load_seed_data_once',
        bash_command='echo $dbt_user',
        dag=dag
    )

task_1 >> task_2

当我尝试回显时,我们看不到任何设置。有谁知道如何使用 bashoperator 设置环境变量?

[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO - 
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0

更新:

我也尝试过通过 python 运算符进行操作,但效果不佳。它给了我从 None 提高 KeyError(key) KeyError:'variable_1'

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}

def set_env():
    os.environ["variable_1"] = "value_1"

def print_env_var():
    print(os.environ["variable_1"])


with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    set_env_task = PythonOperator(
        task_id='python_task', 
        python_callable=set_env,
        dag=dag
    )



    print_env_task = PythonOperator(
        task_id='load_seed_data_once',
        python_callable=print_env_var,
        dag=dag
    )

set_env_task >> print_env_task

【问题讨论】:

    标签: airflow


    【解决方案1】:

    BashOperatorPythonOperator - 我认为的任何运算符 - 启动一个新的子 shell,并且只会在运行时(例如 composek8s deploy)或通过启动 airflow 之前的脚本(例如入口点)。

    这就是为什么你在BashOperator 中有env 参数来传递你想为脚本设置的任何环境变量的字典。 您也可以从那里的 AF 变量中传递 dbt_user 和密码,因为 env 是模板化的。

    
    env={'dbt_user': '{{ var.value.dbt_user }}'}
    

    您也可以在 dag 默认值中设置 env 以使其可用于所有任务,因此您无需单独设置。

    最后,如果您使用LocalExecutor,您可以在第一个 bash 中执行以下操作:

    echo "export dbt_user={{ var.value.dbt_user }} >> ~/.bashrc
    

    它将使导出的变量在任何新的 shell 中都可以访问。 请注意,当新容器启动时,这不适用于KubernetesExecutor - 但有一些方法可以解决它。

    【讨论】:

    • 嗨嗨,你的意思是。我在哪里设置 env={'dbt_user': '{{ var.value.dbt_user }}'}?
    • 我对@9​​87654336@ 不熟悉,所以我假设您从 bash 任务开始。如果是这样,您将像这样推送 env: task = BashOperator( ... env = {'dbt_user': '{{ var.value.dbt_user }}'}, dag=dag )
    • 好的,我搞定了。
    • @Adam 你是怎么让它工作的?我正在尝试解决同样的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-05-28
    • 2014-01-03
    • 2013-11-11
    • 2017-07-02
    • 2018-11-18
    • 1970-01-01
    • 2016-01-13
    相关资源
    最近更新 更多