【问题标题】:Python 3.8 - concurrent.futures.ProcessPoolExecutor performance going down in timePython 3.8 - concurrent.futures.ProcessPoolExecutor 性能随时间下降
【发布时间】:2021-01-22 19:47:40
【问题描述】:

我正在尝试并行化我的一个匹配函数,它在一开始就可以工作。很高兴看到我的 72 核 ec2 实例正在杀死它,大约一分钟左右它回到单核并且每秒迭代开始下降。

import concurrent.futures as cf

results = pd.DataFrame()

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(matcher_helper, list(range(len(df))))):
        results = pd.concat([results, res], axis=0)

一开始我就看到了这个

然后就到这里了

大约一分钟,处理比单核还好。在多处理时,它的迭代速度约为 250 每秒,然后下降到 35 每秒

非常感谢任何指导。

编辑 - 附加信息 - 我原来的功能:

def matcher(data,
            data_radial_matrice,
            data_indice,
            comparison_data,
            comparison_radial_matrice,
            distance_threshold=.1):
    

    import pandas as pd
    from sklearn.metrics.pairwise import haversine_distances
    from fuzzywuzzy import fuzz
    import numpy as np

    lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                               comparison_radial_matrice) * 3959
    lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

    lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

    lvl3 = pd.concat((lvl1, lvl2), axis=1)
    lvl3.columns = ['neigh_index', 'distance']
    lvl3.set_index('neigh_index', inplace=True)
    lvl3 = lvl3.merge(comparison_data,
                      left_index=True,
                      right_index=True,
                      how='inner')

    lvl4 = lvl3.loc[:, 'match_text'].apply(
        lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
    lvl5 = np.where(lvl4 == np.max(lvl4))
    interim_result = lvl3.iloc[lvl5]
    interim_result['match_score'] = np.max(lvl4)
    interim_result['adp_indice'] = data_indice

    return interim_result

【问题讨论】:

  • 您是否有一个可重现的最小示例 - 我们可以运行一个小程序来观察相同的行为?
  • 我实际上正在考虑一种共享部件的方法,但我使用的数据是高度机密的,并且以这种大小模拟测试数据将非常困难。但我会在我的编辑中分享我的内在功能。
  • @Tolga 稍微偏离主题:由于您显然需要大量性能,因此您可能想用github.com/maxbachmann/rapidfuzz 替换fuzzywuzzy。
  • 谢谢,Max,我的工作中非常欢迎任何一点性能改进,我一定会快速测试一下。

标签: python pandas multiprocessing concurrent.futures process-pool


【解决方案1】:

主要的性能瓶颈是由pandas.concat进程引起的,当我将结果收集部分更改为解决问题的np.concatenate时。在 pandas 后端,经过一定的 IO 阈值后,会减慢整个进程并杀死多核处理。

我对代码做了些微改动,最后我返回了 numpy 数组。

def matcher2(data,
        data_radial_matrice,
        data_indice,
        comparison_data,
        comparison_radial_matrice,
        distance_threshold=.1):
'''  Haversine Distance between selected data point and comparison data points are calculated in miles
    by default is limited to .1 mile distance and among this filtered resuls matching is done and max score records are returned
'''

import pandas as pd
from sklearn.metrics.pairwise import haversine_distances
from fuzzywuzzy import fuzz
import numpy as np

lvl1 = haversine_distances(data_radial_matrice[data_indice].reshape(-1, 2),
                           comparison_radial_matrice) * 3959
lvl2 = pd.Series(lvl1[lvl1 < distance_threshold])

lvl1 = pd.DataFrame(np.argwhere(lvl1 < distance_threshold)).iloc[:, 1]

lvl3 = pd.concat((lvl1, lvl2), axis=1)
lvl3.columns = ['neigh_index', 'distance']
lvl3.set_index('neigh_index', inplace=True)
lvl3 = lvl3.merge(comparison_data,
                  left_index=True,
                  right_index=True,
                  how='inner')

lvl4 = lvl3.loc[:, 'match_text'].apply(
    lambda x: fuzz.token_set_ratio(data.loc[data_indice, 'match_text'], x))
lvl5 = np.where(lvl4 == np.max(lvl4))
interim_result = lvl3.iloc[lvl5]
interim_result['match_score'] = np.max(lvl4)
interim_result['adp_indice'] = data_indice

return np.array(interim_result)

最后,我正在解析结果。

def dnb_matcher_helper(indice):
    return matcher2(adp, adp_rad, indice, dnb, dnb_rad)

import concurrent.futures as cf

dnb_results = np.empty(shape=(1,35))

with cf.ProcessPoolExecutor() as executor:
    for res in tqdm(executor.map(dnb_matcher_helper, 
list(range(len(adp))))):
    if len(res) == 0:
        continue
    else:
        for line in res:
            line = line.reshape((1,35))
            dnb_results = np.concatenate((dnb_results, line), axis=0)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-25
    • 2017-09-14
    • 1970-01-01
    相关资源
    最近更新 更多