【问题标题】:Fast way to use dictionary in pyspark在 pyspark 中使用字典的快速方法
【发布时间】:2025-11-23 13:40:01
【问题描述】:

我有一个关于 pyspark 的问题。

我有两列“国家”和“网络”的数据框。我需要将此数据框保存为字典,以便稍后遍历另一个数据框列。

我正在像这样保存字典:

sorted_dict = result.rdd.sortByKey()

但是当我尝试遍历它时,我遇到了一个异常:

“您似乎正在尝试广播 RDD 或从“异常”中引用 RDD:您似乎正在尝试广播 RDD 或从操作或转换中引用 RDD。 RDD 转换和动作只能由驱动程序调用,不能在其他转换内部调用;例如 SPARK-5063

明白不能同时使用两个RDD,可惜不知道怎么用这种方式使用SparkContext.broadcast,因为报错了

TypeError: broadcast() 缺少 2 个必需的位置参数:'self' 和 'value'

谁能帮我弄清楚?我需要从数据框制作字典:

+--------------------+-------+
|                 web|country|
+--------------------+-------+
|   alsudanalyoum.com|     SD|
|periodicoequilibr...|     SV|
|  telesurenglish.net|     UK|
|         nytimes.com|     US|
|portaldenoticias....|     AR|
+----------------------------+

然后取另一个数据框:

+--------------------+-------+
|           split_url|country|
+--------------------+-------+
|   alsudanalyoum.com|   Null|
|periodicoequilibr...|   Null|
|  telesurenglish.net|   Null|
|         nytimes.com|   Null|
|portaldenoticias....|   Null|
+----------------------------+

...并将字典的值放入国家列。

附:由于其他原因,join 不适合我。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    如果可以,你应该使用join(),但既然你不能,你可以结合使用df.rdd.collectAsMap()pyspark.sql.functions.create_map()itertools.chain来达到同样的效果。

    注意:sortByKey() 不返回字典(或地图),而是返回排序后的 RDD

    from itertools import chain
    import pyspark.sql.functions as f
    
    df = spark.createDataFrame([
       ("a", 5),
       ("b", 20),
       ("c", 10),
       ("d", 1),
    ], ["key", "value"])
    
    # create map from the origin df
    rdd_map = df.rdd.collectAsMap()
    
    # yes, these are not real null values, but here it doesn't matter
    df_target = spark.createDataFrame([
       ("a", "NULL"),
       ("b", "NULL"),
       ("c", "NULL"),
       ("d", "NULL"),
    ], ["key", "value"])
    
    df_target.show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a| NULL|
    |  b| NULL|
    |  c| NULL|
    |  d| NULL|
    +---+-----+
    
    value_map = f.create_map(
        [f.lit(x) for x in chain(*rdd_map.items())]
    )
    
    # map over the "key" column into the "value" column
    df_target.withColumn(
        "value",
        value_map[f.col("key")]
    ).show()
    +---+-----+
    |key|value|
    +---+-----+
    |  a|    5|
    |  b|   20|
    |  c|   10|
    |  d|    1|
    +---+-----+
    

    【讨论】: