【问题标题】:Export environment variables at runtime with airflow在运行时使用气流导出环境变量
【发布时间】:2018-07-12 18:38:44
【问题描述】:

我目前正在将之前在 bash 脚本中实现的工作流转换为 Airflow DAG。在 bash 脚本中,我只是在运行时导出变量

export HADOOP_CONF_DIR="/etc/hadoop/conf"

现在我想在 Airflow 中做同样的事情,但还没有找到解决方案。我发现的一种解决方法是在任何方法或运算符之外使用os.environ[VAR_NAME]='some_text' 设置变量,但这意味着它们会在脚本加载时导出,而不是在运行时导出。

现在,当我尝试在 PythonOperator 调用的函数中调用 os.environ[VAR_NAME] = 'some_text' 时,它不起作用。我的代码是这样的

def set_env():
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)

现在当我的 SparkSubmitOperator 被执行时,我得到了异常:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

与此相关的我的用例是我有SparkSubmitOperator,我在其中将作业提交给YARN,因此必须在环境中设置HADOOP_CONF_DIRYARN_CONF_DIR。遗憾的是,我无法在我的 .bashrc 或任何其他配置中设置它们,这就是我需要在运行时设置它们的原因。

最好在执行SparkSubmitOperator 之前将它们设置在运算符中,但如果有可能将它们作为参数传递给SparkSubmitOperator,那至少可以。

【问题讨论】:

  • 我刚刚发现了这个:github.com/apache/incubator-airflow/pull/3268/commits,这似乎在下一个气流版本中为 SparkSubmitOperator 解决了这个问题,但是如果我有另一个 Operator 呢?
  • 这行不通,因为 afaik 每个任务都作为“单独的”Python 脚本进程执行,如果你愿意的话。由于第一次执行设置变量,第二次执行运行 Spark 运算符,第二次执行永远不会看到变量。

标签: python apache-spark environment-variables airflow


【解决方案1】:

根据我在spark submit operator 中看到的内容,您可以将环境变量作为字典传递给 spark-submit。

:param env_vars: Environment variables for spark-submit. It
                 supports yarn and k8s mode too.
:type env_vars: dict

你试过了吗?

【讨论】:

  • 据我所知,这还没有在发布版本中。
  • 您可以随时挑选出这些更改并将其转储到您自己的插件中。或者只是用 master 分支中的一个覆盖您自己版本上的 spark submit 操作符的副本 - 虽然这有点狡猾:)
  • 我刚刚意识到我已经使用 SubDags 创建了工作流的每个部分,因为我需要在运行时创建任务,据我了解,这也意味着如果我只是调用 os.environ [Var] = Var ,它应该在加载主 dag 和执行子 dag 时执行。我不是 100% 高兴它会在加载主 dag 时执行,否则实际上可能没问题 :)
猜你喜欢
  • 2020-04-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多