【问题标题】:Spark streaming from Kafka topic using Scala(使用 Scala 从 Kafka 主题进行 Spark 流式处理)
【发布时间】:2019-05-13 05:52:55
【问题描述】:

我是 Scala/Spark 开发的新手。我使用 sbt 和 Scala 创建了一个从 Kafka 主题读取数据的简单流式应用程序。代码如下:

build.sbt

// Project name and version of this build.
name := "kafka-streaming"

version := "1.0"

// sbt-assembly: copy the current assembly options with includeScala switched off.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// sbt-assembly: how to resolve a path that occurs in more than one dependency jar.
assemblyMergeStrategy in assembly := {
  // Spark's "unused" stub class: keep the first copy encountered.
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  // Any path containing a log4j.properties segment: concatenate all copies.
  case PathList(pl @ _*) if pl.contains("log4j.properties") => MergeStrategy.concat
  // Netty version metadata: keep the last copy encountered.
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  // Everything else falls back to the previously configured strategy.
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

scalaVersion := "2.11.8"

// Additional resolver (jitpack.io) for dependency lookup.
resolvers += "jitpack" at "https://jitpack.io"

// still want to be able to run in sbt
// https://github.com/sbt/sbt-assembly#-provided-configuration
// NOTE(review): `<<=` is the legacy sbt operator — confirm the sbt version in use still accepts it.
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

// `run` executes in a forked JVM that receives the log4j system properties below.
fork in run := true
javaOptions in run ++= Seq(
  "-Dlog4j.debug=true",
  "-Dlog4j.configuration=log4j.properties")

// NOTE(review): the Spark artifacts below are 2.4.0 while spark-streaming-kafka is 1.6.3,
// and only spark-streaming is scoped "provided" — confirm these versions and scopes are intended.
libraryDependencies ++= Seq(
  // sparklint is pulled in with every org.apache.spark artifact excluded from it.
  "com.groupon.sparklint" %% "sparklint-spark162" % "1.0.4" excludeAll (
    ExclusionRule(organization = "org.apache.spark")
    ),
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)

WeatherDataStream.scala

package com.supergloo
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Streams raw weather messages from a Kafka topic using the direct
  * (receiver-less) Kafka API and prints the first few values of every batch.
  *
  * The master defaults to `local[5]` unless `spark.master` is already set
  * (for example by `spark-submit --master ...`).
  */
object WeatherDataStream {

  val localLogger = Logger.getLogger("WeatherDataStream")

  /**
    * Application entry point.
    *
    * @param args command-line arguments (currently unused)
    */
  def main(args: Array[String]): Unit = {

    // update
    // val checkpointDir = "./tmp"

    val sparkConf = new SparkConf().setAppName("Raw Weather")
    sparkConf.setIfMissing("spark.master", "local[5]")

    // Micro-batch interval of two seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaTopicRaw = "spark-topic"
    // Was "127.0.01:9092", a typo for the loopback address.
    val kafkaBroker = "127.0.0.1:9092"

    // A comma-separated topic list is supported; surrounding whitespace is dropped.
    val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

    localLogger.info(s"connecting to brokers: $kafkaBroker")
    localLogger.info(s"kafkaParams: $kafkaParams")
    localLogger.info(s"topics: $topics")

    val rawWeatherStream =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // Logs only the stream object's toString, not the records it will carry.
    localLogger.info(s"Manaaaaaaaaaf  --->>>: $rawWeatherStream")

    // A streaming job needs at least one output operation, otherwise start()
    // has nothing to execute. Print the message values (the tuple's second element).
    rawWeatherStream.map(_._2).print()

    try {
      //Kick off
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the context even when start/await fails.
      ssc.stop()
    }
  }
}

我已经使用命令创建了 jar 文件

sbt package

并使用命令运行应用程序

./spark-submit --master spark://myserver:7077 --class com.supergloo.WeatherDataStream /home/Manaf/kafka-streaming_2.11-1.0.jar

但我遇到了这样的错误

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
        at com.supergloo.WeatherDataStream$.main(WeatherDataStream.scala:37)
        at com.supergloo.WeatherDataStream.main(WeatherDataStream.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

根据我在 Stack Overflow 上查到的资料,我了解到应该使用 assembly 命令来创建 jar

sbt assembly

但是在执行 assembly 命令时出现如下错误

[error] 153 errors were encountered during merge
[trace] Stack trace suppressed: run last *:assembly for the full output.
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-vector\jars\arrow-vector-0.10.0.jar:git.properties
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-format\jars\arrow-format-0.10.0.jar:git.properties
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-memory\jars\arrow-memory-0.10.0.jar:git.properties
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Inject.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Inject.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Named.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Named.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Provider.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Provider.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Qualifier.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Qualifier.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Scope.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Scope.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Singleton.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Singleton.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4BlockInputStream.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4BlockInputStream.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4BlockOutputStream.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4BlockOutputStream.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Compressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Compressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Constants.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Constants.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Factory.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Factory.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4FastDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4FastDecompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJNICompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJNICompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor$HashTable.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor$HashTable.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor$HashTable.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor$HashTable.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNI.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNI.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNICompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNICompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNIFastDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNIFastDecompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNISafeDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNISafeDecompressor.class

【问题讨论】:

    标签: scala apache-spark sbt spark-streaming spark-streaming-kafka


    【解决方案1】:

    此问题与库版本有关。我刚刚像这样更新了我的 build.sbt

    // Project name and version of this build.
    name := "kafka-streaming"

    version := "1.0"

    // sbt-assembly: copy the current assembly options with includeScala switched off.
    assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

    // sbt-assembly: how to resolve a path that occurs in more than one dependency jar.
    assemblyMergeStrategy in assembly := {
      // Spark's "unused" stub class: keep the first copy encountered.
      case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
      // Any path containing a log4j.properties segment: concatenate all copies.
      case PathList(pl @ _*) if pl.contains("log4j.properties") => MergeStrategy.concat
      // Netty version metadata: keep the last copy encountered.
      case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
      // Everything else falls back to the previously configured strategy.
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }

    scalaVersion := "2.11.8"

    // Additional resolver (jitpack.io) for dependency lookup.
    resolvers += "jitpack" at "https://jitpack.io"

    // still want to be able to run in sbt
    // https://github.com/sbt/sbt-assembly#-provided-configuration
    // `:=` with `.evaluated` replaces the deprecated `<<=` operator; `run` is an
    // input task, so the Initialize[InputTask] from runTask has to be evaluated.
    run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated

    // `run` executes in a forked JVM that receives the log4j system properties below.
    fork in run := true
    javaOptions in run ++= Seq(
      "-Dlog4j.debug=true",
      "-Dlog4j.configuration=log4j.properties")


    // All Spark artifacts share the 1.6.x line. Core, SQL and streaming are
    // scoped "provided"; the Kafka integration is not, so it goes into the assembly.
    libraryDependencies ++= Seq(
      // sparklint is pulled in with every org.apache.spark artifact excluded from it.
      "com.groupon.sparklint" %% "sparklint-spark162" % "1.0.4" excludeAll (
        ExclusionRule(organization = "org.apache.spark")
        ),
      "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
      "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided",
      "org.apache.spark" %% "spark-streaming" % "1.6.2" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
      "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
    )
    

    现在问题解决了。

    【讨论】:

      【解决方案2】:

      这是将消息从 Kafka 读取到 Spark Streaming 并进行词频统计的基本代码。代码是针对本地机器环境编写的。

      import org.apache.kafka.clients.consumer.ConsumerConfig
      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming._
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.kafka010.LocationStrategies._
      import org.apache.spark.streaming.kafka010.ConsumerStrategies._
      import org.apache.spark.streaming.kafka010._
      
       /**
         * Word-frequency count over messages read from a Kafka topic through the
         * spark-streaming-kafka-0-10 direct stream. Configured for a local setup:
         * local master, broker on 127.0.0.1:9092, topic "wctopic".
         */
       object WordFreqCount {

         /**
           * Application entry point.
           *
           * @param args command-line arguments (currently unused)
           */
         def main(args: Array[String]): Unit = {
           println("Start")
           val conf = new SparkConf()
             .setMaster("local[*]")
             .setAppName("KafkaReceiver")
             .set("spark.driver.bindAddress", "127.0.0.1")
           println("conf created")

           val spark = SparkSession.builder().config(conf).getOrCreate()
           // Ten-second micro-batches on top of the session's SparkContext.
           val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
           println("ssc created")

           val topics = "wctopic"
           val brokers = "127.0.0.1:9092"
           val groupId = "wcgroup"
           // Comma-separated topic list; whitespace and empty entries are dropped.
           val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
           val kafkaParams = Map[String, Object](
             "bootstrap.servers" -> brokers,
             "key.deserializer" -> classOf[StringDeserializer],
             "value.deserializer" -> classOf[StringDeserializer],
             "group.id" -> groupId,
             "auto.offset.reset" -> "latest",
             "enable.auto.commit" -> (false: java.lang.Boolean)
           )

           val messages = KafkaUtils.createDirectStream[String, String](
             ssc,
             PreferConsistent,
             Subscribe[String, String](topicsSet, kafkaParams))

           // Get the lines, split them into words, count the words and print
           val lines = messages.map(_.value)
           val words = lines.flatMap(_.split(" "))
           val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
           wordCounts.print()
           messages.print()  //prints the stream of data received

           try {
             ssc.start()
             ssc.awaitTermination()
           } finally {
             // Release the streaming context even when start/await fails.
             ssc.stop()
           }
           println("End")
         }

       }
      

      【讨论】:

      • 请对您的代码提供一些基本解释,以便我们大家学习。
      猜你喜欢
      • 1970-01-01
      • 2016-11-22
      • 1970-01-01
      • 2018-02-13
      • 2018-11-16
      • 2020-07-21
      • 2019-07-21
      • 2017-05-26
      相关资源
      最近更新 更多