【问题标题】:spark streaming + kafka - spark session API火花流 + kafka - 火花会话 API
【发布时间】:2017-04-27 07:16:01
【问题描述】:

感谢您帮助使用 spark 2.0.2 运行 spark 流程序。

"java.lang.ClassNotFoundException: Failed to find data source: kafka" 的运行错误。修改后的POM文件如下。

正在创建 Spark,但在调用来自 kafka 的加载时出错。

已创建火花会话:

 val spark = SparkSession
            .builder()
            .master(master)
            .appName("Apache Log Analyzer Streaming from Kafka")
            .config("hive.metastore.warehouse.dir", hiveWarehouse)
            .config("fs.defaultFS", hdfs_FS)
            .enableHiveSupport()
            .getOrCreate()

创建 kafka 流式传输:

    val logLinesDStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:2181")
      .option("subscribe", topics)
      .load()

错误信息:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org

POM.XML:

    <scala.version>2.10.4</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
        <spark.version>2.0.2</spark.version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>

       <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
       <version>${spark.version}</version>
       </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
</dependencies>

【问题讨论】:

  • 嗨@Aavik,你能解决这个错误吗?我也遇到了同样的问题。
  • 是的,问题出在 POM xml 上。 org.apache.sparkspark-sql-kafka-0-10_2.11${spark.version}

标签: scala apache-spark apache-kafka spark-streaming-kafka


【解决方案1】:

当您实际需要 2.0.2 时,您引用的是 Spark 对 Kafka 的 v1.5.1 参考。您还需要使用sql-kafka 进行结构化流式处理:

<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.10</artifactId>
<version>2.0.2</version>

请注意,仅 Kafka >= 0.10 支持 SparkSession API

【讨论】:

  • 感谢您提供详细信息。如建议的那样,修改了 POM 文件,但仍然出现相同的错误。 2.10.42.0.2 我是否遗漏了编译器无法识别“kafka”的任何细节? P.S - 当前的 POM 文件看起来像上面的一个(在问题中修改)
  • 在上一版本中设置了参数“group.id”->“consumergroup”、“metadata.broker.list”->“localhost:9092”。让我试试。
  • 在 jar 构建中,我在路径“/org/apache/spark/sql/execution/datasources/”中看不到 kafka 目录。我可以看到其他源文件格式 - csv、parquet、jdbc、json 和文本。在确定如何获取 kafka 文件格式方面有任何帮助。
  • 只需将 spark 版本从 2.0.0 升级到 2.2.0 以及 spark 版本 2.2.0 的建议依赖项。谢谢你的建议。
【解决方案2】:

我遇到了同样的问题。我已将 spark 版本从 2.0.0 升级到 2.2.0 并添加了 Spark-sql-kafka 依赖项。它对我来说非常有效。请找到依赖项。

<spark.version>2.2.0</spark.version>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

【讨论】:

    【解决方案3】:

    通过更改 POM.XML 修复它

    <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    

    【讨论】:

    • 你不使用 Scala 2.10 吗?
    • 那么你是如何使用 Scala 2.11 依赖项进行编译的?
    • @YuvalItzchakov, 返回数据类型是 sql.DataFrame。但是从 val logLinesDStream = KafkaUtils.createStream 创建的那个,数据类型是 DStream[String]。 spark.readStream 是开发流应用程序的正确选择吗?我知道 spark.readStream 来自 2.0。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-11
    • 1970-01-01
    • 2019-04-02
    • 2016-02-07
    • 2015-05-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多