【Question title】:How do I use multiple conditions with pyspark.sql.functions.when() from a dict?
【Posted】:2026-01-06 00:05:02
【Question】:

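I want to generate a when clause from the values in a dictionary. It is very similar to what is done in "How do I use multiple conditions with pyspark.sql.functions.when()?"

The only difference is that I want to pass in a dict of columns and values.

Suppose I have a dict: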

{
  'employed': 'Y',
  'athlete': 'N'
}

I want to use that dict to generate the equivalent of:

df.withColumn("call_person", when((col("employed") == "Y") & (col("athlete") == "N"), "Y").otherwise("N"))

so that the end result is:

+---+-----------+--------+-------+
| id|call_person|employed|athlete|
+---+-----------+--------+-------+
|  1|     Y     |    Y   |   N   |
|  2|     N     |    Y   |   Y   |
|  3|     N     |    N   |   N   |
+---+-----------+--------+-------+

Note that part of the reason I want to do this programmatically is that my dicts vary in length (number of conditions).
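
To pin down the intended rule independently of Spark, here is a minimal plain-Python sketch that recomputes the expected `call_person` values from the table above. The helper name `call_person` and the list-of-dicts representation of the rows are assumptions made for illustration only.

```python
# Conditions from the question: every pair must match for the flag to be "Y".
d = {"employed": "Y", "athlete": "N"}

# The three example rows from the expected result table.
rows = [
    {"id": 1, "employed": "Y", "athlete": "N"},
    {"id": 2, "employed": "Y", "athlete": "Y"},
    {"id": 3, "employed": "N", "athlete": "N"},
]


def call_person(row, conditions):
    """Return "Y" only if every (column, value) pair in conditions matches the row."""
    return "Y" if all(row[c] == v for c, v in conditions.items()) else "N"


flags = [call_person(row, d) for row in rows]
print(flags)  # ['Y', 'N', 'N']
```

Because the check iterates over `conditions.items()`, it works for a dict with any number of entries, which is the property the Spark version needs as well.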

【Question discussion】:

    Tags: python-3.x pyspark apache-spark-sql


    【Solution 1】:

    Use the reduce() function to fold the per-column conditions into a single one:

    from functools import reduce
    from pyspark.sql.functions import when, col
    
    # dictionary
    d = {
      'employed': 'Y',
      'athlete': 'N'
    }
    
    # set up the conditions, multiple conditions merged with `&`
    cond = reduce(lambda x,y: x&y, [ col(c) == v for c,v in d.items() if c in df.columns ])
    
    # set up the new column
    df.withColumn("call_person", when(cond, "Y").otherwise("N")).show()
    +---+--------+-------+-----------+
    | id|employed|athlete|call_person|
    +---+--------+-------+-----------+
    |  1|       Y|      N|          Y|
    |  2|       Y|      Y|          N|
    |  3|       N|      N|          N|
    +---+--------+-------+-----------+
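
To show what `reduce` actually builds, here is a Spark-free sketch of the same fold. `Cond` is a hypothetical stand-in for a Spark `Column` condition that only records text, `build_condition` is an illustrative helper name, and the extra `retired` key is added only to demonstrate the `if c in df.columns` filter. In real PySpark code each `Cond(...)` would be `col(name) == value`.

```python
from functools import reduce
from operator import and_


class Cond:
    """Hypothetical stand-in for a Spark Column condition; it only records text."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        # Mirrors how `&` joins two Column conditions into one.
        return Cond(f"({self.text} AND {other.text})")


def build_condition(mapping, columns):
    """Fold one equality test per known column into a single condition."""
    parts = [
        Cond(f"{name} = '{value}'")
        for name, value in mapping.items()
        if name in columns  # skip keys that are not columns, as in the answer
    ]
    if not parts:
        # reduce() without an initial value raises TypeError on an empty list,
        # so fail early with a clearer message.
        raise ValueError("no dictionary key matches a column")
    return reduce(and_, parts)


d = {"employed": "Y", "athlete": "N", "retired": "N"}
cond = build_condition(d, columns=["id", "employed", "athlete"])
print(cond.text)  # (employed = 'Y' AND athlete = 'N')
```

The explicit empty-list check is a design choice of this sketch: the one-liner in the answer would raise a `TypeError` from `reduce` if none of the dictionary keys were present in `df.columns`.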
    

    【Discussion】:

    • Yes, you're right. Using reduce is really neat
    【Solution 2】:

    You can also look up the dictionary items directly:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    # named `mapping` and `rows` so the built-ins `dict` and `list` are not shadowed
    mapping = {
      'code': 'b',
      'amt': '4'
    }
    rows = [(1, 'code'), (1, 'amt')]
    df = spark.createDataFrame(rows, ['id', 'dict_key'])
    
    # look up each dict_key value in the Python dict
    user_func = udf(lambda x: mapping.get(x), StringType())
    newdf = df.withColumn('new_column', user_func(df.dict_key))
    
    >>> newdf.show()
    +---+--------+----------+
    | id|dict_key|new_column|
    +---+--------+----------+
    |  1|    code|         b|
    |  1|     amt|         4|
    +---+--------+----------+
    

    Or broadcast the dictionary:

    # ship the dict to the executors once as a broadcast variable
    broadcast_dict = spark.sparkContext.broadcast(mapping)
    
    def my_func(key):
        return broadcast_dict.value.get(key)
    
    new_my_func = udf(my_func, StringType())
    
    newdf = df.withColumn('new_column', new_my_func(df.dict_key))
    >>> newdf.show()
    +---+--------+----------+
    | id|dict_key|new_column|
    +---+--------+----------+
    |  1|    code|         b|
    |  1|     amt|         4|
    +---+--------+----------+
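
The per-row work in both variants is an ordinary dictionary lookup. A plain-Python sketch of that lookup, with no Spark involved, could look like the following; the third row with the key `missing` is an illustrative addition that is not part of the original answer.

```python
# Same lookup table as in the answer above.
mapping = {"code": "b", "amt": "4"}

# (id, dict_key) pairs; the last key is deliberately absent from the mapping.
rows = [(1, "code"), (1, "amt"), (2, "missing")]

# dict.get returns None for unknown keys instead of raising KeyError.
new_rows = [(row_id, key, mapping.get(key)) for row_id, key in rows]
print(new_rows)
# [(1, 'code', 'b'), (1, 'amt', '4'), (2, 'missing', None)]
```

When a Python UDF returns `None`, Spark stores a null in the resulting column, so unmatched keys would show up as null in `new_column`.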
    

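    【Discussion】:

    • I went ahead and added some expected results to the question, because this is not what I was trying to accomplish. Thank you for the reply, and I apologize for not being clearer in the original post.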