【发布时间】:2020-03-31 09:28:28
【问题描述】:
我想计算 PySpark DataFrame 的两列之间的 Jaro Winkler 距离。 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得。
pyjarowinkler 的工作原理如下:
from pyjarowinkler import distance
distance.get_jaro_distance("A", "A", winkler=True, scaling=0.1)
输出:
1.0
我正在尝试编写 Pandas UDF 以将两列作为 Series 传递并使用 lambda 函数计算距离。 这是我的做法:
@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
import pandas as pd
distance_df = pd.DataFrame({'column_A': col1, 'column_B': col2})
distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(str(distance_df['column_A']), str(distance_df['column_B']), winkler = True, scaling = 0.1))
return distance_df['distance']
temp = temp.withColumn('jaro_distance', get_distance(temp.x, temp.x))
我应该能够在上述函数中传递任意两个字符串列。 我得到以下输出:
+---+---+---+-------------+
| x| y| z|jaro_distance|
+---+---+---+-------------+
| A| 1| 2| null|
| B| 3| 4| null|
| C| 5| 6| null|
| D| 7| 8| null|
+---+---+---+-------------+
预期输出:
+---+---+---+-------------+
| x| y| z|jaro_distance|
+---+---+---+-------------+
| A| 1| 2| 1.0|
| B| 3| 4| 1.0|
| C| 5| 6| 1.0|
| D| 7| 8| 1.0|
+---+---+---+-------------+
我怀疑这可能是因为str(distance_df['column_A']) 不正确。它包含所有行值的连接字符串。
虽然这段代码对我有用:
@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col):
return col.apply(lambda x: distance.get_jaro_distance(x, "A", winkler = True, scaling = 0.1))
temp = temp.withColumn('jaro_distance', get_distance(temp.x))
输出:
+---+---+---+-------------+
| x| y| z|jaro_distance|
+---+---+---+-------------+
| A| 1| 2| 1.0|
| B| 3| 4| 0.0|
| C| 5| 6| 0.0|
| D| 7| 8| 0.0|
+---+---+---+-------------+
有没有办法用 Pandas UDF 做到这一点?我正在处理数百万条记录,因此 UDF 会很昂贵,但如果它有效,仍然可以接受。谢谢。
【问题讨论】:
标签: python-3.x pandas apache-spark pyspark