【问题标题】:PySpark create new column with mapping from a dictPySpark 使用来自字典的映射创建新列
【发布时间】:2025-12-30 07:10:12
【问题描述】:

使用 Spark 1.6,我有一个 Spark DataFrame column(命名为 col1),其值为 A、B、C、DS、DNS、E、F、G 和 H,我想创建一个新列(说col2) 与下面dict 的值,我如何映射这个? (所以 f.i. 'A' 需要映射到 'S' 等等。)

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

【问题讨论】:

    标签: python apache-spark dictionary pyspark apache-spark-sql


    【解决方案1】:

    使用 UDF 的低效解决方案(与版本无关):

    from pyspark.sql.types import StringType
    from pyspark.sql.functions import udf
    
    def translate(mapping):
        def translate_(col):
            return mapping.get(col)
        return udf(translate_, StringType())
    
    df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
    mapping = {
        'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 
        'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
    
    df.withColumn("value", translate(mapping)("key"))
    

    结果:

    +-------+-----+
    |    key|value|
    +-------+-----+
    |     DS|    S|
    |      G|   NS|
    |INVALID| null|
    +-------+-----+
    

    更高效(Spark >= 2.0, Spark )是创建MapType 文字:

    from pyspark.sql.functions import col, create_map, lit
    from itertools import chain
    
    mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
    
    df.withColumn("value", mapping_expr.getItem(col("key")))
    

    结果相同:

    +-------+-----+
    |    key|value|
    +-------+-----+
    |     DS|    S|
    |      G|   NS|
    |INVALID| null|
    +-------+-----+
    

    但更高效的执行计划:

    == Physical Plan ==
    *Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
    +- Scan ExistingRDD[key#15]
    

    与 UDF 版本相比:

    == Physical Plan ==
    *Project [key#15, pythonUDF0#61 AS value#57]
    +- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
       +- Scan ExistingRDD[key#15]
    

    Spark >= 3.0 中,getItem 应替换为 __getitem__ ([]),即:

    df.withColumn("value", mapping_expr[col("key")]).show()
    

    【讨论】:

    • zero323 - 无法理解此方法。 withColumncolName 字符串作为第一个参数,但是您传递的是映射值而不是键? df.withColumn("value", mapping_expr.getItem(col("key"))) 也是 create_map 的结果,看起来像这样,像上面那样调用 getItem() 对我不起作用:Column<b'map(key_a, val_a, key_b, val_b)'> 有什么想法吗?
    • 非常感谢您提供此解决方案!它就像一个魅力。此外,如果您希望使用列表而不是虚构来执行相同的行为:from pyspark.sql.functions import array 例如:values = [1, 2, 3, 4, 5] indexing_expr = array(*[lit(x) for x in values]) # The * is important df.withColumn("value", indexing_expr[col("index")])
    【解决方案2】:

    听起来最简单的解决方案是使用替换功能: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

    mapping= {
            'A': '1',
            'B': '2'
        }
    df2 = df.replace(to_replace=mapping, subset=['yourColName'])
    

    【讨论】:

    • 这里的问题是这不会创建一个新列,它会替换原始列中的值。
    • 不能先把旧值复制到新列中,再使用这个函数吗?
    • 替换还要求新值与原始列的类型相同。
    • 我喜欢这个解决方案的简洁性。如果您想要一个额外的列,只需使用.withColumn("newColumn", "column_to_copy") 左右复制该列-该示例仅提供您自己执行此操作所需知道的最少代码:) 有时我认为 SO 上的 cmets 只是用来迂腐..跨度>
    • 如果有人可以将此解决方案与公认的解决方案进行比较,那将非常有帮助。我实现了replace,但我确实大大减慢了我的代码,这让我很困惑。
    【解决方案3】:

    如果您想从嵌套字典创建地图 col,您可以使用:

    def create_map(d,):
        if type(d) != dict:
            return F.lit(d)
    
        level_map = []
        for k in d:
            level_map.append(F.lit(k))
            level_map.append(create_map(d[k]))
        return F.create_map(level_map)
    
    d = {'a': 1, 'b': {'c': 2, 'd': 'blah'}}
    print(create_map(d)) # <- Column<b'map(a, 1, b, map(c, 2, d, blah))'>
    
    

    【讨论】: