[Question Title]: How do I map values from a dictionary to a new column in PySpark
[Posted]: 2021-10-21 11:16:12
[Question Description]:

I am trying to map values from a dictionary to a new column in my PySpark df:

dict = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}

I am reading a CSV which has the following data -
+--------------------+----------------+---------+------------+-------------+----------+---------+
|              Region|         Country| ItemType|SalesChannel|OrderPriority| OrderDate|  OrderID|
+--------------------+----------------+---------+------------+-------------+----------+---------+
|  Sub-Saharan Africa|    South Africa|   Fruits|     Offline|            M| 7/27/2012|443368995|
|Middle East and N...|         Morocco|  Clothes|      Online|            M| 9/14/2013|667593514|
|Australia and Oce...|Papua New Guinea|     Meat|     Offline|            M| 5/15/2015|940995585|
|  Sub-Saharan Africa|        Djibouti|  Clothes|     Offline|            H| 5/17/2017|880811536|
|              Europe|        Slovakia|Beverages|     Offline|            L|10/26/2016|174590194|
+--------------------+----------------+---------+------------+-------------+----------+---------+

Below is the additional column I want to add, derived from a decision over the dictionary above -

+---------+
| SomeFlag|
+---------+
|        Y|
|        N|
|        Y|
|        Y|
|        Y|
+---------+

Here is the code I have tried ==>

df = spark.read.option("header", True).csv("sample.csv")
def my_mapp_fn(checkcol, dict1):
     print(col(checkcol))
     print(key)
     return coalesce(*[when(col(checkcol) == key, lit(value)) for key, value in d.items()])

new_df = df.withColumn("SomeFlag", my_mapp_fn(dict, col('OrderId')))

I am getting this error ==>

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in my_mapp_fn
  File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 106, in col
    return _invoke_function("col", col)
  File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 58, in _invoke_function
    return Column(jf(*args))
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1296, in __call__
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1260, in _build_args
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1247, in _get_args
  File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_collections.py", line 510, in convert
  File "C:\BigData\Spark\python\pyspark\sql\column.py", line 460, in __iter__
    raise TypeError("Column is not iterable")
TypeError: Column is not iterable

Any suggestions? Thanks in advance.

[Question Discussion]:

    Tags: python-3.x pyspark


    [Solution 1]:

    To map the dict and create a column from it, you can write:

    from pyspark.sql import functions as F
    
    dict_data = {'443368995': '0', '667593514': '1', '940995585': '2', '880811536': '3', '174590194': '4'}
    
    d = [
        ("M", '443368995'),
        ("M", '667593514'),
        ("M", '940995585'),
        ("H", '880811536'),
        ("L", '174590194'),
        
    ]
    df = spark.createDataFrame(d,['OrderPriority','OrderID'])
    df.show()
    
    # output
    +-------------+---------+
    |OrderPriority|  OrderID|
    +-------------+---------+
    |            M|443368995|
    |            M|667593514|
    |            M|940995585|
    |            H|880811536|
    |            L|174590194|
    +-------------+---------+
    
    
    (
        df
        .withColumn("MapOrderID", F.col("OrderID"))
        .replace(to_replace=dict_data, subset=["MapOrderID"])
        .show()
    )
    
    # output
    +-------------+---------+----------+
    |OrderPriority|  OrderID|MapOrderID|
    +-------------+---------+----------+
    |            M|443368995|         0|
    |            M|667593514|         1|
    |            M|940995585|         2|
    |            H|880811536|         3|
    |            L|174590194|         4|
    +-------------+---------+----------+
    

    You can then apply when/otherwise on the newly created column:

    (
        df
        .withColumn("MapOrderID", F.col("OrderID"))
        .replace(to_replace=dict_data, subset=["MapOrderID"])
        .withColumn("MapOrderID", 
                    F.when(F.col("MapOrderID") == "2", "Ok").otherwise("Not Ok")
                   )
        .show()
    )
    
    # output
    +-------------+---------+----------+
    |OrderPriority|  OrderID|MapOrderID|
    +-------------+---------+----------+
    |            M|443368995|    Not Ok|
    |            M|667593514|    Not Ok|
    |            M|940995585|        Ok|
    |            H|880811536|    Not Ok|
    |            L|174590194|    Not Ok|
    +-------------+---------+----------+
    

    Importantly, the keys and values in the dict must be of the same type. This means that if the IDs you are mapping are strings, the values 1/2/3 should also be strings; otherwise you will get the error:

    ValueError: Mixed type replacements are not supported
    
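One way to avoid that error is to normalize the replacement dict before calling `replace`, casting every value to a string so keys and values share a type (a plain-Python sketch):

```python
# Original dict maps string IDs to int flags - mixed types for replace.
raw = {'443368995': 0, '667593514': 1, '940995585': 2,
       '880811536': 3, '174590194': 4}

# Cast every value to str so keys and values are the same type,
# which DataFrame.replace requires when the subset column is a string.
dict_data = {k: str(v) for k, v in raw.items()}
print(dict_data['443368995'])  # '0'
```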

    [Discussion]:

    • However, I can do this and then create a new df with OrderID and SomeFlag and join it with the original one. I am looking for a udf or a PySpark function-level solution, so that I don't have to create a new df.