【问题标题】:How to run Spark Streaming application with Kafka Direct Stream in IntelliJ IDEA?如何在 IntelliJ IDEA 中使用 Kafka Direct Stream 运行 Spark Streaming 应用程序?
【发布时间】:2020-03-20 00:20:19
【问题描述】:

我正在使用 Kafka 运行 Spark 流式传输程序并出现错误。所有导入都已完成,看起来已解决,没有任何问题。

我使用 IntelliJ IDEA 编写了一些代码,第一次运行程序时出错,我是 Java 的新手,但来自 C# 背景。所以无法理解这个问题。 zookeeper 服务启动,同时 kafka-server 启动并创建了一个名为 topicA 的主题。 Producer 也已准备好流式传输数据,但我在 IntelliJ 中运行代码以侦听队列时遇到问题

def main(args: Array[String]) {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "0",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val conf = new SparkConf().setAppName("Simple Streaming Application")
  val ssc = new StreamingContext(conf, Seconds(5))
  val topics = Array("topicA")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  stream.foreachRDD { rdd =>
    // Get the offset ranges in the RDD
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
    }
  }

  ssc.start

  // the above code is printing out topic details every 5 seconds
  // until you stop it.

  ssc.stop(stopSparkContext = false)
}

产生的异常是:

Exception in thread "main" java.lang.VerifyError: class scala.collection.mutable.WrappedArray overrides final method toBuffer.()Lscala/collection/mutable/Buffer;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:57)
at sparkStreamClass$.main(sparkStreamClass.scala:20)
at sparkStreamClass.main(sparkStreamClass.scala)

这是我的 pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.learnStreaming</groupId>
    <artifactId>sparkProjectArtifact</artifactId>
    <version>1.0-SNAPSHOT</version>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.1</version>
        <scope>provided</scope>
    </dependency>

</dependencies>
</project>

【问题讨论】:

  • 无法按原样调试。你能显示包含main 函数的整个源文件吗?您是使用 SBTMavenGradle 等来检索必要的库、配置编译器等还是手动安装它们?如果是前者,您能否发布您的构建文件(例如build.sbtpom.xml 等)。您使用的是哪个版本的 Scala 以及哪个版本的 Spark(后者必须支持前者)?
  • 一种猜测是您使用了错误版本的 Scala。不幸的是,Scala 主要版本不兼容二进制。因此,如果您向我们展示您的构建配置(包括所有依赖项),这将非常有帮助
  • 我系统上安装的scala版本是2.13,spark版本是2.3.1

标签: scala apache-spark apache-kafka spark-streaming


【解决方案1】:

修改了 pom.xml,它对我有用!

 <properties>
    <spark.version>2.1.0</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

【讨论】:

    【解决方案2】:

    我也遇到了这个问题,是因为使用了错误的Scala版本造成的在你的想法中。

    【讨论】:

      【解决方案3】:

      由于 SCALA 和 SPARK 库之间的版本不兼容,我们遇到了这个问题

      当我进行以下配置时,我遇到了类似的错误: spark-core_2.11 , spark-sql_2.11 , Scala - 2.13

      我在我的系统中安装了 scala 2.11,并在程序模块中添加了新的 Scala SDK 库,它工作正常。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2018-01-13
        • 2016-06-27
        • 2016-07-12
        • 2020-04-01
        • 1970-01-01
        • 2014-02-22
        • 2019-04-30
        • 2022-11-23
        相关资源
        最近更新 更多