【问题标题】:Unable to read kafka using spark sql无法使用 spark sql 读取 kafka
【发布时间】:2019-11-04 08:16:24
【问题描述】:

我正在尝试使用 spark 阅读 kafka,但我猜遇到了一些与库相关的问题。

我正在向 kafka 主题推送一些事件,我可以通过 kafka 控制台消费者阅读但无法通过 spark 阅读。我正在使用 spark-sql-kafka 库,该项目是用 maven 编写的。 Scala 版本是 2.11.12,spark 版本是 2.4.3。

            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.4.3</version>
            <scope>provided</scope>
        </dependency>

我的java代码如下:-

SparkSession spark = SparkSession.builder()
                .appName("kafka-tutorials")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> rows = spark.readStream().
                format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "meetup-trending-topics")
                .option("startingOffsets", "latest")
                .load();

        rows.writeStream()
                .outputMode("append")
                .format("console")
                .start();

        spark.streams().awaitAnyTermination();
        spark.stop();

以下是我收到的错误消息:-

线程“main”org.apache.spark.sql.AnalysisException 中的异常:找不到数据源:kafka。请按照《Structured Streaming + Kafka Integration Guide》部署部分部署应用; 在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652) 在 org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)

解决方案:- 1)创建 uber jar 或 ii)--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 我之前在 mainclass 之后给出了 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 选项。

【问题讨论】:

标签: java maven apache-spark apache-kafka spark-streaming


【解决方案1】:

这个:

<scope>provided</scope>

表示您有责任提供适当的 jar。我(和许多其他人)更喜欢避免使用这个范围,而是构建一个 uberjar 来部署。

【讨论】:

  • 赞成这个,因为它帮助我理解了 uberjar 的概念并且几乎回答了实际问题:)
  • 但是您应该将此范围用于提供的 Spark 库(sql、流式传输、核心等)。否则,您的 JAR 1) 比它们需要的大 2) 可能与 Spark 的类路径上的资源冲突
  • @cricket_007 我不同意。使用 fat jar 的优点之一是这些库应该优先于提供的库。你的罐子的大小不应该是一个主要问题。
猜你喜欢
  • 2020-09-18
  • 2018-01-09
  • 2021-12-21
  • 2021-12-05
  • 2015-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-06
相关资源
最近更新 更多