【发布时间】:2018-07-10 13:24:22
【问题描述】:
我有一个非常大的 pyspark 数据框。我需要将每一行的数据帧转换为 JSON 格式的字符串,然后将字符串发布到 Kafka 主题。我最初使用以下代码。
for message in df.toJSON().collect():
kafkaClient.send(message)
但是数据框非常大,因此在尝试collect() 时会失败。
我正在考虑使用UDF,因为它会逐行处理它。
from pyspark.sql.functions import udf, struct
def get_row(row):
json = row.toJSON()
kafkaClient.send(message)
return "Sent"
send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
但我收到一个错误,因为列是输入到函数而不是行。
为了说明的目的,我们可以使用下面的 df,我们可以假设 Col1 和 Col2 必须被发送过来。
df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
每行的 JSON 字符串:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
【问题讨论】:
-
这行得通吗?
json = df.apply(lambda x: getattr(x, 'to_json')(), axis=1) -
我收到了这个错误
AttributeError: 'DataFrame' object has no attribute 'apply' -
@JamesSchinner 这不是熊猫
-
哦,对不起,我愚蠢地以为是
pandasdf -
错误信息是什么?
标签: python json pyspark spark-dataframe