【发布时间】:2019-08-28 21:27:02
【问题描述】:
我是 Python 和 Airflow 的一个相对较新的用户,我很难让 spark-submit 在 Airflow 任务中运行。我的目标是让以下 DAG 任务成功运行
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'matthew',
'start_date': datetime(2019, 7, 8)
}
dag = DAG('CustomCreate_test2',
default_args=default_args,
schedule_interval=timedelta(days=1))
t3 = BashOperator(
task_id='run_test',
bash_command='spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar',
dag=dag
)
我知道问题在于 Airflow 而不是 bash,因为当我在终端中运行命令 spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar 时,它会成功运行。
我从 Airflow 日志中收到以下错误
...
[2019-08-28 15:55:34,750] {bash_operator.py:132} INFO - Command exited with return code 1
[2019-08-28 15:55:34,764] {taskinstance.py:1047} ERROR - Bash command failed
Traceback (most recent call last):
File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
result = task_copy.execute(context=context)
File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 136, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
...
我也尝试过使用SparkSubmitOperator(...),但没有成功运行它,我只得到如下错误日志
...
[2019-08-28 15:54:49,749] {logging_mixin.py:95} INFO - [[34m2019-08-28 15:54:49,749[0m] {[34mspark_submit_hook.py:[0m427} INFO[0m - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)[0m
[2019-08-28 15:54:49,803] {taskinstance.py:1047} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--num-executors', '2', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'CustomCreate', '--class', 'CLASSPATH.CustomCreate', '--verbose', '--queue', 'root.default', '--deploy-mode', 'cluster', '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar']. Error code is: 1.
...
在BashOperator(...) 任务中运行spark-submit ... 命令之前,我必须使用SparkSubmitOperator(...) 做些什么吗?
有没有办法直接从SparkSubmitOperator(...) 任务运行我的spark-submit 命令?
我需要对 Airflow 的 Admin->Connections 页面中的spark_default 做些什么吗?
Airflow的Admin->Users页面有什么必须设置的吗? 是否必须设置任何内容以允许 Airflow 运行 spark 或运行由特定用户创建的 jar 文件?如果是,是什么/如何?
【问题讨论】:
-
虽然我无法预测您的设置问题(听起来像 @ 等 shell 上的 环境变量 / 二进制文件可发现性中的问题987654334@ /
zsh),如果您需要解决方法,请参阅this
标签: python bash apache-spark airflow spark-submit