【问题标题】:Function to filter values in PySpark在 PySpark 中过滤值的函数
【发布时间】:2020-09-03 14:33:43
【问题描述】:

我正在尝试在 PySpark 中运行一个 for 循环,该循环需要为算法过滤变量。

这是我的数据框 df_prods 的示例:

+----------+--------------------+--------------------+
|ID        |        NAME        |           TYPE     |
+----------+--------------------+--------------------+
|    7983  |SNEAKERS 01         |            Sneakers|
|    7034  |SHIRT 13            |               Shirt|
|    3360  |SHORTS 15           |               Short|

我想遍历一个 ID 列表,从算法中获取匹配项,然后过滤产品的类型。

我创建了一个获取类型的函数:

def get_type(ID_PROD):
    return [row[0] for row in df_prods.filter(df_prods.ID == ID_PROD).select("TYPE").collect()]

并希望它返回:

print(get_type(7983))
Sneakers

但我发现了两个问题:
1- 这样做需要很长时间(比我在 Python 上做类似的事情要长)
2-它返回一个字符串数组类型:['Sneakers'],当我尝试过滤产品时,会发生这种情况:

type = get_type(7983)
df_prods.filter(df_prods.type == type)
java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [Sneakers]

有人知道在 PySpark 上解决此问题的更好方法吗?

提前非常感谢您。我很难学习 PySpark。

【问题讨论】:

  • 而不是简单地收集 .. 做 collect().head

标签: dataframe apache-spark filter pyspark


【解决方案1】:

对您的功能稍作调整。这将返回过滤后找到的第一条记录中目标列的实际字符串。

from pyspark.sql.functions import col

def get_type(ID_PROD):
  return df.filter(col("ID") == ID_PROD).select("TYPE").collect()[0]["TYPE"]

type = get_type(7983)
df_prods.filter(col("TYPE") == type) # works

我发现使用col("colname") 更具可读性。

关于你提到的性能问题,我真的不能说没有更多细节(例如检查数据和应用程序的其余部分)。试试这个语法并告诉我性能是否有所提高。

【讨论】:

  • 成功了,谢谢!过滤需要 9 秒,这比我使用 Pandas 的时间要长。我知道我与 Pandas 内存中的数据帧而不是 Pyspark 中的数据帧有关,但我缺乏 Pyspark 知识以使其更快。
猜你喜欢
  • 1970-01-01
  • 2021-03-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多