【发布时间】:2020-04-29 23:28:11
【问题描述】:
我从 Kafka 获得 JSON 格式的数据,并在 PySpark 中将数据作为 DataFrame 读取。
我从 Kafka 获取数据后,它以 DataFrame 格式出现:
DataFrame[value: string]
但是,该值包含 JSON / DICT 格式。
打印报表并返回:
def print_row(row):
print(row)
pass
testing.writeStream.foreach(print_row).start()
Row(value='{col_1 =80.0, timestamp=2020-01-13T08:58:58.164Z}')
如何将值 (JSON) 转换为 DATAFRAME 列,例如:
col_1 timestamp
80.0 2020-01-13T08:58:58.164Z
【问题讨论】:
标签: python apache-spark pyspark apache-kafka spark-structured-streaming