【发布时间】:2020-09-03 14:33:43
【问题描述】:
我正在尝试在 PySpark 中运行一个 for 循环,该循环需要为算法过滤变量。
这是我的数据框 df_prods 的示例:
+----------+--------------------+--------------------+
|ID | NAME | TYPE |
+----------+--------------------+--------------------+
| 7983 |SNEAKERS 01 | Sneakers|
| 7034 |SHIRT 13 | Shirt|
| 3360 |SHORTS 15 | Short|
我想遍历一个 ID 列表,从算法中获取匹配项,然后过滤产品的类型。
我创建了一个获取类型的函数:
def get_type(ID_PROD):
return [row[0] for row in df_prods.filter(df_prods.ID == ID_PROD).select("TYPE").collect()]
并希望它返回:
print(get_type(7983))
Sneakers
但我发现了两个问题:
1- 这样做需要很长时间(比我在 Python 上做类似的事情要长)
2-它返回一个字符串数组类型:['Sneakers'],当我尝试过滤产品时,会发生这种情况:
type = get_type(7983)
df_prods.filter(df_prods.type == type)
java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [Sneakers]
有人知道在 PySpark 上解决此问题的更好方法吗?
提前非常感谢您。我很难学习 PySpark。
【问题讨论】:
-
而不是简单地收集 .. 做 collect().head
标签: dataframe apache-spark filter pyspark