【发布时间】:2020-02-11 16:04:38
【问题描述】:
我正在尝试使用结构化流从 Kafka 读取数据。从kafka收到的数据是json格式的。 我的代码如下: 在代码中,我使用 from_json 函数将 json 转换为数据帧以进行进一步处理。
val **schema**: StructType = new StructType()
.add("time", LongType)
.add(id", LongType)
.add("properties",new StructType()
.add("$app_version", StringType)
.
.
)
val df: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","...")
.option("subscribe","...")
.load()
.selectExpr("CAST(value AS STRING) as value")
.select(from_json(col("value"), **schema**))
我的问题是,如果字段增加, 我无法停止 spark 程序手动添加这些字段, 那么如何动态解析这些字段,我尝试了 schema_of_json(), 只能通过第一行来推断字段类型,不适合多级嵌套结构的json数据。
【问题讨论】:
标签: json apache-spark spark-structured-streaming