【Question Title】: How to reduceByKey in PySpark with custom grouping of rows?
【Posted】: 2019-05-22 09:31:28
【Question Description】:

I have a dataframe that looks like this:

items_df
======================================================
| customer   item_type    brand    price    quantity |  
|====================================================|
|  1         bread        reems     20         10    |  
|  2         butter       spencers  10         21    |  
|  3         jam          niles     10         22    |
|  1         bread        marks     16         18    |
|  1         butter       jims      19         12    |
|  1         jam          jills     16         6     |
|  2         bread        marks     16         18    |
======================================================

I created an RDD that converts the rows above to dicts:

rdd = items_df.rdd.map(lambda row: row.asDict())

The result looks like this:

[
   { "customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10 },
   { "customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21 },
   { "customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22 },
   { "customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 },
   { "customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12 },
   { "customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6 },
   { "customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 }
]

I want to first group the rows above by customer, then introduce the custom new keys "breads", "butters", and "jams" and group all of that customer's rows under them. So my RDD goes from 7 rows down to 3.

The output would look like this:

[
    { 
        "customer": 1, 
        "breads": [
            {"item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18},
        ],
        "butters": [
            {"item_type": "butter", "brand": "jims", "price": 19, "quantity": 12}
        ],
        "jams": [
            {"item_type": "jam", "brand": "jills", "price": 16, "quantity": 6}
        ]
    },
    {
        "customer": 2,
        "breads": [
            {"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
        ],
        "butters": [
            {"item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21}
        ],
        "jams": []
    },
    {
        "customer": 3,
        "breads": [],
        "butters": [],
        "jams": [
            {"item_type": "jam", "brand": "niles", "price": 10, "quantity": 22}
        ]
    }
]

Does anyone know how to achieve the above with PySpark? I would like to know whether there is a solution using reduceByKey() or something similar. I would prefer to avoid groupByKey() if possible.

【Question Comments】:

    Tags: apache-spark pyspark apache-spark-sql rdd


    【Solution 1】:

    First, add an item_types column to pivot the dataframe on.

    import pyspark.sql.functions as F

    items_df = items_df.withColumn('item_types', F.concat(F.col('item_type'), F.lit('s')))
    items_df.show()
    
    +--------+---------+--------+-----+--------+----------+
    |customer|item_type|   brand|price|quantity|item_types|
    +--------+---------+--------+-----+--------+----------+
    |       1|    bread|   reems|   20|      10|    breads|
    |       2|   butter|spencers|   10|      21|   butters|
    |       3|      jam|   niles|   10|      22|      jams|
    |       1|    bread|   marks|   16|      18|    breads|
    |       1|   butter|    jims|   19|      12|   butters|
    |       1|      jam|   jills|   16|       6|      jams|
    |       2|    bread|   marks|   16|      18|    breads|
    +--------+---------+--------+-----+--------+----------+
    

    Then you can group by customer and pivot the table, while aggregating the other columns with F.collect_list() at the same time.

    items_df = items_df.groupby(['customer']).pivot("item_types").agg(
        F.collect_list(F.struct(F.col("item_type"), F.col("brand"), F.col("price"), F.col("quantity")))
    ).sort('customer')
    items_df.show()
    
    +--------+--------------------+--------------------+--------------------+
    |customer|              breads|             butters|                jams|
    +--------+--------------------+--------------------+--------------------+
    |       1|[[bread, reems, 2...|[[butter, jims, 1...|[[jam, jills, 16,...|
    |       2|[[bread, marks, 1...|[[butter, spencer...|                  []|
    |       3|                  []|                  []|[[jam, niles, 10,...|
    +--------+--------------------+--------------------+--------------------+
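
    As a side note, if the set of item types is known up front, you can pass the pivot values to pivot() explicitly, so Spark skips the extra job it would otherwise run to discover the distinct values. A minor variant of the same step (the value list here is hard-coded from this example's data):

    items_df = items_df.groupby('customer').pivot(
        "item_types", ["breads", "butters", "jams"]  # explicit values: no discovery pass
    ).agg(
        F.collect_list(F.struct("item_type", "brand", "price", "quantity"))
    ).sort('customer')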
    
    

    Finally, you need to set recursive=True to convert the nested Rows into dicts as well.

    rdd = items_df.rdd.map(lambda row: row.asDict(recursive=True))
    print(rdd.take(10))
    
    
    [{'customer': 1,
      'breads': [{'item_type': u'bread', 'brand': u'reems', 'price': 20, 'quantity': 10},
                 {'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
      'butters': [{'item_type': u'butter', 'brand': u'jims', 'price': 19, 'quantity': 12}],
      'jams': [{'item_type': u'jam', 'brand': u'jills', 'price': 16, 'quantity': 6}]},
     {'customer': 2,
      'breads': [{'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
      'butters': [{'item_type': u'butter', 'brand': u'spencers', 'price': 10, 'quantity': 21}],
      'jams': []},
     {'customer': 3,
      'breads': [],
      'butters': [],
      'jams': [{'item_type': u'jam', 'brand': u'niles', 'price': 10, 'quantity': 22}]}]
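
    If what you ultimately need is JSON text rather than Python dicts, one shortcut (again on the pivoted items_df, and assuming JSON strings are acceptable downstream) is DataFrame.toJSON(), which returns an RDD with one JSON document per row:

    json_rdd = items_df.toJSON()  # RDD of JSON strings, one per customer row
    print(json_rdd.take(3))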
    

    【Discussion】:

    • Thanks so much @giser_yugang! This is a great explanation
    【Solution 2】:

    I also used another approach, with reduceByKey() on the RDD. Given the dataframe items_df, first convert it to an RDD:

    rdd = items_df.rdd.map(lambda row: row.asDict())
    

    Transform each row into a tuple (customer, [row_obj]), with the row object wrapped in a list:

    rdd = rdd.map(lambda row: (row["customer"], [row]))
    

    Group by customer with reduceByKey, concatenating the lists for each customer:

    rdd = rdd.reduceByKey(lambda x, y: x + y)
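
    An aside on this step: reduceByKey concatenates the lists pairwise, which copies data repeatedly as the lists grow. An alternative sketch using aggregateByKey (also in the RDD API) appends rows into a per-partition list instead, and skips the wrap-each-row-in-a-list map above; grouped is just an illustrative name:

    def add_row(acc, row):
        acc.append(row)  # mutate the partition-local accumulator in place
        return acc

    grouped = items_df.rdd.map(lambda row: row.asDict()) \
                          .map(lambda row: (row["customer"], row)) \
                          .aggregateByKey([], add_row, lambda a, b: a + b)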
    

    Convert each tuple back into a dict, where the key is the customer and the value is the list of all associated rows:

    rdd = rdd.map(lambda tup: {tup[0]: tup[1]})
    

    Since each customer's data is now all in one place, we can use a custom function to separate it into breads, butters, and jams:

    def organize_items_in_customer(row):
        # row is {customer_id: [item_dict, ...]} from the previous step
        cust_id = list(row.keys())[0]
        items = row[cust_id]
        new_cust_obj = { "customer": cust_id, "breads": [], "butters": [], "jams": [] }
        plurals = { "bread": "breads", "butter": "butters", "jam": "jams" }
        for item in items:
            item_type = item["item_type"]
            key = plurals[item_type]  # e.g. "bread" -> "breads"
            new_cust_obj[key].append(item)
        return new_cust_obj
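
    A quick local sanity check of the helper on a hand-built input (sample is hypothetical, but its shape matches what the previous step produces):

    sample = {3: [{"customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22}]}
    print(organize_items_in_customer(sample))
    # {'customer': 3, 'breads': [], 'butters': [], 'jams': [{'customer': 3, 'item_type': 'jam', ...}]}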
    

    Invoke the function above to transform the RDD:

    rdd = rdd.map(organize_items_in_customer)
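
    For reference, here are the steps above chained end to end, with a take() at the end to inspect the result (starting from the original items_df):

    result = (
        items_df.rdd
        .map(lambda row: row.asDict())
        .map(lambda row: (row["customer"], [row]))
        .reduceByKey(lambda x, y: x + y)
        .map(lambda tup: {tup[0]: tup[1]})
        .map(organize_items_in_customer)
    )
    print(result.take(3))  # one dict per customer, matching the expected output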
    

    【Discussion】:
