【问题标题】:PySpark - Convert to JSON row by rowPySpark - 逐行转换为 JSON
【发布时间】:2018-07-10 13:24:22
【问题描述】:

我有一个非常大的 pyspark 数据框。我需要将每一行的数据帧转换为 JSON 格式的字符串,然后将字符串发布到 Kafka 主题。我最初使用以下代码。

for message in df.toJSON().collect():
        kafkaClient.send(message) 

但是数据框非常大,因此在尝试collect() 时会失败。

我正在考虑使用UDF,因为它会逐行处理它。

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()

但我收到一个错误,因为列是输入到函数而不是行。

为了说明的目的,我们可以使用下面的 df,我们可以假设 Col1 和 Col2 必须被发送过来。

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])

每行的 JSON 字符串:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'

【问题讨论】:

  • 这行得通吗? json = df.apply(lambda x: getattr(x, 'to_json')(), axis=1)
  • 我收到了这个错误AttributeError: 'DataFrame' object has no attribute 'apply'
  • @JamesSchinner 这不是熊猫
  • 哦,对不起,我愚蠢地以为是pandasdf
  • 错误信息是什么?

标签: python json pyspark spark-dataframe


【解决方案1】:

你不能像这样使用select。使用foreach/foreachPartition

import json

def send(part):
    kafkaClient = ...
    for r in part:
        kafkaClient.send(json.dumps(r.asDict()))

如果您需要诊断信息,请使用Accumulator

在当前版本中,我会直接使用 Kafka 源代码(2.0 及更高版本):

from pyspark.sql.functions import to_json, struct

(df.select(to_json(struct([df[x] for x in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())

例如,您将需要 Kafka SQL 包:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1

【讨论】:

  • 我收到此错误ImportError: cannot import name to_json。我正在使用火花 1.6.3。很遗憾,我无法使用最新版本。
  • 在这种情况下,你会被第一个卡住。这种过时的版本没有 Kafka SQL。
【解决方案2】:

这是一种适合您的方法。

将列名(键)和列值收集到每行的列表(值)中。然后将它们重新排列成键值对元组列表以传递给dict 构造函数。最后,使用json.dumps()dict 转换为字符串。

将键和值收集到列表中

将列名和值收集到一个列表中,但将键和值交错。

import pyspark.sql.functions as f

def kvp(cols, *args):
    a = cols
    b = map(str, args)
    c = a + b
    c[::2] = a
    c[1::2] = b
    return c

kvp_udf = lambda cols: f.udf(lambda *args: kvp(cols, *args), ArrayType(StringType()))
df.withColumn('kvp', kvp_udf(df.columns)(*df.columns)).show()
#+----+----+------------------+
#|Col1|Col2|               kvp|
#+----+----+------------------+
#|   A|   1|[Col1, A, Col2, 1]|
#|   B|   2|[Col1, B, Col2, 2]|
#|   D|   3|[Col1, D, Col2, 3]|
#+----+----+------------------+

将 Key-Value-Pair 列传递给 dict 构造函数

使用json.dumps()dict 转换为JSON 字符串。

import json
df.withColumn('kvp', kvp_udf(df.columns)(*df.columns))\
    .select(
        f.udf(lambda x: json.dumps(dict(zip(x[::2],x[1::2]))), StringType())(f.col('kvp'))\
        .alias('json')
    )\
    .show(truncate=False)
#+--------------------------+
#|json                      |
#+--------------------------+
#|{"Col2": "1", "Col1": "A"}|
#|{"Col2": "2", "Col1": "B"}|
#|{"Col2": "3", "Col1": "D"}|
#+--------------------------+

注意:很遗憾,这会将所有数据类型转换为字符串。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-08-30
    • 2019-04-30
    • 1970-01-01
    • 1970-01-01
    • 2020-04-02
    • 1970-01-01
    • 2018-09-15
    • 2021-03-26
    相关资源
    最近更新 更多