Title: How to read a stream of structured data and write it to a Hive table
Posted: 2019-10-08 22:57:19
Question:

I need to read a stream of structured data from Kafka and write it into an already existing Hive table. From my analysis, one option seems to be to do a readStream on the Kafka source and then a writeStream with a File sink to an HDFS file path.

My question is: is it possible to write directly into the Hive table? Or is there a workaround I can follow for this use case?
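For reference, a minimal sketch of the File-sink option mentioned above, assuming the Hive table is an external table whose LOCATION points at the sink's output directory and is stored as Parquet; the output path and checkpoint directory below are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaToHdfs").getOrCreate()

// Read the Kafka topic as a streaming DataFrame
val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxxxx912:9092")
  .option("subscribe", "testtest")
  .load()

// Write Parquet files into the HDFS directory that the Hive table's
// LOCATION clause points at; Hive then sees the new files on each query.
val fileQuery = kafkaDF
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")                                          // file sink
  .option("path", "/user/hive/warehouse/mydb.db/mytable")     // hypothetical table location
  .option("checkpointLocation", "/tmp/kafka_to_hive_ckpt")    // hypothetical checkpoint dir
  .start()

fileQuery.awaitTermination()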

Edit 1:

.foreachBatch - this seems to work, but with the issue described below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
import spark.implicits._

// subscribe to the kafka topic
val csvDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "xxxxxx912:9092").option("subscribe", "testtest").load()

// cast every Kafka column (key, value and metadata) to string
val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)").as[(String, String, String, String, String, String)]

// append each micro-batch into the existing Hive table
val query = abcd.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => {
  batchDs.write.mode(SaveMode.Append).insertInto("default.6columns")
}).option("quote", "\u0000").start()

hive> select * from 6columns;
OK
0       A3,L1,G1,P1,O1,101,TXN1     testtest        122     0       2019-05-23 12:38:49.515
0       A3,L1,G1,P1,O1,102,TXN2     testtest        123     0       2019-05-23 12:38:49.524
0       A1,L1,G1,P1,O1,100,TXN3     testtest        124     0       2019-05-23 12:38:49.524
0       A2,L2,G1,P1,O2,100,TXN4     testtest        125     0       2019-05-23 12:38:49.524
0       A3,L1,G1,P1,O1,103,TXN5     testtest        126     0       2019-05-23 12:38:54.525
0       A3,L1,G1,P1,O1,104,TXN6     testtest        127     0       2019-05-23 12:38:55.525
0       A4,L1,G1,P1,O1,100,TXN7     testtest        128     0       2019-05-23 12:38:56.526
0       A1,L1,G1,P1,O1,500,TXNID8   testtest        129     0       2019-05-23 12:38:57.526
0       A6,L2,G2,P1,O1,500,TXNID9   testtest        130     0       2019-05-23 12:38:57.526

What I am looking for is to split the value attribute of the Kafka message so that the data matches the Hive table, i.e. it becomes a 12-column table (A3, L1, G1, P1, O1, 101, TXN1 is split into 7 attributes). This needs some additional transformation, similar to the .option("quote", "\u0000") I already apply when writing the dataframe, but that does not seem to work.

Comments:

  • What is the problem with writing to an HDFS file path? HDFS folder = Hive table built on top of it (the LOCATION property in the table DDL) - see the sketch below.
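To illustrate that comment, a minimal sketch of defining an external Hive table on top of the directory the streaming file sink writes to; the table name, columns and path are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("CreateExternalTable")
  .enableHiveSupport()
  .getOrCreate()

// LOCATION is the same HDFS directory the streaming file sink writes to,
// so the files produced by each micro-batch become visible to Hive queries.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS default.kafka_events (
    key   STRING,
    value STRING
  )
  STORED AS PARQUET
  LOCATION '/user/hive/warehouse/mydb.db/mytable'
""")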

Tags: apache-spark hive spark-structured-streaming


Answer 1:

Once you have your stream set up and consuming from Kafka, you can use the foreachBatch function like this.

import org.apache.spark.sql.{Dataset, SaveMode}

val yourStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "your_topic")
  .load()

// insert every micro-batch of the stream into the existing Hive table
val query = yourStream.writeStream.foreachBatch((batchDs: Dataset[_], batchId: Long) => {
  batchDs
    .write
    .mode(SaveMode.Append)
    .insertInto("your_db.your_table")
}).start()

query.awaitTermination()

To split the string on "," into separate columns, you can use the split function to put all the items separated by "," into an array, and then select individual items by index, for example SPLIT(CAST(value AS STRING), ',')[0] to get the first element.
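The same split can also be written with the DataFrame API instead of selectExpr; a minimal sketch against the question's csvDF (the column names below are made up, since the question does not give the Hive table's column names):

import org.apache.spark.sql.functions.{col, split}

// Split the comma-separated value once, then pick array elements by index
// and give them names (hypothetical names, for illustration only).
val parts = split(col("value").cast("string"), ",")
val withColumns = csvDF
  .withColumn("account",  parts.getItem(0))   // e.g. A3
  .withColumn("location", parts.getItem(1))   // e.g. L1
  .withColumn("amount",   parts.getItem(5))   // e.g. 101
  .withColumn("txn_id",   parts.getItem(6))   // e.g. TXN1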

So replace

val abcd = csvDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(topic AS STRING)","CAST(offset AS STRING)","CAST(partition AS STRING)","CAST(timestamp AS STRING)").as[(String, String, String, String, String, String)]

val abcd = csvDF.selectExpr("CAST(key AS STRING)", "SPLIT(CAST(value AS STRING), ',')[0]", "SPLIT(CAST(value AS STRING), ',')[1]", "SPLIT(CAST(value AS STRING), ',')[2]", "SPLIT(CAST(value AS STRING), ',')[3]", "SPLIT(CAST(value AS STRING), ',')[4]", "SPLIT(CAST(value AS STRING), ',')[5]", "SPLIT(CAST(value AS STRING), ',')[6]", "CAST(topic AS STRING)", "CAST(offset AS STRING)", "CAST(partition AS STRING)", "CAST(timestamp AS STRING)").as[(String, String, String, String, String, String, String, String, String, String, String)]

Comments:

  • This seems to do what we expect. I have edited the question with a follow-up. If I can split the 'value' of the message, then this will be complete.
  • Replace "CAST(value AS STRING)" with "SPLIT(CAST(value AS STRING), ',')[0]", "SPLIT(CAST(value AS STRING), ',')[1]", "SPLIT(CAST(value AS STRING), ',')[2]", "SPLIT(CAST(value AS STRING), ',')[3]", "SPLIT(CAST(value AS STRING), ',')[4]", "SPLIT(CAST(value AS STRING), ',')[5]", "SPLIT(CAST(value AS STRING), ',')[6]"