【问题标题】:How to parse a kafka topic which contains data in the form of nested json?如何解析包含嵌套 json 形式数据的 kafka 主题?
【发布时间】:2021-07-15 09:53:22
【问题描述】:

我正在尝试阅读一个 kafka 主题并将其流式传输到我的接收器。为了读取数据,我编写了以下代码。

json 中的主题数据:

{
"HiveData": {
"Tablename": "HiveTablename1",
"Rowcount": "3213423",
"lastupdateddate": "2021-02-24 13:04:14"
},
"HbaseData": [
{
"Tablename": "HbaseTablename1",
"Rowcount": "23543",
"lastupdateddate": "2021-02-23 12:03:11"
}
],
"PostgresData": [
{
"Tablename": "PostgresTablename1",
"Rowcount": "23454345",
"lastupdateddate": "2021-02-23 12:03:11"
}
]
}

下面是我写的解析主题的代码:

 def streamData(): DataFrame = {
    val kafkaDF = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "server:port")
      .option("subscribe", "topic_name")
      .load()
    kafkaDF.select(from_json(col("HiveData"), topic_schema).as("HiveData")).selectExpr("HiveData.tablename as table", "HiveData.Rowcount as rowcount", "HiveData.lastupdateddate as lastupdateddate") 
    kafkaDF
 }

但如果 json 格式为:

{"Tablename": "HiveTablename1","Rowcount": "3213423","lastupdateddate": "2021-02-24 13:04:14"}

我想解析 json 并将 HiveData 放入一个单独的数据帧和一个单独的 HBaseData 数据帧和 PostgresData 相同的数据帧。如果 json 数据在一行中,我编写的代码就可以工作。 如果数据是本问题开头提到的嵌套格式,谁能告诉我如何将数据解析为多个数据帧? 非常感谢任何帮助。

【问题讨论】:

    标签: json apache-spark apache-kafka spark-streaming


    【解决方案1】:

    尝试添加

    option("multiline", "true")

    【讨论】:

      猜你喜欢
      • 2020-08-26
      • 2017-02-24
      • 1970-01-01
      • 1970-01-01
      • 2015-01-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多