【问题标题】:Unable to Send Spark Data Frame to Kafka (java.lang.ClassNotFoundException: Failed to find data source: kafka.)无法将 Spark 数据帧发送到 Kafka(java.lang.ClassNotFoundException:无法找到数据源:kafka。)
【发布时间】:2018-09-07 23:48:39
【问题描述】:

我在使用 Spark 数据框将数据推送到 Kafka 时遇到问题。

让我用示例示例详细解释我的场景。我想将数据加载到 spark 并将 spark 输出发送到 kafka。我正在使用 Gradle 3.5 和 Spark 2.3.1 和 Kafka 1.0.1

这里是 build.gradle

buildscript {
ext {
    springBootVersion = '1.5.15.RELEASE'
}
repositories {
    mavenCentral()
}
dependencies {
    classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
  }
 }

apply plugin: 'scala'
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'

group = 'com.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
compile('org.springframework.boot:spring-boot-starter')
compile ('org.apache.spark:spark-core_2.11:2.3.1')
compile ('org.apache.spark:spark-sql_2.11:2.3.1')
compile ('org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1')
compile ('org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1')

testCompile('org.springframework.boot:spring-boot-starter-test')
}

这是我的代码:

package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object SparkConnection {

case class emp(empid:Integer, empname:String, empsal:Float)

def main(args:Array[String]) {

    val sparkConf = new SparkConf().setAppName("Spark 
    Connection").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val dataRdd = sc.textFile("/home/sample/data/sample.txt")
    val mapRdd = dataRdd.map(row => row.split(","))
    val empRdd = mapRdd.map( row => emp(row(0).toInt, row(1), row(2).toFloat))

    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val empDF = empRdd.toDF() 

    empDF.
    select(to_json(struct(empDF.columns.map(column):_*)).alias("value"))
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "my-kafka-topic").save()

    }

  }

请忽略 build.gradle 中的 Spring Boot 框架 API。

使用 Gradle 构建包后,我可以看到 .gradle 文件中提到的所有依赖类。

但是当我使用 spark-submit 运行代码时

spark-submit --class com.sample.SparkConnection spark_kafka_integration.jar

我收到以下错误

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
    at com.iniste.SparkConnection$.main(SparkConnection.scala:29)
    at com.iniste.SparkConnection.main(SparkConnection.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
    ... 13 more
2018-09-05 17:41:17 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2018-09-05 17:41:17 INFO  AbstractConnector:318 - Stopped Spark@51684e4a{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-09-05 17:41:17 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-09-05 17:41:17 INFO  MemoryStore:54 - MemoryStore cleared
2018-09-05 17:41:17 INFO  BlockManager:54 - BlockManager stopped
2018-09-05 17:41:17 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-09-05 17:41:17 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-09-05 17:41:17 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-09-05 17:41:17 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-09-05 17:41:17 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-bd4cb4ef-3883-4c26-a93f-f355b13ef306
2018-09-05 17:41:17 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-156dfdbd-cff4-4c70-943f-35ef403a01ed

请帮助我摆脱这个错误。他们建议我在 spark-submit 中使用 --packages 选项的一些博客。但是我有一些代理限制,这是下载上述软件包所必需的。但我无法理解为什么 spark-submit 无法获取已经可用的罐子。请纠正我做错的地方。

【问题讨论】:

  • 你需要 Gradle 为你创建一个 shadow Jar,但是如果有代理,那么 Gradle 也应该无法下载这些包
  • 但是我运行 Gradle 工具的本地桌面没有任何代理问题。我的 Spark 节点有代理问题。
  • 尝试看看你是否可以让它工作github.com/johnrengelman/shadow 另外,Spark Core、Spark SQL 和 Streaming 应该从最终 jar 中排除,因为这些包已经是 Spark 分发的一部分。注意:您仍然需要将 Streaming 添加到 Gradle org.apache.spark:spark-streaming_2.11:2.3.1
  • 我通过在运行 spark-submit 时添加带有 --jars 选项的依赖 jars 解决了这个问题。但是我仍然怀疑为什么即使我的项目包含所有依赖的 jar,spark-submit 也无法选择 jar。对于您的评论,我将排除这些软件包。但我没有使用 spark_streaming 来满足我的要求。所以对我来说,不需要那个罐子
  • 如果您没有对项目进行遮蔽/遮蔽,则它不会包含依赖库。 Kafka 不包含在 Spark 包中,因此需要单独下载,这就是您所做的。另外,Kafka 包本身需要流包,所以你实际上正在使用它

标签: scala apache-spark gradle apache-kafka spark-structured-streaming


【解决方案1】:

与任何 Spark 应用程序一样,spark-submit 用于启动您的应用程序。 spark-sql-kafka-0-10_2.11 及其依赖项可以使用 --packages 直接添加到 spark-submit,如下所示

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 com.sample.SparkConnection spark_kafka_integration.jar

这可以找到here

但是,根据 cricket_007 的建议,我已将 shadowjar 添加到您的 build.gradle 所以新的可能看起来和这个相似。

buildscript {
    ext {
        springBootVersion = '1.5.15.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

plugins {
    id "com.github.johnrengelman.shadow" version "2.0.4"
}
apply plugin: 'scala'
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: "com.github.johnrengelman.shadow"

group = 'com.sample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter')
    compile ('org.apache.spark:spark-core_2.11:2.3.1')
    compile ('org.apache.spark:spark-sql_2.11:2.3.1')
    compile ('org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.1')
    compile ('org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1')
    compile 'org.scala-lang:scala-library:2.11.12'
    // https://mvnrepository.com/artifact/org.apache.kafka/kafka
    //compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'

    testCompile('org.springframework.boot:spring-boot-starter-test')
}
shadowJar{
    baseName = "spark_kafka_integration"
    zip64 true
    classifier = null
    version = null
}

所以要创建你的 jar,命令将只是 :shadowJar from your gradle。

【讨论】:

  • 我不能使用像 com.github.johnrengelman.shadow 这样的外部插件
猜你喜欢
  • 2021-03-01
  • 2021-03-18
  • 2021-08-14
  • 2020-07-17
  • 2018-12-01
  • 2021-09-13
  • 2021-02-21
  • 2017-12-06
  • 2018-01-29
相关资源
最近更新 更多