【问题标题】:Error when connecting spark structured streaming + kafka连接火花结构化流+ kafka时出错
【发布时间】:2020-08-18 01:36:14
【问题描述】:

我正在尝试将我的结构化流 Spark 2.4.5 与 kafka 连接,但我尝试此数据源提供程序错误的所有时间都会出现。 按照我的 scala 代码和我的 sbt 构建:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object streaming_app_demo {
  def main(args: Array[String]): Unit = {

    println("Spark Structured Streaming with Kafka Demo Application Started ...")

    val KAFKA_TOPIC_NAME_CONS = "test"
    val KAFKA_OUTPUT_TOPIC_NAME_CONS = "test"
    val KAFKA_BOOTSTRAP_SERVERS_CONS = "localhost:9092"


    val spark = SparkSession.builder
      .master("local[*]")
      .appName("Spark Structured Streaming with Kafka Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    // Stream from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS)
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .load()

    val ds = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test2")
      .start()
  }
}

错误是:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming_app_demo$.main(teste.scala:29)
    at streaming_app_demo.main(teste.scala)

而我的 sbt.build 是:

name := "scala_212"

version := "0.1"

scalaVersion := "2.12.11"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" % "provided"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.5.0"

谢谢!

【问题讨论】:

  • 我已将 sbt.build 中的第 11 行更改为: libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" 和我的代码终于可以工作了

标签: scala apache-spark apache-kafka spark-streaming spark-structured-streaming


【解决方案1】:

对于spark structured streaming + kafka,需要这个 spark-sql-kafka-0-10 库。

您收到此 org.apache.spark.sql.AnalysisException: Failed to find data source: kafka 异常是因为 spark-sql-kafka 库在您的类路径中不可用,并且无法在 META-INF/services 文件夹中找到 org.apache.spark.sql.sources.DataSourceRegister

DataSourceRegister path inside jar file

/org/apache/spark/spark-sql-kafka-0-10_2.11/2.2.0/spark-sql-kafka-0-10_2.11-2.2.0.jar!/META-INF/services /org.apache.spark.sql.sources.DataSourceRegister

Update

如果您使用的是 SBT,请尝试添加以下代码块。这将在您的最终 jar 中包含 org.apache.spark.sql.sources.DataSourceRegister 文件。

// META-INF discarding
assemblyMergeStrategy in assembly := {
  case PathList("META-INF","services",xs @ _*) => MergeStrategy.filterDistinctLines
  case PathList("META-INF",xs @ _*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

【讨论】:

  • 所以,我将我的库更改为: libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5" 我的代码终于可以工作了.我如何把这个包放在 pyspark 中?我必须下载 spark-sql-kafka-0-10 jar 并放入 spark jars 目录?
  • 你在使用 sbt 吗?
  • 用 pyspark,没有
  • 但是在您的问题中您添加了 sbt 构建文件?,如果您使用的是 sbt,请使用上面的代码,以便它会自动将文件添加到您的最终 jar 文件中。
  • 是的,但是我用 intelli j 创建了这个项目。所以我不像我想运行 pyspark 和我想将这个“spark-sql-kafka-0-10 依赖项以使用 pyspark 运行结构化流”添加到我的 pyspark 时那样下载 spark 文件
猜你喜欢
  • 2020-10-25
  • 2021-05-31
  • 2019-09-20
  • 1970-01-01
  • 2016-09-06
  • 2017-04-27
  • 1970-01-01
  • 2018-11-08
  • 2018-07-12
相关资源
最近更新 更多