【问题标题】:How can I convert the df column [JSON_Format] into multiple columns in PySpark?如何将 df 列 [JSON_Format] 转换为 PySpark 中的多个列?
【发布时间】:2020-04-29 23:28:11
【问题描述】:

我从 Kafka 获得 JSON 格式的数据,并在 PySpark 中将数据作为 DataFrame 读取。

我从 Kafka 获取数据后,它以 DataFrame 格式出现:

DataFrame[value: string]

但是,该值包含 JSON / DICT 格式。

打印报表并返回:

def print_row(row):
    print(row)
    pass

testing.writeStream.foreach(print_row).start()
Row(value='{col_1 =80.0, timestamp=2020-01-13T08:58:58.164Z}')

如何将值 (JSON) 转换为 DATAFRAME 列,例如:

col_1  timestamp
80.0   2020-01-13T08:58:58.164Z

【问题讨论】:

    标签: python apache-spark pyspark apache-kafka spark-structured-streaming


    【解决方案1】:

    定义架构并解析 JSON。

    复制自https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

    # value schema: { "a": 1, "b": "string" }
    schema = StructType().add("a", IntegerType()).add("b", StringType())
    df.select( \
      col("key").cast("string"),
      from_json(col("value").cast("string"), schema))
    

    【讨论】:

      【解决方案2】:

      可以为由 RDD[String] 表示的 JSON 数据集创建 DataFrame,每个字符串存储一个 JSON 对象。

      jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
      otherPeopleRDD = sc.parallelize(jsonStrings)
      otherPeople = spark.read.json(otherPeopleRDD)
      otherPeople.show()
      

      【讨论】:

      • 因为我已通过以下链接从 Kafka 读取数据。它返回 DataFrame 格式而不是 JSON 字符串。 link
      猜你喜欢
      • 2021-09-27
      • 1970-01-01
      • 2022-11-29
      • 2016-09-26
      • 2018-08-17
      • 1970-01-01
      • 1970-01-01
      • 2017-09-13
      • 1970-01-01
      相关资源
      最近更新 更多