【问题标题】:flatten nested json scala code in pyspark在pyspark中展平嵌套的json scala代码
【发布时间】:2021-03-02 11:02:17
【问题描述】:

尝试在 pyspark 中执行以下 scala 代码:

val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
                     .map(i => get_json_object($"Payment", s"$$[$i]"))

val newDF = dataframe
  .withColumn("Payment", explode(array(jsonElements: _*)))
  .where(!isnull($"Payment"))

例如,我正在尝试制作一个嵌套列,例如在下面的付款列中:

id name payment
1 James [ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]

变成:

id name payment
1 James {"@id": 1, "currency":"GBP"}
1 James {"@id":2, "currency":"USD"}

表架构:

root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

我尝试在 Pyspark 中编写此代码,但它只是将嵌套列(付款)变为 null:

lst = [range(0,10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()

非常感谢任何帮助。

【问题讨论】:

  • pyspark 的界面和scala 的很相似。如果您有任何具体问题,请尝试自己重写并来这里。 “为我写这个,但用 Python”不是 SO 的问题。

标签: python python-3.x scala apache-spark pyspark


【解决方案1】:

这是另一种方法,它允许您根据正确的模式解析给定的 JSON 以生成支付数组。该解决方案基于from_json 函数,该函数允许您将字符串 JSON 解析为结构类型。

from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField
from pyspark.sql.functions import from_json, explode

data = [
  (1, 'James', '[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]'), 
  (2, 'Tonny', '[ {"@id": 3, "currency":"EUR"},{"@id": 4, "currency": "USD"} ]'), 
]
df = spark.createDataFrame(data, ['id', 'name', 'payment'])

str_schema = 'array<struct<`@id`:int,`currency`:string>>'

# st_schema = ArrayType(StructType([
#                 StructField('@id', IntegerType()),
#                 StructField('currency', StringType())]))

df = df.withColumn("payment", explode(from_json(df["payment"], str_schema)))

df.show()

# +---+-----+--------+
# | id| name| payment|
# +---+-----+--------+
# |  1|James|[1, GBP]|
# |  1|James|[2, USD]|
# |  2|Tonny|[3, EUR]|
# |  2|Tonny|[4, USD]|
# +---+-----+--------+

注意: 如您所见,您可以在模式的字符串表示或ArrayType 之间进行选择。两者都应该产生相同的结果。

【讨论】:

  • 我意识到explode 从字面上做同样的事情:')。我记得我花了几个小时试图找出等效的 pyspark 代码并成功了,而 explode 函数做同样事情的事实却被大大地忽视了
【解决方案2】:

我找到了解决方案:

首先将列转换为字符串类型如下:

bronzeDF = bronzeDF.withColumn("payment2", F.to_json("payment")).drop("payment")

然后可以在列上执行以下代码,将n个嵌套的json对象堆叠为具有相同外键值的单独行:

max_json_parts = 50
lst = [f for f in range(0, max_json_parts, 1)]
jsonElem = [F.get_json_object(F.col("payment2"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem))).where(F.col("payment2").isNotNull())

然后转换回具有定义架构的结构,并将对象键分解为单独的列:

bronzeDF = bronzeDF.withColumn("temp", F.from_json("payment2", jsonSchemaPayment)).select("*", "temp.*").drop("payment2")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-13
    • 2022-08-17
    • 2018-10-28
    • 2022-10-12
    • 2014-05-28
    • 2015-04-18
    • 1970-01-01
    • 2020-06-10
    相关资源
    最近更新 更多