【Question Title】: Spark Schema JSON nested
【Posted】: 2020-03-07 15:52:06
【Question】:

I am trying to consume JSON data from the binance websocket.

Right now my schema looks like this:

val schema = new StructType()
  .add("e",StringType)
  .add("E",StringType)
  .add("s",StringType)
  .add("k",StringType)
  .add("t",IntegerType)
  .add("T",IntegerType)
  .add("s",StringType)
  .add("i",StringType)
  .add("f",StringType)
  .add("L",StringType)
  .add("o",DoubleType)
  .add("c",DoubleType)
  .add("h",DoubleType)
  .add("l",DoubleType)
  .add("v",DoubleType)
  .add("n",IntegerType)
  .add("x",StringType)
  .add("q",DoubleType)
  .add("V",DoubleType)
  .add("Q",DoubleType)
  .add("B",StringType)

I receive this message from my kafka topic:

{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}

As you can see, the message is nested under the "k" key.

My output in spark currently looks like this:

 root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


-------------------------------------------

https://imgur.com/a/9LPu9z6

Image of the dataframe, since I can't paste it into the forum without breaking the layout.

【Question Discussion】:

    Tags: scala apache-spark


    【Solution 1】:

    In your schema you need to define "k" as a StructType().

    • Change DoubleType to StringType, because the double values are wrapped in "" (i.e. quoted as strings) in the sample data.

    Example:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import spark.implicits._ //needed for .toDS below (already in scope in spark-shell)
    
    //schema for json
    val schema = new StructType().
    add("e",StringType).
    add("E",StringType).
    add("s",StringType).
    add("k",new StructType().
        add("t",LongType).
        add("T",LongType).
        add("s",StringType).
        add("i",StringType).
        add("f",StringType).
        add("L",StringType).
        add("o",StringType).
        add("c",StringType).
        add("h",StringType).
        add("l",StringType).
        add("v",StringType).
        add("n",IntegerType).
        add("x",BooleanType).
        add("q",StringType).
        add("V",StringType).
        add("Q",StringType).
        add("B",StringType)
        )
    
    //sample data
    val jsn=Seq("""{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}""")
    
    
    spark.read.schema(schema).json(jsn.toDS).select("*","k.*").drop("k").show()
    
    //+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
    //|    e|            E|      s|            t|            T|      s|  i|       f|       L|      o|      c|      h|      l|     v|  n|    x|           q|     V|           Q|  B|
    //+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
    //|kline|1583595170076|BTCUSDT|1583595120000|1583595179999|BTCUSDT| 1m|47069029|47069101|9111.22|9114.90|9114.91|9109.65|30.297| 73|false|276055.09390|11.517|104946.56519|  0|
    //+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
    
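    Since your data actually arrives from Kafka rather than a local `Seq`, the binary `value` column from your printed root schema has to be cast to a string and parsed with `from_json` before the same `select`/`drop` trick applies. A minimal sketch, assuming a streaming read and a placeholder topic name `binance-klines` (substitute your own bootstrap servers and topic):

    ```scala
    import org.apache.spark.sql.functions.{col, from_json}

    //read the raw Kafka records (value is binary, as your root schema shows)
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") //placeholder
      .option("subscribe", "binance-klines")               //placeholder topic
      .load()

    //cast value to string, parse with the schema above, then flatten "k"
    val parsed = kafkaDf
      .select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*", "data.k.*")
      .drop("k")
    ```

    The same `from_json` call works with a batch `spark.read.format("kafka")` as well; only the read mode changes.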

    Then you can use dataframe Column casting to convert all the required fields to double/float, etc.
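    For example, the cast step could look like this sketch, where `parsed` is a hypothetical dataframe holding the flattened columns from the example above and only the quoted numeric fields are converted:

    ```scala
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.DoubleType

    //cast the string-typed price/volume columns to DoubleType
    //(parsed is a placeholder name for the flattened dataframe)
    val typed = parsed
      .withColumn("o", col("o").cast(DoubleType))
      .withColumn("c", col("c").cast(DoubleType))
      .withColumn("h", col("h").cast(DoubleType))
      .withColumn("l", col("l").cast(DoubleType))
      .withColumn("v", col("v").cast(DoubleType))
    ```

    Values that fail to cast become null rather than raising an error, which is worth checking for if the feed ever changes format.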

    【Discussion】:
