【Posted】: 2021-10-21 11:16:12
【Question】:
I am trying to map values to a new column in my PySpark df using the dictionary below:
dict = {'443368995': 0, '667593514': 1, '940995585': 2, '880811536': 3, '174590194': 4}
I am reading a CSV which has the following data:
+--------------------+----------------+---------+------------+-------------+----------+---------+
| Region| Country| ItemType|SalesChannel|OrderPriority| OrderDate| OrderID|
+--------------------+----------------+---------+------------+-------------+----------+---------+
| Sub-Saharan Africa| South Africa| Fruits| Offline| M| 7/27/2012|443368995|
|Middle East and N...| Morocco| Clothes| Online| M| 9/14/2013|667593514|
|Australia and Oce...|Papua New Guinea| Meat| Offline| M| 5/15/2015|940995585|
| Sub-Saharan Africa| Djibouti| Clothes| Offline| H| 5/17/2017|880811536|
| Europe| Slovakia|Beverages| Offline| L|10/26/2016|174590194|
+--------------------+----------------+---------+------------+-------------+----------+---------+
Below is the additional column I want to add, driven by some decision based on the dictionary above:
+---------+
| SomeFlag|
+---------+
| Y|
| N|
| Y|
| Y|
| Y|
+---------+
Here is the code I tried:
df = spark.read.option("header", True).csv("sample.csv")
def my_mapp_fn(checkcol, dict1):
    print(col(checkcol))
    print(key)
    return coalesce(*[when(col(checkcol) == key, lit(value)) for key, value in d.items()])
new_df = df.withColumn("SomeFlag", my_mapp_fn(dict, col('OrderId')))
Here is the error I'm getting:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in my_mapp_fn
File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 106, in col
return _invoke_function("col", col)
File "C:\BigData\Spark\python\pyspark\sql\functions.py", line 58, in _invoke_function
return Column(jf(*args))
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1296, in __call__
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1260, in _build_args
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", line 1247, in _get_args
File "C:\BigData\Spark\python\lib\py4j-0.10.9-src.zip\py4j\java_collections.py", line 510, in convert
File "C:\BigData\Spark\python\pyspark\sql\column.py", line 460, in __iter__
raise TypeError("Column is not iterable")
TypeError: Column is not iterable
Any suggestions? Thanks in advance.
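For reference, a minimal sketch of one way the lookup could work; this is an assumption about the intent, not a confirmed fix. The call above passes the arguments in the opposite order from the function signature, hands in a Column where a column name string is expected, iterates over an undefined `d` instead of the `dict1` parameter, and spells the column `OrderId` while the header reads `OrderID`. The names `id_map` and `MappedValue` below are illustrative, not from the original:

from pyspark.sql.functions import coalesce, col, lit, when

# Hypothetical rename of the original `dict` so the built-in is not shadowed.
id_map = {'443368995': 0, '667593514': 1, '940995585': 2,
          '880811536': 3, '174590194': 4}

def my_mapp_fn(checkcol, dict1):
    # Build one when() per dictionary entry; coalesce() returns the first
    # non-null match, and null where the OrderID is not in the dictionary.
    return coalesce(*[when(col(checkcol) == key, lit(value))
                      for key, value in dict1.items()])

# Pass the column name as a string and match the CSV header's exact spelling.
new_df = df.withColumn("MappedValue", my_mapp_fn("OrderID", id_map))

Deriving the Y/N SomeFlag would then be a second when() over MappedValue, but the rule that decides Y versus N is not stated in the question, so it is left out here.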
【Comments】:
Tags: python-3.x pyspark