【Question Title】: Apache Spark integration with Kafka
【Posted】: 2021-01-11 10:22:35
【Question】:

I'm taking a Udemy course on Kafka and Spark, and I'm currently working through the section on integrating Apache Spark with Kafka.

Below is the Apache Spark code:

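import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession session = SparkSession.builder().appName("KafkaConsumer").master("local[*]").getOrCreate();
session.sparkContext().setLogLevel("ERROR");
Dataset<Row> df = session
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "second_topic")
    .load();

// Note: show() is not supported on a streaming Dataset; Spark rejects it with an AnalysisException
// ("Queries with streaming sources must be executed with writeStream.start()").
df.show();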

Below is the content of the pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka.spark</groupId>
  <artifactId>Kafka-Spark-Integration-Code</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<!--    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> -->
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>

  </dependencies>
</project>

However, when I run the code I get an error that I can't resolve. I'm using OpenJDK 8 and Spark 3 on MX Linux. Thanks.

Exception in thread "main" java.lang.ClassFormatError: Invalid code attribute name index 24977 in class file org/apache/spark/sql/execution/columnar/InMemoryRelation
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:83)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:132)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:132)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:131)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:323)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.streaming.DataStreamReader.<init>(DataStreamReader.scala:519)
    at org.apache.spark.sql.SparkSession.readStream(SparkSession.scala:657)
    at example.code.spark.kafka.KafkaSparkConsumer.main(KafkaSparkConsumer.java:19)
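Because the error is raised while the JVM defines a Spark class, it helps to confirm which Java runtime actually runs the program. Eclipse launches the JRE configured for the project, and that may not be the one on the shell's PATH. The following is a minimal diagnostic sketch that is not part of the original question; the class name JvmInfo and its helpers are hypothetical. It prints the standard java.version, java.vendor and java.home system properties. It also prints the location of the InMemoryRelation class file named in the stack trace. It finds that file as a resource rather than loading the class, so the lookup itself cannot trigger the ClassFormatError.

```java
import java.net.URL;

public class JvmInfo {

    // Summarizes the running JVM using standard system properties.
    static String describe() {
        return String.format("java.version=%s, java.vendor=%s, java.home=%s",
                System.getProperty("java.version"),
                System.getProperty("java.vendor"),
                System.getProperty("java.home"));
    }

    // Looks up the .class resource for a binary class name without defining the class,
    // so a bad class file cannot throw ClassFormatError here.
    // Returns null if the class file is not on the classpath.
    static String locateClassFile(String className) {
        URL url = ClassLoader.getSystemResource(className.replace('.', '/') + ".class");
        return url == null ? null : url.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println("InMemoryRelation class file: "
                + locateClassFile("org.apache.spark.sql.execution.columnar.InMemoryRelation"));
    }
}
```

Run it from the same Eclipse project and launch configuration as KafkaSparkConsumer so that it reports the runtime and classpath that actually hit the error.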

【Discussion】:

    Tags: apache-spark apache-kafka spark-structured-streaming spark-kafka-integration


    【Answer 1】:

    You can follow the example given in the Structured Streaming + Kafka Integration Guide:

    SparkSession session = SparkSession.builder()
      .appName("KafkaConsumer")
      .master("local[*]")
      .getOrCreate();

    // Read the topic as a streaming Dataset and cast the binary key/value columns to strings
    Dataset<Row> df = session
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "second_topic")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
    

    This consumes the data. The Structured Streaming Programming Guide shows you how to print it to the console:

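    // start() launches the query in the background and returns immediately
    StreamingQuery query = df
      .writeStream()
      .format("console")
      .outputMode("append")
      .option("checkpointLocation", "path/to/checkpoint/dir")
      .start();

    // Block the main thread until the query stops; otherwise main() returns and the job exits
    query.awaitTermination();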
    

    【Comments】:

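    • Thanks. I tried the code above but get the same error. I tried the same example on a Windows machine: it failed with a different error (Kafka dependency not found), but it did get past the readStream method in the code above. I wonder whether it's related to the Java version or to the environment on my Linux machine?
    • Yes, I'm able to run very basic Spark code. I'm using Eclipse and running the code from there. This is my first attempt at a streaming application, so I don't know what is going on. I have the same pom.xml file in both environments (Windows and Linux), so I believe the dependencies are the same.
    • So I uninstalled OpenJDK 8 and installed Java from Oracle, and it started working. But now my streaming job just exits. I thought it would run continuously.
    • Are you sure you called query.awaitTermination()?
    • Thanks, I missed that. Thank you very much, that solved the problem.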
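The last two comments come down to this: start() returns immediately while the query keeps running in the background, and query.awaitTermination() is what stops main() from returning. The following plain-Java sketch is only an analogy. No Spark is involved, and the names AwaitTerminationAnalogy and runAndWait are hypothetical. A daemon worker thread stands in for the streaming query, and Thread.join() plays the role of awaitTermination(). A daemon thread does not keep the JVM alive on its own, so without the join the program could end before the worker finishes.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitTerminationAnalogy {

    // Starts a background "job" that processes the given number of batches on a daemon
    // thread (a stand-in for a streaming query), then blocks until it finishes.
    // Returns how many batches were processed.
    static int runAndWait(int batches) {
        AtomicInteger processed = new AtomicInteger();
        Thread worker = new Thread(() -> {
            for (int i = 0; i < batches; i++) {
                processed.incrementAndGet(); // pretend to handle one micro-batch
            }
        }, "background-job");
        // Daemon threads do not keep the JVM alive once main() returns.
        worker.setDaemon(true);
        worker.start(); // like writeStream().start(): returns immediately

        try {
            worker.join(); // like query.awaitTermination(): wait until the work ends
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("batches processed: " + runAndWait(3));
    }
}
```

Unlike this finite worker, a real streaming query does not finish on its own. awaitTermination() blocks until the query is stopped, and it throws StreamingQueryException if the query fails.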