【问题标题】:Mapping column from arrays in Pyspark从 Pyspark 中的数组映射列
【发布时间】:2021-02-26 10:22:58
【问题描述】:

当数组存储在列中时,我是使用 Pyspark df 的新手,并在尝试基于 2 个 PySpark Dataframes 映射列时寻求一些帮助,其中一个是参考 df。

参考数据框(每个组的子组数不同):

| Group | Subgroup |       Size        |      Type        |
| ----  | -------- | ------------------| ---------------  |
|A      | A1       |['Small','Medium'] | ['A','B']        |
|A      | A2       |['Small','Medium'] | ['C','D']        |
|B      | B1       |['Small']          | ['A','B','C','D']|

源数据框:

| ID    | Size     |  Type    |     
| ----  | -------- | ---------| 
|ID_001 | 'Small'  |'A'       | 
|ID_002 | 'Medium' |'B'       | 
|ID_003 | 'Small'  |'D'       | 

在结果中,每个 ID 都属于每个组,但对于基于参考 df 的子组是专有的,结果如下所示:

| ID    | Size     |  Type    |  A_Subgroup  |   B_Subgroup  |
| ----  | -------- | ---------|  ----------  | ------------- |
|ID_001 | 'Small'  |'A'       | 'A1'         |  'B1'         |
|ID_002 | 'Medium' |'B'       | 'A1'         |  Null         |
|ID_003 | 'Small'  |'D'       | 'A2'         |  'B1'         |

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以使用array_contains 条件进行连接,并旋转结果:

    import pyspark.sql.functions as F
    
    result = source.alias('source').join(
        ref.alias('ref'),
        F.expr("""
            array_contains(ref.Size, source.Size) and 
            array_contains(ref.Type, source.Type)
        """),
        'left'
    ).groupBy(
        'ID', source['Size'], source['Type']
    ).pivot('Group').agg(F.first('Subgroup'))
    
    result.show()
    +------+------+----+---+----+
    |    ID|  Size|Type|  A|   B|
    +------+------+----+---+----+
    |ID_003| Small|   D| A2|  B1|
    |ID_002|Medium|   B| A1|null|
    |ID_001| Small|   A| A1|  B1|
    +------+------+----+---+----+
    

    【讨论】:

      猜你喜欢
      • 2020-09-01
      • 1970-01-01
      • 2020-01-31
      • 1970-01-01
      • 2012-08-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-08-24
      相关资源
      最近更新 更多