【问题标题】:How to monitor Spark job with Airflow如何使用 Airflow 监控 Spark 作业
【发布时间】:2017-05-18 04:38:05
【问题描述】:

我设置了一些 dags,最终以 spark-submit 命令结束 spark 集群。如果这有所作为,我正在使用集群模式。无论如何,所以我的代码可以工作,但我意识到如果火花作业失败,我不一定会从 Airflow UI 中知道。通过集群模式触发作业,Airflow 将作业移交给可用的工作人员,因此气流不知道火花作业。

我该如何解决这个问题?

【问题讨论】:

    标签: apache-spark airflow apache-airflow


    【解决方案1】:

    Airflow(从 1.8 版开始)有

    SparkSqlOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_sql_operator.py ;
    SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
    SparkSubmitOperator - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/spark_submit_operator.py
    SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
    

    如果你使用这些,如果 spark 作业失败,气流任务将失败。如果您使用 spark1.x 获取实时日志,您可能需要更改 spark_submit_hook 文件中的日志记录部分,因为 spark-submit 甚至会将某些 1.x 版本的错误记录到标准输出(我必须对 1.6.x 版本进行更改)。 1).

    另请注意,自上次稳定版本以来,SparkSubmitOperator 已进行了许多改进。

    【讨论】:

      【解决方案2】:

      您可以考虑使用client 模式,因为在 spark 作业完成之前客户端不会终止。气流执行器可以获取退出代码。

      否则您可能需要使用作业服务器。查看https://github.com/spark-jobserver/spark-jobserver

      【讨论】:

      • 我们考虑过这一点,但如果我们使用client 模式,我们是否不需要气流箱成为 Spark 集群的一部分?我还是 Spark 的新手,当我们尝试 client 模式时,没有工作开始,直到我在所说的盒子上启动了 Spark 工作器。
      • 您需要在与 Airflow 工作线程相同的主机上运行 spark-submit。该工作节点必须能够与 Spark 集群通信。
      【解决方案3】:

      您可以开始利用 LivyOperator 来监控作业, LivyOperator 将按您配置轮询的时间间隔轮询 Spark 作业的状态。 示例:

      kickoff_streamer_task = LivyOperator(
          task_id='kickoff_streamer_task',
          dag=dag,
          livy_conn_id='lokori',
          file='abfs://data@amethiaprime.dfs.core.windows.net/user/draxuser/drax_streamer.jar',
          **polling_interval=60**,  # used when you want to pull the status of submitted job
          queue='root.ids.draxuser',
          proxy_user='draxuser',
          args=['10', '3000'],
          num_executors=4,
          conf={
              'spark.shuffle.compress': 'false',
              'master': 'yarn',
              'deploy_mode': 'cluster',
              'spark.ui.view.acls': '*'
          },
          class_name='com.apple.core.drax.dpaas.batch.DraxMatrixProducer',
          on_success_callback=livy_callback,
          on_failure_callback=_failure_callback
      )
      

      在上面的例子中,polling_interval 设置为 60 秒,它会在 60 秒时继续轮询你的工作状态,它会确保给你正确的工作状态。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-05-30
        • 1970-01-01
        • 2016-04-29
        • 2017-01-01
        • 1970-01-01
        • 2022-08-15
        • 1970-01-01
        相关资源
        最近更新 更多