【问题标题】:Passing multiple columns in Pandas UDF PySpark在 Pandas UDF PySpark 中传递多列
【发布时间】:2020-03-31 09:28:28
【问题描述】:

我想计算 PySpark DataFrame 的两列之间的 Jaro Winkler 距离。 Jaro Winkler 距离可通过所有节点上的 pyjarowinkler 包获得。

pyjarowinkler 的工作原理如下:

from pyjarowinkler import distance
distance.get_jaro_distance("A", "A", winkler=True, scaling=0.1)

输出:

1.0

我正在尝试编写 Pandas UDF 以将两列作为 Series 传递并使用 lambda 函数计算距离。 这是我的做法:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col1, col2):
    import pandas as pd
    distance_df  = pd.DataFrame({'column_A': col1, 'column_B': col2})
    distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(str(distance_df['column_A']), str(distance_df['column_B']), winkler = True, scaling = 0.1))
    return distance_df['distance']

temp = temp.withColumn('jaro_distance', get_distance(temp.x, temp.x))

我应该能够在上述函数中传递任意两个字符串列。 我得到以下输出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|         null|
|  B|  3|  4|         null|
|  C|  5|  6|         null|
|  D|  7|  8|         null|
+---+---+---+-------------+

预期输出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          1.0|
|  C|  5|  6|          1.0|
|  D|  7|  8|          1.0|
+---+---+---+-------------+

我怀疑这可能是因为str(distance_df['column_A']) 不正确。它包含所有行值的连接字符串。

虽然这段代码对我有用:

@pandas_udf("float", PandasUDFType.SCALAR)
def get_distance(col):
    return col.apply(lambda x: distance.get_jaro_distance(x, "A", winkler = True, scaling = 0.1))

temp = temp.withColumn('jaro_distance', get_distance(temp.x))

输出:

+---+---+---+-------------+
|  x|  y|  z|jaro_distance|
+---+---+---+-------------+
|  A|  1|  2|          1.0|
|  B|  3|  4|          0.0|
|  C|  5|  6|          0.0|
|  D|  7|  8|          0.0|
+---+---+---+-------------+

有没有办法用 Pandas UDF 做到这一点?我正在处理数百万条记录,因此 UDF 会很昂贵,但如果它有效,仍然可以接受。谢谢。

【问题讨论】:

    标签: python-3.x pandas apache-spark pyspark


    【解决方案1】:

    错误来自您在 df.apply 方法中的函数,将其调整为以下应该可以修复它:

    @pandas_udf("float", PandasUDFType.SCALAR)
    def get_distance(col1, col2):
        import pandas as pd
        distance_df  = pd.DataFrame({'column_A': col1, 'column_B': col2})
        distance_df['distance'] = distance_df.apply(lambda x: distance.get_jaro_distance(x['column_A'], x['column_B'], winkler = True, scaling = 0.1), axis=1)
        return distance_df['distance']
    

    但是,Pandas df.apply 方法没有矢量化,这与我们在 PySpark 中需要 pandas_udf 而不是 udf 的目的不符。一种更快且开销更少的解决方案是使用列表解析来创建返回的 pd.Series(查看此link 以了解有关 Pandas df.apply 及其替代方案的更多讨论):

    from pandas import Series
    
    @pandas_udf("float", PandasUDFType.SCALAR)
    def get_distance(col1, col2):
       return Series([ distance.get_jaro_distance(c1, c2, winkler=True, scaling=0.1) for c1,c2 in zip(col1, col2) ])
    
    df.withColumn('jaro_distance', get_distance('x', 'y')).show()
    +---+---+---+-------------+
    |  x|  y|  z|jaro_distance|
    +---+---+---+-------------+
    | AB| 1B|  2|         0.67|
    | BB| BB|  4|          1.0|
    | CB| 5D|  6|          0.0|
    | DB|B7F|  8|         0.61|
    +---+---+---+-------------+
    

    【讨论】:

      【解决方案2】:

      您可以先合并所有数据帧,在分区被打乱分配到工作节点后使用相同的分区键进行分区,并在pandas计算之前恢复它们。请查看我为此场景编写了一个小工具包的示例:SparkyPandas

      【讨论】:

        猜你喜欢
        • 2017-07-21
        • 1970-01-01
        • 2018-04-24
        • 1970-01-01
        • 1970-01-01
        • 2021-07-06
        • 1970-01-01
        • 1970-01-01
        • 2019-04-02
        相关资源
        最近更新 更多