【Question title】: Improve performance of Fuzzywuzzy between two large datasets
【Posted】: 2022-01-05 18:52:53
【Question】:

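I have installed Apache Spark on my machine to run it locally. My machine has these specs: 2.3 GHz quad-core Intel Core i7, 16 GB 3733 MHz LPDDR4X.

So I configured a local Spark session: local[6].

My problem: I have to match two large string-based Spark datasets. One has 2.4 million records, the other has 38k. I used the Fuzzywuzzy library with Python 3.9, and the performance of my code is really poor: the matching has been running for 7 hours and it still hasn't finished.

I want to find the best match in the smaller dataset in order to complete the information in the large dataset. So I concatenated the fields of the large dataset that I want to look up in the smaller dataset and applied process.extractBests(). I know there are a lot of comparisons to make, but I haven't found any other way to reduce their number. I also feel that I'm not taking advantage of parallel processing. Can you give me some advice?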
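To see why the run takes so long, it helps to count the work. extractBests scores every choice, so the loop calls the scorer once per (large-dataset row, small-dataset row) pair. The sketch below does that arithmetic; the helper names are hypothetical, and the 10 microseconds per token_set_ratio call is an illustrative assumption, not a measurement (timing a few thousand calls on your own strings with timeit gives a real figure). Whatever the per-call cost, parallelism at best divides the total by the number of cores; only reducing the number of pairs changes it more than that.

```python
def pairwise_comparisons(n_left: int, n_right: int) -> int:
    """Scorer calls needed when every record on one side is compared with every record on the other."""
    return n_left * n_right


def estimated_hours(n_calls: int, seconds_per_call: float, cores: int) -> float:
    """Wall-clock hours under perfect parallelism; seconds_per_call is an assumed input, not a measurement."""
    return n_calls * seconds_per_call / cores / 3600


calls = pairwise_comparisons(2_400_000, 38_000)
print(calls)  # 91200000000

ASSUMED_SECONDS_PER_CALL = 10e-6  # hypothetical cost of one token_set_ratio call
print(round(estimated_hours(calls, ASSUMED_SECONDS_PER_CALL, cores=1), 1))  # 253.3, single-threaded driver loop
print(round(estimated_hours(calls, ASSUMED_SECONDS_PER_CALL, cores=6), 1))  # 42.2, if spread over local[6]
```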

from fuzzywuzzy import fuzz, process
from pyspark.sql import functions as F

header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
           "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
           "nr_seats_ad"]

header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
           "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat", "id"]

dataset1 = df1.withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))

# "id" is the last entry of header2, so it ends up as the last field of concatenated2
dataset2 = df2 \
    .withColumn("id", F.monotonically_increasing_id()) \
    .withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))

# collect() brings every row to the driver; the loop below then runs single-threaded there
list1 = dataset1.select("concatenated1").rdd.flatMap(lambda x: x).collect()
list2 = dataset2.select("concatenated2").rdd.flatMap(lambda x: x).collect()

matches = []
for item1 in list1:
    best = process.extractBests(item1, list2, scorer=fuzz.token_set_ratio, score_cutoff=50, limit=2)
    if best:  # extractBests returns [] when no choice reaches score_cutoff
        # best[0][0] is the best-matching concatenated2 string; its last field is the id
        matches.append((item1, int(best[0][0].split(", ")[-1])))

dfIndexes = sparkSession.createDataFrame(matches).toDF("concatenated1", "id")
dfAdsIndexed = dataset1.join(dfIndexes, on="concatenated1", how="inner").drop("concatenated1")
dfMapping = dfAdsIndexed.join(dataset2, on="id", how="inner").drop("concatenated2", "id")
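For reference, this is what the concatenated keys look like for the first ad and its expected match. The sketch mimics F.concat_ws in plain Python with a hypothetical helper (not Spark's implementation), using values from the sample rows of Dataset 1 and Dataset 2; the id value 2 is made up. Two side effects of the approach are visible: numeric columns keep their own formatting ("1956.0" versus "1956"), and because "id" is the last entry of header2, the id digits are part of the text that token_set_ratio compares. The same placement is what lets split(", ")[-1] recover the id, which would break if any column value itself contained ", ".

```python
from typing import Any, Dict, List


def concat_ws(sep: str, row: Dict[str, Any], columns: List[str]) -> str:
    """Plain-Python mimic of F.concat_ws: stringify each column and skip None, as concat_ws skips nulls."""
    return sep.join(str(row[c]) for c in columns if row[c] is not None)


header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
           "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
           "nr_seats_ad"]
header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
           "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat", "id"]

# First row of Dataset 1 and third row of Dataset 2; id=2 is a made-up example value
ad = dict(zip(header1, ["fiat", "freemont", "diesel", 1956.0, 170.0, 125.0,
                        "automatic", "suv", "all-wheel-auto", 5, 7]))
ref = dict(zip(header2, ["FIAT", "Freemont", "Diesel", 1956, 170, 125, "Automatic transmission",
                         "Van", "4 wheel drive general", 5, 7, 2]))

concatenated1 = concat_ws(", ", ad, header1)
concatenated2 = concat_ws(", ", ref, header2)
recovered_id = concatenated2.split(", ")[-1]  # same trick as the loop in the question

print(concatenated1)  # fiat, freemont, diesel, 1956.0, 170.0, 125.0, automatic, suv, all-wheel-auto, 5, 7
print(concatenated2)  # FIAT, Freemont, Diesel, 1956, 170, 125, Automatic transmission, Van, 4 wheel drive general, 5, 7, 2
print(recovered_id)   # 2
```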

Dataset 1:

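+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+
|name_make_ad|name_model_ad|prod_year_ad|name_fueltype_ad|engine_capacity_ccm|engine_power_km|engine_power_kw|name_transmission_ad|name_body_ad|name_drivetype_ad|name_color_ad|nr_door_ad|nr_seats_ad|mileage_ad|currency_ad|price_ad|
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+
|fiat        |freemont     |2016        |diesel          |1956.0             |170.0          |125.0          |automatic           |suv         |all-wheel-auto   |grey         |5         |7          |82000.0   |PLN        |66500.0 |
|fiat        |freemont     |2015        |diesel          |1956.0             |170.0          |125.0          |manual              |suv         |front-wheel      |grey         |5         |7          |140000.0  |PLN        |64900.0 |
|fiat        |freemont     |2013        |diesel          |1956.0             |140.0          |103.0          |manual              |suv         |front-wheel      |black        |5         |7          |189000.0  |PLN        |47970.0 |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+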

Dataset 2:

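+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+
|name_make|name_model|modnamegrp2|name_vehtype_et|y_modbegin|y_modend|name_body_et|cnt_seat|cnt_door|name_fueltype|pwr_km_base|pwr_kw_base|pwr_km_hyb|pwr_kw_hyb|cap_ccm|torque_base|torque_hyb|cnt_hyb|name_drivetype_et    |name_transmission_et  |
+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Petrol       |170        |125        |0         |0         |2360   |220        |0         |0      |Front wheel drive    |Automatic transmission|
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Petrol       |280        |206        |0         |0         |3605   |342        |0         |0      |4 wheel drive general|Automatic transmission|
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |4 wheel drive general|Automatic transmission|
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |5       |5       |Petrol       |170        |125        |0         |0         |2360   |220        |0         |0      |Front wheel drive    |Automatic transmission|
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |140        |103        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |
+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+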

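So, for each record in dataset1, I need to find its best match in dataset2 based on similarity.

The output for this example would be:

+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+
|name_make|name_model|modnamegrp2|name_vehtype_et|y_modbegin|y_modend|name_body_et|cnt_seat|cnt_door|name_fueltype|pwr_km_base|pwr_kw_base|pwr_km_hyb|pwr_kw_hyb|cap_ccm|torque_base|torque_hyb|cnt_hyb|name_drivetype_et    |name_transmission_et  |
+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |4 wheel drive general|Automatic transmission|
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |
|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |140        |103        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |
+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+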

【Question comments】:

Tags: python apache-spark pyspark bigdata


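【Solution 1】:

collect() and the subsequent for loop are not parallelized by Spark: all of that work runs on the driver, hence the poor performance. Instead, you can apply a Cartesian join (crossJoin) between df1 and df2 and then call a UDF to score each row. Finally, you can order by the score and keep the top n rows for each record.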


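from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from fuzzywuzzy import fuzz

# Reuse the active session, or create a local one if none exists
spark = SparkSession.builder.getOrCreate()

cutoff = 50       # minimum token_set_ratio score for a pair to be kept
max_matches = 1   # number of best matches to keep per dataset1 row

@udf(returnType=DoubleType())
def scorer(query: str, choice: str) -> float:
    # Score one (concatenated1, concatenated2) pair. token_set_ratio lowercases and strips
    # punctuation by default; pairs below the cutoff are dropped by the .filter() further down.
    return float(fuzz.token_set_ratio(query, choice))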

df_1_data = [("fiat", "freemont", 2016, "diesel", 1956.0, 170.0, 125.0, "automatic", "suv", "all-wheel-auto", "grey", 5, 7, 82000.0, "PLN", 66500.0),
("fiat", "freemont", 2015, "diesel", 1956.0, 170.0, 125.0, "manual", "suv", "front-wheel", "grey", 5, 7, 140000.0, "PLN", 64900.0),
("fiat", "freemont", 2013, "diesel", 1956.0, 140.0, 103.0, "manual", "suv", "front-wheel", "lack", 5, 7, 189000.0, "PLN", 47970.0),]

df_2_data = [("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Petrol", 280, 206, 0, 0, 3605, 342, 0, 0, "4 wheel drive general", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "4 wheel drive general", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 5, 5, "Petrol", 170, 125, 0, 0, 2360, 220, 0, 0, "Front wheel drive", "Automatic transmission"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 170, 125, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),
("FIAT", "Freemont", "Freemont", "Passenger Car", 2011, 2016, "Van", 7, 5, "Diesel", 140, 103, 0, 0, 1956, 350, 0, 0, "Front wheel drive", "Manual gearbox"),]


header1 = ["name_make_ad", "name_model_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km",
                    "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "nr_door_ad",
                    "nr_seats_ad"]

header2 = ["name_make", "name_model", "name_fueltype", "cap_ccm", "pwr_km_base", "pwr_kw_base",
                     "name_transmission_et", "name_body_et", "name_drivetype_et", "cnt_door", "cnt_seat"]

df1 = spark.createDataFrame(df_1_data, ("name_make_ad", "name_model_ad", "prod_year_ad", "name_fueltype_ad", "engine_capacity_ccm", "engine_power_km", "engine_power_kw", "name_transmission_ad", "name_body_ad", "name_drivetype_ad", "name_color_ad", "nr_door_ad", "nr_seats_ad", "mileage_ad", "currency_ad", "price_ad",))

df2 = spark.createDataFrame(df_2_data, ("name_make", "name_model", "modnamegrp2", "name_vehtype_et", "y_modbegin", "y_modend", "name_body_et", "cnt_seat", "cnt_door", "name_fueltype", "pwr_km_base", "pwr_kw_base", "pwr_km_hyb", "pwr_kw_hyb", "cap_ccm", "torque_base", "torque_hyb", "cnt_hyb",  "name_drivetype_et",  "name_transmission_et",))

dataset1 = df1.withColumn("id", F.monotonically_increasing_id())\
              .withColumn("concatenated1", F.concat_ws(", ", *[F.col(x) for x in header1]))

dataset2 = df2.withColumn("concatenated2", F.concat_ws(", ", *[F.col(x) for x in header2]))

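# Broadcast the small reference table (dataset2, ~38k rows in the question) so that each
# partition of the large dataset1 is scored against it locally, without shuffling dataset1.
combined_df = dataset1.crossJoin(F.broadcast(dataset2))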

df_score_matching_cutoff = combined_df.withColumn("fuzz_score", scorer(col("concatenated1"), col("concatenated2")))\
                      .filter(col("fuzz_score") >= cutoff)

window_spec = Window.partitionBy("id").orderBy(F.desc("fuzz_score"))

results = df_score_matching_cutoff.withColumn("rn", F.row_number().over(window_spec))\
                        .filter(col("rn") <= max_matches)

results.show(truncate=False)

Output

+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|name_make_ad|name_model_ad|prod_year_ad|name_fueltype_ad|engine_capacity_ccm|engine_power_km|engine_power_kw|name_transmission_ad|name_body_ad|name_drivetype_ad|name_color_ad|nr_door_ad|nr_seats_ad|mileage_ad|currency_ad|price_ad|id |concatenated1                                                                     |name_make|name_model|modnamegrp2|name_vehtype_et|y_modbegin|y_modend|name_body_et|cnt_seat|cnt_door|name_fueltype|pwr_km_base|pwr_kw_base|pwr_km_hyb|pwr_kw_hyb|cap_ccm|torque_base|torque_hyb|cnt_hyb|name_drivetype_et    |name_transmission_et  |concatenated2                                                                                   |fuzz_score|rn |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
|fiat        |freemont     |2016        |diesel          |1956.0             |170.0          |125.0          |automatic           |suv         |all-wheel-auto   |grey         |5         |7          |82000.0   |PLN        |66500.0 |0  |fiat, freemont, diesel, 1956.0, 170.0, 125.0, automatic, suv, all-wheel-auto, 5, 7|FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |4 wheel drive general|Automatic transmission|FIAT, Freemont, Diesel, 1956, 170, 125, Automatic transmission, Van, 4 wheel drive general, 5, 7|88.0      |1  |
|fiat        |freemont     |2015        |diesel          |1956.0             |170.0          |125.0          |manual              |suv         |front-wheel      |grey         |5         |7          |140000.0  |PLN        |64900.0 |1  |fiat, freemont, diesel, 1956.0, 170.0, 125.0, manual, suv, front-wheel, 5, 7      |FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |170        |125        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |FIAT, Freemont, Diesel, 1956, 170, 125, Manual gearbox, Van, Front wheel drive, 5, 7            |95.0      |1  |
|fiat        |freemont     |2013        |diesel          |1956.0             |140.0          |103.0          |manual              |suv         |front-wheel      |lack         |5         |7          |189000.0  |PLN        |47970.0 |2  |fiat, freemont, diesel, 1956.0, 140.0, 103.0, manual, suv, front-wheel, 5, 7      |FIAT     |Freemont  |Freemont   |Passenger Car  |2011      |2016    |Van         |7       |5       |Diesel       |140        |103        |0         |0         |1956   |350        |0         |0      |Front wheel drive    |Manual gearbox        |FIAT, Freemont, Diesel, 1956, 140, 103, Manual gearbox, Van, Front wheel drive, 5, 7            |95.0      |1  |
+------------+-------------+------------+----------------+-------------------+---------------+---------------+--------------------+------------+-----------------+-------------+----------+-----------+----------+-----------+--------+---+----------------------------------------------------------------------------------+---------+----------+-----------+---------------+----------+--------+------------+--------+--------+-------------+-----------+-----------+----------+----------+-------+-----------+----------+-------+---------------------+----------------------+------------------------------------------------------------------------------------------------+----------+---+
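The pipeline in this answer (cross join, score every pair, drop pairs below the cutoff, keep the top max_matches per id with row_number()) can be mirrored in plain Python to check the selection logic on small inputs. In this sketch, token_overlap is a hypothetical stand-in for fuzz.token_set_ratio (a Jaccard score on lowercase tokens, not the same formula and generally lower), and top_matches is a hypothetical helper playing the role of the crossJoin, filter and Window steps.

```python
from itertools import product
from typing import Callable, Dict, List, Tuple


def token_overlap(a: str, b: str) -> float:
    """Hypothetical stand-in for fuzz.token_set_ratio: Jaccard similarity of lowercase tokens, scaled to 0-100."""
    ta = set(a.lower().replace(",", " ").split())
    tb = set(b.lower().replace(",", " ").split())
    if not ta or not tb:
        return 0.0
    return 100.0 * len(ta & tb) / len(ta | tb)


def top_matches(
    left: Dict[int, str],
    right: Dict[str, str],
    scorer: Callable[[str, str], float],
    cutoff: float = 50.0,
    max_matches: int = 1,
) -> Dict[int, List[Tuple[str, float]]]:
    """For each left id, return up to max_matches (right_id, score) pairs with score >= cutoff, best first."""
    candidates: Dict[int, List[Tuple[float, str]]] = {}
    # Cross join: every left row is paired with every right row, like crossJoin()
    for (lid, ltext), (rid, rtext) in product(left.items(), right.items()):
        score = scorer(ltext, rtext)
        if score >= cutoff:  # like .filter(col("fuzz_score") >= cutoff)
            candidates.setdefault(lid, []).append((score, rid))
    result: Dict[int, List[Tuple[str, float]]] = {}
    for lid, scored in candidates.items():
        # Like row_number() over orderBy(desc("fuzz_score")); sorted() is stable, so ties keep input order
        ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
        result[lid] = [(rid, score) for score, rid in ranked[:max_matches]]
    return result


left = {0: "diesel, manual, van"}
right = {"a": "Diesel, Manual, Van", "b": "Petrol, Automatic, Van", "c": "Diesel, Manual, SUV"}
print(top_matches(left, right, token_overlap, max_matches=2))  # {0: [('a', 100.0), ('c', 50.0)]}
```

Ties are resolved by input order here, whereas row_number() over tied fuzz_score values picks an arbitrary row; adding a second column to the window's orderBy makes the Spark result deterministic.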

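【Discussion】:

  • I have shared some data from dataset1 and dataset2, plus what the best-match output based on similarity should look like. Could you please help? Thanks in advance.
  • Have you had a chance to take a look?
  • I should have a working example within 2 hours.
  • @jorgemaagomes Updated the answer with a working example.
  • If I have a Spark cluster with the following configuration: .config("spark.executor.instances", 10) .config("spark.executor.cores", 3) .config("spark.executor.memory", "10g"), should I partition dataset1 into 10 or more partitions (.repartition(10))? Given the number of available executors, what is the best approach for choosing the number of partitions of the datasets?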