【问题标题】:Pyspark explode nested listPyspark 爆炸嵌套列表
【发布时间】:2020-10-04 23:14:50
【问题描述】:

我有以下数据框,我想分解值列,以便每个值都在单独的列中:

id | values
-----------------------
1  | '[[532,969020406,89],[216,969100125,23],[169,39356140000,72],[399,14407358500,188],[377,13761937166.6667,24]]'
2 | '[[532,969020406,89]]'

请注意,值列下的列表可以有不同的长度,并且它们是字符串数据类型。

所需的表格应如下所示:

id | v11 | v12 | v13 | v21 | v22... 
--------------------------------------
1  | 532 | 969020406 | 89 | 216 | 969100125...
2 | 532 | 969020406 | 89 | Null | Null...

我尝试指定架构并使用 from_json 方法创建数组然后分解它,但我遇到了问题,即任何架构似乎都不适合我的数据

json_schema =  types.StructType([types.StructField('array', types.StructType([ \
    types.StructField("v1",types.StringType(),True), \
    types.StructField("v2",types.StringType(),True), \
    types.StructField("v3",types.StringType(),True)
  ]))])

json_schema = types.ArrayType(types.StructType([ \
    types.StructField("v1",types.StringType(),True), \
    types.StructField("v2",types.StringType(),True), \
    types.StructField("v3",types.StringType(),True)
  ]))

json_schema = types.ArrayType(types.ArrayType(types.IntegerType()))

df.select('id', F.from_json('values', schema=json_schema)).show()

程序只返回 Null 值或空数组:[,,]

我还收到以下错误:StructType can not accept object '[' in type

Pyspark 推断的输入数据架构:

root
 |-- id: integer (nullable = true)
 |-- values: string (nullable = true)

任何帮助将不胜感激。

【问题讨论】:

  • 能否请您添加输入数据的架构,例如df.printSchema() 的输出为您的原始数据?

标签: apache-spark pyspark


【解决方案1】:

对于 Spark 2.4+,您可以使用 splittransform 的组合将字符串转换为二维数组。然后可以将该数组的单个条目分别转换为列。

from pyspark.sql import functions as F

df2 = df.withColumn("parsed_values", F.expr("transform(split(values, '\\\\],\\\\['), " +
           "c ->  transform(split(c, ','), d->regexp_replace(d,'[\\\\[\\\\]]','')))"))\
    .withColumn("length", F.size("parsed_values"))

max_length = df2.agg(F.max("length")).head()["max(length)"]

df2 现在有了结构

root
 |-- id: string (nullable = true)
 |-- values: string (nullable = true)
 |-- parsed_values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- length: integer (nullable = false)

max_length 包含一行中的最大条目数(示例数据为 5)。

parsed_value[0][1] 将返回第一个条目的第二个子条目。对于示例数据,这将是 969020406

第二步,将嵌套数组转化为列。

cols = [F.col('parsed_values').getItem(x).getItem(y).alias("v{}{}".format(x+1,y+1)) \
    for x in range(0, max_length) for y in range(0,3)]

df2.select([F.col('id')] + cols).show()

输出:

+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
| id|v11|      v12|v13| v21|      v22| v23| v31|        v32| v33| v41|        v42| v43| v51|             v52| v53|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+
|  1|532|969020406| 89| 216|969100125|  23| 169|39356140000|  72| 399|14407358500| 188| 377|13761937166.6667|  24|
|  2|532|969020406| 89|null|     null|null|null|       null|null|null|       null|null|null|            null|null|
+---+---+---------+---+----+---------+----+----+-----------+----+----+-----------+----+----+----------------+----+

如果有一种方法可以确定max_length,而无需找到完整数据的最大值,例如如果事先知道该值,则可以改进解决方案。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-03
    • 1970-01-01
    • 2021-11-09
    • 2019-10-18
    • 2018-06-20
    • 2018-11-13
    • 2016-11-07
    相关资源
    最近更新 更多