【Question Title】:How to configure a custom Spark Plugin in Databricks?
【Posted】:2021-12-17 18:11:21
【Question Description】:

How do I correctly configure a Spark plugin in Databricks, together with the jar containing the Spark Plugin class?

I created the following Spark 3 plugin class in Scala, CustomExecSparkPlugin.scala:

package com.example

import org.apache.spark.api.plugin.{SparkPlugin, DriverPlugin, ExecutorPlugin}

class CustomExecSparkPlugin extends SparkPlugin  {
 
  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      override def shutdown(): Unit = {
        // custom code        
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin() {
      override def shutdown(): Unit = {
        // custom code  
      }
    }
  }
}
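For reference, a minimal sbt build definition for packaging this class against the versions the question names (DBR 7.3 = Spark 3.0.1, Scala 2.12) might look like the following sketch; the project name and exact Scala patch version are assumptions:

```scala
// build.sbt — minimal sketch; Spark is marked "provided" because the
// Databricks runtime supplies it at run time, so it stays out of the jar.
name := "custom-spark-plugin"
version := "0.1.0"
scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1" % "provided"
```

With only a `provided` Spark dependency, a plain `sbt package` is enough; the jar lands under `target/scala-2.12/` and can then be uploaded to DBFS.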

I packaged it into a jar and uploaded it to DBFS. During creation of a DBR 7.3 (Spark 3.0.1, Scala 2.12) cluster, I set the following Spark configuration (Advanced Options):

spark.plugins com.example.CustomExecSparkPlugin
spark.driver.extraClassPath /dbfs/path/to/jar
spark.executor.extraClassPath /dbfs/path/to/jar
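As a quick sanity check, the fully-qualified name given to `spark.plugins` has to match the class's path inside the jar (a class declared in `package com.example` must appear as `com/example/CustomExecSparkPlugin.class`). Since a jar is an ordinary zip archive, its entries can be listed; the jar path here is illustrative:

```shell
# List the jar's entries and confirm the plugin class sits at the
# package path that spark.plugins refers to.
unzip -l /dbfs/path/to/custom-plugin.jar | grep "com/example/CustomExecSparkPlugin.class"
```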

However, cluster creation fails with Exception: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4

Driver log4j logs:

21/11/01 13:33:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
    at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
    at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
    at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
    at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
    at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
    at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
    at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
    at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
    at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
    at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
    at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
    at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
    ... 43 more
21/11/01 13:33:02 INFO AbstractConnector: Stopped Spark@b6bccb4{HTTP/1.1,[http/1.1]}{10.88.234.70:40001}
21/11/01 13:33:02 INFO SparkUI: Stopped Spark web UI at http://10.88.234.70:40001
21/11/01 13:33:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/11/01 13:33:02 INFO MemoryStore: MemoryStore cleared
21/11/01 13:33:02 INFO BlockManager: BlockManager stopped
21/11/01 13:33:02 INFO BlockManagerMaster: BlockManagerMaster stopped
21/11/01 13:33:02 WARN MetricsSystem: Stopping a MetricsSystem that is not running
21/11/01 13:33:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/11/01 13:33:02 INFO SparkContext: Successfully stopped SparkContext

【Question Comments】:

    Tags: apache-spark apache-spark-sql databricks azure-databricks aws-databricks


    【Solution 1】:

    You could consider adding it as an init script instead. Init scripts give you a chance to add jars to the cluster before Spark starts, which is likely what a Spark plugin expects.

    • Upload your jar to DBFS, e.g. dbfs:/databricks/plugins
    • Create a bash script like the one below and upload it to the same location.
    • Create/edit the cluster with that init script specified.
    #!/bin/bash
    
    STAGE_DIR="/dbfs/databricks/plugins"
    
    echo "BEGIN: Upload Spark Plugins"
    cp -f $STAGE_DIR/*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Plugin library file"; exit 1;}
    echo "END: Upload Spark Plugin JARs"
    
    echo "BEGIN: Modify Spark config settings"
    cat << 'EOF' > /databricks/driver/conf/spark-plugin-driver-defaults.conf
    [driver] {
       "spark.plugins" = "com.example.CustomExecSparkPlugin"
    }
    EOF
    echo "END: Modify Spark config settings"
    

    I believe copying the jar into /mnt/driver-daemon/jars makes Spark aware of the jar before Spark fully initializes (doc). I'm less sure what effect it has on the executors :(

    【Comments】:

    • Init scripts run on all nodes unless you explicitly check the IS_DRIVER environment variable
    • In AWS and Azure Databricks, /mnt/driver-daemon/jars/ is a symlink to /databricks/jars/ itself, and this path does not exist on GCP. Copying the plugin jar directly to /databricks/jars/ via an init script and setting spark.plugins in the Spark Config during cluster creation/editing worked for me.