【发布时间】:2018-07-12 18:38:44
【问题描述】:
我目前正在将之前在 bash 脚本中实现的工作流转换为 Airflow DAG。在 bash 脚本中,我只是在运行时导出变量
export HADOOP_CONF_DIR="/etc/hadoop/conf"
现在我想在 Airflow 中做同样的事情,但还没有找到解决方案。我发现的一种解决方法是在任何方法或运算符之外使用os.environ[VAR_NAME]='some_text' 设置变量,但这意味着它们会在脚本加载时导出,而不是在运行时导出。
现在,当我尝试在 PythonOperator 调用的函数中调用 os.environ[VAR_NAME] = 'some_text' 时,它不起作用。我的代码是这样的
def set_env():
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
os.environ['PATH'] = "somePath:" + os.environ['PATH']
os.environ['SPARK_HOME'] = "pathToSparkHome"
os.environ['PYTHONPATH'] = "somePythonPath"
os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()
set_env_operator = PythonOperator(
task_id='set_env_vars_NOT_WORKING',
python_callable=set_env,
dag=dag)
现在当我的 SparkSubmitOperator 被执行时,我得到了异常:
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
与此相关的我的用例是我有SparkSubmitOperator,我在其中将作业提交给YARN,因此必须在环境中设置HADOOP_CONF_DIR 或YARN_CONF_DIR。遗憾的是,我无法在我的 .bashrc 或任何其他配置中设置它们,这就是我需要在运行时设置它们的原因。
最好在执行SparkSubmitOperator 之前将它们设置在运算符中,但如果有可能将它们作为参数传递给SparkSubmitOperator,那至少可以。
【问题讨论】:
-
我刚刚发现了这个:github.com/apache/incubator-airflow/pull/3268/commits,这似乎在下一个气流版本中为 SparkSubmitOperator 解决了这个问题,但是如果我有另一个 Operator 呢?
-
这行不通,因为 afaik 每个任务都作为“单独的”Python 脚本进程执行,如果你愿意的话。由于第一次执行设置变量,第二次执行运行 Spark 运算符,第二次执行永远不会看到变量。
标签: python apache-spark environment-variables airflow