[Question Title]: Why is parallel implementation slower than serial? (Python multiprocessing module)
[Posted]: 2016-10-18 09:45:52
[Question]:

I am trying to extend the scikit-learn class KNeighborsClassifier by introducing an alternative method for computing distances between neighbors (see here if interested).

The parallelization scheme is the following: suppose we want to compute the distances between all elements of a set A and a set B. For each element of A (taken one after another), the distances to all the elements of B are computed in parallel. The expensive operation is computing the distance between any two elements, so that is the basic operation each process performs.
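Stripped of the WMD specifics, the dispatch pattern used in the code below can be sketched like this (the distance function and helper names are toy placeholders, not part of the real code):

```python
from multiprocessing import Pool


def pairwise_distance(a, b):
    # stand-in for the expensive distance computation
    return abs(a - b)


def row_distances(args):
    # distances from one element of A to every element of B
    a, B = args
    return [pairwise_distance(a, b) for b in B]


def all_distances(A, B, n_jobs=2):
    # dispatch one task per element of A to a pool of workers;
    # each task receives a full copy of B
    with Pool(n_jobs) as pool:
        return pool.map(row_distances, [(a, B) for a in A])


if __name__ == "__main__":
    dist = all_distances([1.0, 4.0], [1.0, 2.0, 3.0])
```

Note that every task is shipped its own pickled copy of B, which is the communication cost discussed below.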

The problem is that the parallel execution is much slower than the serial one (using Python's multiprocessing module), both with synchronous and asynchronous calls, regardless of the machine and the number of cores used.

I suspect this has to do with shared variables being communicated behind the scenes. The question is: which variables are being communicated, and how can this be avoided?

Code:

# imports inferred from usage (emd from the pyemd package, Parallel/delayed from joblib)
import multiprocessing as mp

import numpy as np
from joblib import Parallel, delayed
from pyemd import emd
from sklearn.metrics import euclidean_distances
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import normalize
from sklearn.utils import check_array


class WordMoversKNN(KNeighborsClassifier):
    """K nearest neighbors classifier using the Word Mover's Distance.

    Parameters
    ----------
    W_embed : array, shape: (vocab_size, embed_size)
        Precomputed word embeddings for the vocabulary items.
        Row indices should correspond to the columns in the bag-of-words input.
    n_neighbors : int
        Number of neighbors to use by default for :meth:`k_neighbors` queries.
    n_jobs : int
        The number of parallel jobs to run for Word Mover's Distance computation.
        If ``-1``, the number of jobs is set to the number of CPU cores.
    verbose : int, optional
        Controls the verbosity; the higher, the more messages. Defaults to 0.
    """

    def __init__(self, W_embed, n_neighbors=1, n_jobs=1, verbose=5):
        self.W_embed = W_embed
        self.verbose = verbose
        if n_jobs == -1:
            n_jobs = mp.cpu_count()

        super(WordMoversKNN, self).__init__(n_neighbors=n_neighbors, n_jobs=n_jobs,
                                            metric='precomputed', algorithm='brute')

    def _wmd(self, i, row, X_train):
        """Compute the WMD between training sample i and the given test row.

        Assumes that `row` and the train samples are sparse BOW vectors summing to 1.
        """
        union_idx = np.union1d(X_train[i].indices, row.indices)
        W_minimal = self.W_embed[union_idx]
        W_dist = euclidean_distances(W_minimal)
        bow_i = X_train[i, union_idx].A.ravel()
        bow_j = row[:, union_idx].A.ravel()
        return emd(bow_i, bow_j, W_dist)

    def _wmd_row(self, row, X_train):
        """Wrapper computing the WMD of one row against all training samples.

        Assumes that `row` and the train samples are sparse BOW vectors summing to 1.
        Useful for parallelization.
        """
        n_samples_train = X_train.shape[0]
        return [self._wmd(i, row, X_train) for i in range(n_samples_train)]

    def _pairwise_wmd(self, X_test, X_train=None, ordered=True):
        """Compute the Word Mover's Distance between all train and test points.

        Parallelized over the rows of X_test.

        Assumes that the train and test samples are sparse BOW vectors summing to 1.

        Parameters
        ----------
        X_test : scipy.sparse matrix, shape: (n_test_samples, vocab_size)
            Test samples.
        X_train : scipy.sparse matrix, shape: (n_train_samples, vocab_size)
            Training samples. If `None`, uses the samples the estimator was fit with.
        ordered : bool
            If True, the rows of the result keep the order of the rows in X_test.
            Otherwise the rows come back in a potentially arbitrary order, but the
            computation is faster (asynchronous parallel execution).

        Returns
        -------
        dist : array, shape: (n_test_samples, n_train_samples)
            Distances between all test samples and all train samples.
        """
        n_samples_test = X_test.shape[0]

        if X_train is None:
            X_train = self._fit_X

        # avoid the parallelism overhead for small test sets
        if (self.n_jobs == 1) or (n_samples_test < 2 * self.n_jobs):
            dist = [self._wmd_row(test_sample, X_train) for test_sample in X_test]
        else:
            if self.verbose:
                print("WordMoversKNN set to use {} parallel processes".format(self.n_jobs))
            if ordered:
                dist = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
                    delayed(self._wmd_row)(test_sample, X_train) for test_sample in X_test)
            else:  # asynchronous calls are faster but return results in arbitrary order
                pool = mp.Pool(processes=self.n_jobs)
                results = [pool.apply_async(self._wmd_row, args=(test_sample, X_train))
                           for test_sample in X_test]
                dist = [p.get() for p in results]
        return np.array(dist)

    def calculate(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : scipy.sparse matrix, shape (n_test_samples, vocab_size)
            Test samples.

        Returns
        -------
        y : array of shape [n_test_samples]
            Class labels for each data sample.
        """
        X = check_array(X, accept_sparse='csr', copy=True)
        X = normalize(X, norm='l1', copy=False)
        dist = self._pairwise_wmd(X)
        # With metric='precomputed', passing a distance matrix to predict means no
        # further distance computations take place; neighbors are found by sorting.
        return super(WordMoversKNN, self).predict(dist)

[Comments]:

  • Parallel can indeed be slower than serial, for instance when there is only one processor (e.g. a single-core CPU), so the operations cannot actually run in parallel. Parallel execution also spends a small amount of time on context switching, which serial processing does not need.
  • @dee Thanks. The problem is that I tried running the same code on several (multi-core) machines and with different numbers of cores, and execution was always much slower. At the same time, when I tried the same module (multiprocessing) on some toy examples (e.g. sorting arrays, or applying some mathematical function over a list of arrays), I did see a speed-up.
  • multiprocessing can also be slower than serial execution if the IPC cost (meaning copying test_sample and X_train to the child processes, and copying the list returned from _wmd_row back to the parent) is very high. IPC is very slow, so for very large objects it can outweigh the gains from the parallel computation. If you have a huge list that needs lots of not-so-expensive computations, multiprocessing may not help.
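The per-task serialization cost mentioned in the last comment can be estimated directly: every task dispatched with apply_async pickles its arguments, so a large X_train is re-serialized once per test row. A rough gauge (the array size here is made up for illustration):

```python
import pickle
import time

import numpy as np

# hypothetical stand-in for a large dense training matrix
X_train = np.zeros((5000, 300))

start = time.perf_counter()
payload = pickle.dumps(X_train, protocol=pickle.HIGHEST_PROTOCOL)
elapsed = time.perf_counter() - start

# ~12 MB get pickled and copied to a worker for *every* dispatched task;
# with one task per test row, multiply this by X_test.shape[0]
size_mb = len(payload) / 1e6
```

For sparse BOW matrices the payload is smaller, but self.W_embed (the full embedding matrix, carried along because the dispatched callable is a bound method) is dense and large.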

Tags: python parallel-processing scikit-learn multiprocessing


[Solution 1]:

The main problem is that a new process is spawned for each row of the matrix X_test, and each time the full X_train, along with other variables (e.g. self.W_embed), has to be passed to every process. Because of their size, pickling and dispatching these variables is very time-consuming. I got a dramatic speed-up when I split the matrix X_test into n_jobs chunks of size X_test.shape[0] // n_jobs, spawning only n_jobs processes in total and passing the variables n_jobs times instead of X_test.shape[0] times. However, given the size of the variables that have to be communicated, I believe that for this type of problem data parallelism is a more suitable approach than task parallelism, so I intend to use mpi4py, letting each process build its own self.W_embed, X_train and X_test matrices and communicating only the computed results.

[Discussion]:
