【问题标题】:Spark structured stream to kudu contextSpark结构化流到kudu上下文
【发布时间】:2020-04-13 19:59:25
【问题描述】:

我想阅读 kafka 主题,然后通过 spark 流将其写入 kudu 表。

我的第一种方法

// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);

// structure
val schema: StructType = StructType(
  StructField("userNo", IntegerType, true) ::
  StructField("bandNo", IntegerType, false) ::
  StructField("ipv4", StringType, false) :: Nil);

// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
  .setNumReplicas(1)
  .addHashPartitions(List("userNo").asJava, 3))

// get stream from kafka
val parsed = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("startingOffsets", "latest")
  .option("subscribe", "feed_api_band_get_popular_post_list")
  .load()
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");

现在它抱怨了

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)

我的第二种方法

似乎我将代码更改为使用传统的 KafkaUtils.createDirectStream

KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
  rdd.foreach(record => {
    // write to kudu.............
    println(record.value());
  })
});

ssc.start();
ssc.awaitTermination();

那么,哪一个是正确的方法?或者有什么办法让它从第一种方法运行?

Spark 版本是 2.2.0。

【问题讨论】:

  • 您是否设法使用第一种方法?

标签: apache-spark-sql spark-streaming apache-kudu


【解决方案1】:

这两种方法似乎都是正确的。第一个使用 Spark 结构化流式处理方式,其中数据以表格形式附加。第二种方法是通过传统的 DStream 做事方式来实现的

【讨论】:

    【解决方案2】:

    我相信目前没有 Kudu 支持将 KuduContext 与 Spark 结构化流结合使用。我遇到了类似的问题,不得不依靠使用传统的 Kudu 客户端并实现 ForeachWriter[Row] 类。我使用了示例here 并且能够实现解决方案。

    【讨论】:

      【解决方案3】:

      第一种方法是不正确的,正如您已经从错误中看到的那样,非常清楚:Queries with streaming sources must be executed with writeStream.start()。这只适用于批处理。

      第二个使用DStream,所以不是结构化流。

      有第 3 和第 4 种方法。

      从 Kudu 1.9.0 开始,此 issue 固定支持结构化流,并按预期使用:

          parsed
            .writeStream
            .format("kudu")
            .option("kudu.master", kuduMaster)
            .option("kudu.table", tableName)
            .option("kudu.operation", operation)
            .start()
      

      请注意,如果您使用的是 Cloudera,此方法仅适用于 cdh6.2.0 及更高版本:

      <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
      <dependency>
          <groupId>org.apache.kudu</groupId>
          <artifactId>kudu-spark2_2.11</artifactId>
          <version>1.9.0-cdh6.2.0</version>
          <scope>test</scope>
      </dependency>
      

      我的解决方案是查看code from SparkContext,看看kuduContext.insertRows(df, table) 和其他方法做了什么,然后创建一个ForeachWriter[Row]

      val kuduContext = new KuduContext(master, sparkContext)
      
        parsed
          .toDF()
          .writeStream
          .foreach(new ForeachWriter[Row] {
            override def open(partitionId: Long, version: Long): Boolean =
              kuduContext.tableExists(table)
      
            override def process(value: Row): Unit = {
              val kuduClient = kuduContext.syncClient
              val kuduSession = kuduClient.newSession()
              kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
              kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
      
              val kuduTable = kuduClient.openTable(kuduSinkConfiguration.table)
              val operation = getOperationFunction(kuduTable) //get the kuduTable.newInsert(), newUpsert(), etc.
              kuduSession.setIgnoreAllDuplicateRows(ignoreDuplicates)
      
              val row = operation.getRow
              row.add("userNo", value.getAs[Int]("userNo"))
              row.add("bandNo", value.getAs[Int]("bandNo"))
              row.add("ipv4", value.getAs[String]("ipv4"))
              kuduSession.apply(operation)
      
              kuduSession.flush()
              kuduSession.close()
            }
      
            override def close(errorOrNull: Throwable): Unit = Unit
      
          })
          .start()
      

      【讨论】:

        【解决方案4】:

        我们还可以使用 Spark 版本 2.2.0 和 cloudera 版本 CDH 5.14 将结构化流数据加载到 Kudu 表中。您只需要下载对应于 CDH6.2 的 spark-kudu-2.2.11 jar 并将其作为 jar 传递到您的 spark-submit 命令中。这将在下面的语句中识别 kudu 格式并轻松加载数据帧。

        已解析 .writeStream .format("kudu") .option("kudu.master", kuduMaster) .option("kudu.table", tableName) .option("kudu.operation", 操作) .start()

        可以从https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2_2.11/1.10.0-cdh6.3.2下载JAR

        Spark-提交声明:

        spark2-submit --master local[*] --deploy-mode client --jars spark-sql-kafka-0-10_2.11-2.2.0.jar,kafka-clients-0.10.0.0.jar, spark-streaming-kafka-0-10_2.11-2.2.0.jar,kudu-spark2_2.11-1.10.0-cdh6.3.2.jar,kudu-client-1.10.0-cdh6.3.2.jar /path_of_python_code/ rdd-stream-read.py

        注意- Kudu-client 是可选的。可能必须与集群部署模式一起使用。

        使用的writestream语句:

        query=dfCols.writeStream.format("kudu").option("kudu.master", "host:7051,host:7051,host:7051").option("kudu.table","impala: :db.kudu_table_name").option("kudu.operation","upsert").option("checkpointLocation","file:///path_of_dir/checkpoint/").start()

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2017-05-04
          • 2019-01-10
          • 2020-01-06
          • 2019-12-21
          • 2017-03-06
          • 1970-01-01
          • 2018-03-24
          • 1970-01-01
          相关资源
          最近更新 更多