【问题标题】:How structured streaming dynamically parses kafka's json data结构化流如何动态解析kafka的json数据
【发布时间】:2020-02-11 16:04:38
【问题描述】:

我正在尝试使用结构化流从 Kafka 读取数据。从kafka收到的数据是json格式的。 我的代码如下: 在代码中,我使用 from_json 函数将 json 转换为数据帧以进行进一步处理。

val **schema**: StructType = new StructType()
    .add("time", LongType)
    .add(id", LongType)
    .add("properties",new StructType()
      .add("$app_version", StringType)
      .
      .
    )
val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","...")
    .option("subscribe","...")
    .load()
    .selectExpr("CAST(value AS STRING) as value")
    .select(from_json(col("value"), **schema**))

我的问题是,如果字段增加, 我无法停止 spark 程序手动添加这些字段, 那么如何动态解析这些字段,我尝试了 schema_of_json(), 只能通过第一行来推断字段类型,不适合多级嵌套结构的json数据。

【问题讨论】:

    标签: json apache-spark spark-structured-streaming


    【解决方案1】:

    我的问题是,如果字段增加了,我无法停止spark程序手动添加这些字段,那么我该如何动态解析这些字段

    开箱即用的 Spark Structured Streaming(甚至 Spark SQL)是不可能的。不过有几个解决方案。

    更改代码中的架构并恢复流式查询

    您只需停止流式查询,更改代码以匹配当前架构,然后恢复它。在 Spark Structured Streaming 中,可以使用支持从检查点恢复的数据源。 Kafka 数据源确实支持。

    用户定义函数 (UDF)

    您可以编写一个用户定义函数 (UDF) 来为您执行此动态 JSON 解析。这也是最简单的选择之一。

    新数据源(MicroBatchReader)

    另一种选择是为内置的 Kafka 数据源创建一个扩展,该数据源将执行动态 JSON 解析(类似于 Kafka 反序列化器)。这需要更多的开发,但肯定是可行的。

    【讨论】:

    • 您能否分享一个示例 UDF,它将返回架构以传递给 from_json 函数?
    猜你喜欢
    • 2018-03-21
    • 2020-02-19
    • 2020-04-29
    • 1970-01-01
    • 2020-10-17
    • 2015-01-15
    • 2014-05-23
    • 2018-07-10
    • 2019-08-16
    相关资源
    最近更新 更多