【发布时间】:2020-08-04 19:28:40
【问题描述】:
输入数据帧来自键值对中的 Kafka -
输入:
{
"CBT-POSTED-TXN": {
"eventTyp": "TXN-NEW",
"eventCatgry": "CBT-POSTED-TXN"
},
"CBT-BALANCE-CHG": {
"eventTyp": "TXN-NEW",
"eventCatgry": "CBT-BALANCE-CHG",
"enablerEventVer": "1.0.0"
}
}
输出的DataFrame需要是-
第 1 行
{
"eventTyp": "TXN-NEW",
"eventCatgry": "CBT-POSTED-TXN"
}
第 2 行
{
"eventTyp": "TXN-NEW",
"eventCatgry": "CBT-BALANCE-CHG",
"enablerEventVer": "1.0.0"
}
以下是我尝试解析的方式-
override def translateSource(df: DataFrame, spark: SparkSession = SparkUtil.getSparkSession("")): DataFrame = {
import spark.implicits._
val k3ProcessingSchema: StructType = Encoders.product[k3SourceSchema].schema
val dfTranslated=df.selectExpr("CAST(key AS STRING) key", "cast(value as string) value", "CAST(partition as String)", "CAST(offset as String)", "CAST(timestamp as String)").as[(String, String, String, String, String)]
.select(from_json($"value", k3ProcessingSchema).as("k3Clubbed"), $"value", $"key", $"partition", $"offset", $"timestamp")
.select($"k3Clubbed", $"value", $"key", $"partition", $"offset", $"timestamp").filter("k3Clubbed is not null")
.as[(k3SourceSchema, String, String, String, String, String)]
dfTranslated.collect()
df
}
case class k3SourceSchema (
k3SourceValue: Map[String,String]
)
但是代码无法将包含 JSON 格式的 df(value) 列与多个事件解析为字符串(键)和字符串(值)的映射
【问题讨论】: