【Question title】: Avoid Race Condition in Numba
【Posted】: 2020-04-22 18:54:46
【Question】:

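Here is a toy njit function that takes a distance matrix, loops over each row of the matrix, and records the minimum value in each column along with the row that minimum came from. However, IIUC, using prange may cause a race condition (especially for larger input arrays):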

from numba import njit, prange
import numpy as np

@njit
def some_transformation_func(D, row_i):
    """
    This function applies some transformation to the ith row (`row_i`) in the `D` matrix in place.
    However, the transformation time is random (but all less than a second), which means
    that the rows can take
    """

    # Apply some inplace transformation on the ith row of D


@njit(parallel=True)
def some_func(D):
    P = np.empty((D.shape[1]))
    I = np.empty((D.shape[1]), np.int64)
    P[:] = np.inf
    I[:] = -1

    for row in prange(D.shape[0]):
        some_transformation_func(D, row)
        for col in range(D.shape[1]):
            if P[col] > D[row, col]:
                P[col] = D[row, col]
                I[col] = row

    return P, I

if __name__ == "__main__":
    D = np.array([[4,1,6,9,9], 
                  [1,3,8,2,7], 
                  [2,8,0,0,1],
                  [3,7,4,6,5]
                 ])
    P, I = some_func(D)
    print(P)
    print(I)

    # [1. 1. 0. 0. 1.]
    # [1 0 2 2 2]

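How can I confirm whether a race condition exists (especially if D is very large, with many more rows and columns)? And, more importantly, if there is a race condition, how can I avoid it?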
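To make the suspected race concrete, here is a minimal pure-Python sketch that replays two fixed interleavings of the check (`P[col] > D[row, col]`) and the write (`P[col] = D[row, col]`) for a single shared cell. The `replay` helper, the worker names and the candidate values are hypothetical and exist only for illustration; real threads interleave nondeterministically.

```python
import numpy as np

def replay(schedule, candidates):
    """Replay a fixed interleaving of the read and write steps of
    `if P > value: P = value` for several workers sharing one cell."""
    P = np.inf
    seen = {}  # value of P that each worker observed in its read step
    for worker, step in schedule:
        if step == "read":
            seen[worker] = P
        elif step == "write" and seen[worker] > candidates[worker]:
            # The comparison uses the possibly stale value read earlier
            P = candidates[worker]
    return P

# Hypothetical candidates: worker A holds 5.0, worker B holds 3.0
candidates = {"A": 5.0, "B": 3.0}

# Each worker finishes its read and write before the next one starts
serial = replay(
    [("A", "read"), ("A", "write"), ("B", "read"), ("B", "write")], candidates
)

# Both workers read the initial value before either of them writes
racy = replay(
    [("A", "read"), ("B", "read"), ("B", "write"), ("A", "write")], candidates
)

print(serial, racy)
```

In the second schedule worker A overwrites the smaller value written by B, so the cell ends up holding 5.0 instead of the true minimum 3.0. The same kind of interleaving can also leave `I[col]` pointing at a row that does not match `P[col]`.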

【Comments】:

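  • Not writing an answer because I'm not a Numba expert, but reading stackoverflow.com/questions/59596794/… convinces me that your lines if P[col] > D[row, col]: P[col] = D[row, col] are a race, because they are two separate lines, with no locking, that read from and write to the same memory.
  • As for testing, as you said, the larger the array, the lower the probability of never hitting an inconsistency caused by the race. I suggest writing a simple serial implementation and comparing against it on a very large array using numpy.testing.assert_almost_equal. If the results are equal, the probability of a race is very low.
  • One more thing: in Python, unlike Matlab, using capital letters in local variable names is considered unconventional.
  • Let me know if this is of any help.
  • Thanks for the reply. But what I really need help with is how to avoid the race condition (without losing parallel performance).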
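The comparison against a serial implementation suggested in the comments could be sketched roughly as follows. `reference_min` and `count_mismatches` are hypothetical helper names, and the sketch assumes the function under test returns `(P, I)` and may modify its input in place, as `some_func` does through `some_transformation_func`.

```python
import numpy as np

def reference_min(D):
    """Serial reference: column minima and the first row where each occurs."""
    return D.min(axis=0).astype(np.float64), D.argmin(axis=0).astype(np.int64)

def count_mismatches(func, D, n_trials=20):
    """Run `func` repeatedly and count the runs that disagree with the reference."""
    mismatches = 0
    for _ in range(n_trials):
        D_work = D.copy()  # func may transform its input in place
        P, I = func(D_work)
        # Compute the reference on the (possibly transformed) working copy
        P_ref, I_ref = reference_min(D_work)
        if not (np.allclose(P, P_ref) and np.array_equal(I, I_ref)):
            mismatches += 1
    return mismatches

rng = np.random.default_rng(0)
D = rng.random((200, 50))

# Sanity check of the harness: the reference trivially agrees with itself
print(count_mismatches(reference_min, D))
```

With Numba installed, one would pass the parallel function instead, for example `count_mismatches(some_func, rng.random((10_000, 1_000)))`. Zero mismatches only makes a race unlikely for that input size and thread count; it does not prove that the code is race-free.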

Tags: python race-condition numba


【Solution 1】:

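In cases like this, rather than setting prange to the size of the array, it is better to manually split the data into n_threads chunks, distribute the processing accordingly, and perform a reduction at the end. So, something like this: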

from numba import njit, prange, config
import numpy as np

@njit
def wrapper_func(thread_idx, start_indices, stop_indices, D, P, I):
    for row in range(start_indices[thread_idx], stop_indices[thread_idx]):
        some_transformation_func(D, row)
        for col in range(D.shape[1]):
            if P[thread_idx, col] > D[row, col]:
                P[thread_idx, col] = D[row, col]
                I[thread_idx, col] = row


@njit
def some_transformation_func(D, row_i):
    """
    This function applies some transformation to the ith row (`row_i`) in the `D` matrix in place.
    However, the transformation time is random (but all less than a second), which means
    that the rows can take
    """

    # Apply some inplace transformation on the ith row of D


@njit(parallel=True)
def some_func(D):
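    n_threads = config.NUMBA_NUM_THREADS
    # One row of P and I per thread, so no two threads write to the same element
    P = np.empty((n_threads, D.shape[1]))
    I = np.empty((n_threads, D.shape[1]), np.int64)
    P[:, :] = np.inf
    I[:, :] = -1

    # Split the rows into n_threads contiguous chunks (stop indices are exclusive);
    # with 2 threads and the 4-row example below this gives [0, 2] and [2, 4]
    start_indices = np.arange(n_threads) * D.shape[0] // n_threads
    stop_indices = (np.arange(n_threads) + 1) * D.shape[0] // n_threads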

    for thread_idx in prange(n_threads):
        wrapper_func(thread_idx, start_indices, stop_indices, D, P, I)

    # Perform reduction from all threads and store results in P[0]
    for thread_idx in range(1, n_threads):
        for i in prange(D.shape[1]):
            if P[0, i] > P[thread_idx, i]:
                P[0, i] = P[thread_idx, i]
                I[0, i] = I[thread_idx, i]

    return P[0], I[0]

if __name__ == "__main__":
    D = np.array([[4,1,6,9,9], 
                  [1,3,8,2,7], 
                  [2,8,0,0,1],
                  [3,7,4,6,5]
                 ])
    P, I = some_func(D)
    print(P)
    print(I)

    # [1. 1. 0. 0. 1.]
    # [1 0 2 2 2]

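Note that this will cost you more memory (n_threads times as much for P and I, to be exact), but you benefit from parallelization. In addition, the code becomes cleaner and easier to maintain. What remains is to figure out the best way to chunk the data and determine the start_row and stop_row (exclusive) indices.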
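As a rough illustration of the chunk-then-reduce idea, the following NumPy-only sketch splits the rows into near-equal contiguous chunks, keeps one private row of P and I per chunk, and then reduces into row 0. `chunk_bounds` and `chunked_min` are hypothetical names, the even split is just one possible chunking, and the chunks run serially here; in the Numba version the loop over chunks is the one that would run under prange.

```python
import numpy as np

def chunk_bounds(n_rows, n_chunks):
    """Return start and exclusive stop row indices for near-equal contiguous chunks."""
    edges = np.arange(n_chunks + 1) * n_rows // n_chunks
    return edges[:-1], edges[1:]

def chunked_min(D, n_chunks):
    """Per-chunk column minima followed by a serial reduction into row 0."""
    start, stop = chunk_bounds(D.shape[0], n_chunks)
    P = np.full((n_chunks, D.shape[1]), np.inf)
    I = np.full((n_chunks, D.shape[1]), -1, dtype=np.int64)

    for k in range(n_chunks):  # each iteration only touches row k of P and I
        for row in range(start[k], stop[k]):
            better = D[row] < P[k]
            P[k, better] = D[row, better]
            I[k, better] = row

    for k in range(1, n_chunks):  # reduce the per-chunk results into P[0], I[0]
        better = P[k] < P[0]
        P[0, better] = P[k, better]
        I[0, better] = I[k, better]

    return P[0], I[0]

D = np.array([[4, 1, 6, 9, 9],
              [1, 3, 8, 2, 7],
              [2, 8, 0, 0, 1],
              [3, 7, 4, 6, 5]])
P, I = chunked_min(D, n_chunks=3)
print(P)
print(I)
```

Because every chunk writes only to its own row of P and I, no element is shared between chunks until the final reduction. Chunks may be empty when there are more chunks than rows; their rows of P stay at inf and never win the reduction.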

【Comments】:
