【发布时间】:2020-03-20 00:20:19
【问题描述】:
我正在使用 Kafka 运行 Spark 流式传输程序并出现错误。所有导入都已完成,看起来已解决,没有任何问题。
我使用 IntelliJ IDEA 编写了一些代码,第一次运行程序时出错,我是 Java 的新手,但来自 C# 背景。所以无法理解这个问题。 zookeeper 服务启动,同时 kafka-server 启动并创建了一个名为 topicA 的主题。 Producer 也已准备好流式传输数据,但我在 IntelliJ 中运行代码以侦听队列时遇到问题
def main(args: Array[String]) {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "0",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val conf = new SparkConf().setAppName("Simple Streaming Application")
val ssc = new StreamingContext(conf, Seconds(5))
val topics = Array("topicA")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
}
}
ssc.start
// the above code is printing out topic details every 5 seconds
// until you stop it.
ssc.stop(stopSparkContext = false)
}
产生的异常是:
Exception in thread "main" java.lang.VerifyError: class scala.collection.mutable.WrappedArray overrides final method toBuffer.()Lscala/collection/mutable/Buffer;
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:75)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:70)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:57)
at sparkStreamClass$.main(sparkStreamClass.scala:20)
at sparkStreamClass.main(sparkStreamClass.scala)
这是我的 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learnStreaming</groupId>
<artifactId>sparkProjectArtifact</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.3.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
【问题讨论】:
-
无法按原样调试。你能显示包含
main函数的整个源文件吗?您是使用 SBT、Maven、Gradle 等来检索必要的库、配置编译器等还是手动安装它们?如果是前者,您能否发布您的构建文件(例如build.sbt、pom.xml等)。您使用的是哪个版本的 Scala 以及哪个版本的 Spark(后者必须支持前者)? -
一种猜测是您使用了错误版本的 Scala。不幸的是,Scala 主要版本不兼容二进制。因此,如果您向我们展示您的构建配置(包括所有依赖项),这将非常有帮助
-
我系统上安装的scala版本是2.13,spark版本是2.3.1
标签: scala apache-spark apache-kafka spark-streaming