【问题标题】:Pyspark create array column of certain length from existing array columnPyspark 从现有数组列创建一定长度的数组列
【发布时间】:2020-05-03 17:49:32
【问题描述】:

我有一个类似 pyspark 的数据框 df

+-----+----+------------+------------+-------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_Values     |S_values    | 
+-----+----+------------+------------+-------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|["ab",1]     | [1,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |["a","b","c"]| []         |
+-----+----+------------+------------+-------------+------------+

我想最终创建 df 如下,

+-----+----+------------+------------+
| Name| Age| Attribute  |      Values|
+-----+----+------------+------------+
| Bob1| 16 |  x1        |     ab     |
| Bob1| 16 |  x2        |     1      |
| Bob1| 16 |  x1        |     1      |
| Bob1| 16 |  x3        |     2      |
| Bob2| 16 |  x1        |     a      |
| Bob2| 16 |  x2        |     b      |
| Bob2| 16 |  x3        |     c      |
+-----+----+------------+------------+

基本上我想合并这两列并将它们分解成行。在 pyspark 数组函数的帮助下,我能够连接数组并进行分解,但稍后要识别专业属性和运动属性之间的差异,因为它们可以具有相同的名称。我也需要一个类型列,

+-----+----+------------+------------+------------+
| Name| Age|   Attribute|       type |Value       |
+-----+----+------------+------------+------------+
| Bob1| 16 |  x1        |     1      | ab         |
| Bob1| 16 |  x2        |     1      | 1          |
| Bob1| 16 |  x1        |     2      | 1          |
| Bob1| 16 |  x3        |     2      | 2          |
| Bob2| 16 |  x1        |     1      | a          |
| Bob2| 16 |  x2        |     1      | b          |
| Bob2| 16 |  x3        |     1      | c          |
+-----+----+------------+------------+------------+  

所以我最初想创建一个单独的数组列,

+-----+----+------------+------------+------------+------------+
| Name| Age| P_Attribute|S_Attributes|P_type      |S_type      |
+-----+----+------------+------------+------------+------------+
| Bob1| 16 |  [x1,x2]   |     [x1,x3]|   [1,1]    | [2,2]      |
| Bob2| 16 |[x1,x2,x3]  |     []     |  [1,1,1]   |  []        |
+-----+----+------------+------------+------------+------------+

这样我就可以合并列并使用所需的类型列展开,如上面的 df 所示。 问题是我无法动态创建 P_type 和 S_type 列。 我试过下面的代码,

new_df = df.withColumn("temp_P_type", F.lit(1))\
                .withColumn("P_type", F.array_repeat("temp_P_type",F.size("P_Attribute")))

这会引发 TypeError: Column is not iterable 错误。 如果列的长度已经被提取为另一列,它也不起作用。 任何人都可以帮我解决这个问题,或者是否有更好的解决方案可以做到这一点?是否可以在不使用 RDD 和 python 函数(没有 UDF)的情况下以 df 级别执行此操作?

附:我正在使用火花 2.4

【问题讨论】:

  • 我想创建如下的 df, 这个预期的结果(第二个表)非常令人困惑。您是否只需要一个属性列,例如下面的第三张表?

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

我建议使用高阶函数 transform,先使用 structarray_union,然后使用 @987654324 @ 并使用 .* expansion. 选择两者。

df.show()
#+----+---+------------+------------+
#|Name|Age| P_Attribute|S_Attributes|
#+----+---+------------+------------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|
#|Bob2| 16|[x1, x2, x3]|          []|
#+----+---+------------+------------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union(F.expr("""transform(P_Attribute,x-> struct(x as Attribute,1 as Type))"""),\
              F.expr("""transform(S_Attributes,x-> struct(x as Attribute,2 as Type))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+
#|Name|Age|Attribute|Type|
#+----+---+---------+----+
#|Bob1| 16|       x1|   1|
#|Bob1| 16|       x2|   1|
#|Bob1| 16|       x1|   2|
#|Bob1| 16|       x3|   2|
#|Bob2| 16|       x1|   1|
#|Bob2| 16|       x2|   1|
#|Bob2| 16|       x3|   1|
#+----+---+---------+----+

UPDATE:

df.show()

#+----+---+------------+------------+---------+--------+
#|Name|Age| P_Attribute|S_Attributes| P_Values|S_values|
#+----+---+------------+------------+---------+--------+
#|Bob1| 16|    [x1, x2]|    [x1, x3]|  [ab, 1]|  [1, 2]|
#|Bob2| 16|[x1, x2, x3]|          []|[a, b, c]|      []|
#+----+---+------------+------------+---------+--------+

from pyspark.sql import functions as F
df.withColumn("Attributes", F.explode(F.array_union\
               (F.expr("""transform(arrays_zip(P_Attribute,P_Values),x->\
                          struct(x.P_Attribute as Attribute,1 as Type,string(x.P_Values) as Value))"""),\
                F.expr("""transform(arrays_zip(S_Attributes,S_Values),x->\
                          struct(x.S_Attributes as Attribute,2 as Type,string(x.S_Values) as Value))"""))))\
   .select("Name", "Age", "Attributes.*").show()

#+----+---+---------+----+-----+
#|Name|Age|Attribute|Type|Value|
#+----+---+---------+----+-----+
#|Bob1| 16|       x1|   1|   ab|
#|Bob1| 16|       x2|   1|    1|
#|Bob1| 16|       x1|   2|    1|
#|Bob1| 16|       x3|   2|    2|
#|Bob2| 16|       x1|   1|    a|
#|Bob2| 16|       x2|   1|    b|
#|Bob2| 16|       x3|   1|    c|
#+----+---+---------+----+-----+

【讨论】:

  • 感谢@Mohammand Murtaza Hashmi 的回答。它解决了这个问题。是否可以向 F.expr 添加多个列,例如 F.expr("""transform(S_Attributes,S_value, x, y -> struct(x as Attribute, y as Vaue, 2 as Type))""")) )
  • 所以 transform 只能接受一个参数,或者 (x,i) 其中 i 是 x 的索引。因此,要使其正常工作,您必须将S_AttributesS_value 组合起来,然后使用变换对其进行遍历。如果S_value 也是一个数组,你可以这样做F.expr("""transform(arrays_zip(S_Attributes,S_value), x -> struct(x.S_Attributes as Attribute, x.S_value as Value, 2 as Type))""")
  • 如果您可以提供S_value 列和P_value 列,我很乐意与他们一起更新解决方案@NachiketKate
【解决方案2】:

您可以执行以下操作。首先将P_attributesS_attributes 收集到单个Attributes 列中,然后对其执行posexplode,这应该给出引用属性源(PS)的type 列作为你需要。最后explodeAttributes 列将所有属性展平。

import pyspark.sql.functions as f

df = spark.createDataFrame([
    ['Bob1', 16, ['x1', 'x2'], ['x1', 'x3']],
    ['Bob2', 16, ['x1', 'x2', 'x3'], []]],
    ['Name', 'Age', 'P_Attribute', 'S_Attributes'])

df.withColumn('Attributes', f.array('P_Attribute', 'S_Attributes'))\
  .select('Name', 'Age', f.posexplode('Attributes').alias('type', 'Attribute'))\
  .withColumn('Attribute', f.explode('Attribute'))\
  .show()

+----+---+----+---------+
|Name|Age|type|Attribute|
+----+---+----+---------+
|Bob1| 16|   0|       x1|
|Bob1| 16|   0|       x2|
|Bob1| 16|   1|       x1|
|Bob1| 16|   1|       x3|
|Bob2| 16|   0|       x1|
|Bob2| 16|   0|       x2|
|Bob2| 16|   0|       x3|
+----+---+----+---------+

【讨论】:

  • 感谢@Psidom 的回答。如果将 P_values 和 S_values 列作为这些属性的值,是否可以使用 f.array()?
  • 不完全确定您需要什么。但可能f.posexplode(f.explode(f.array(f.create_map('P_Attribute', 'P_values'), f.create_map('S_Attributes', 'S_values')))).alias('type', 'Attribute')如果属性列没有重复元素。
猜你喜欢
  • 1970-01-01
  • 2022-08-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-05-10
相关资源
最近更新 更多