【问题标题】:Find all point pairs closer than a given maximum distance找到比给定最大距离更近的所有点对
【发布时间】:2015-12-14 06:21:18
【问题描述】:

我想(有效地)找到所有比某个距离更近的点对max_d。我目前使用cdist的方法是:

import numpy as np
from scipy.spatial.distance import cdist

def close_pairs(X,max_d):
    d = cdist(X,X)

    I,J = (d<max_d).nonzero()
    IJ  = np.sort(np.vstack((I,J)), axis=0)

    # remove diagonal element
    IJ  = IJ[:,np.diff(IJ,axis=0).ravel()<>0]

    # remove duplicate
    dt = np.dtype([('i',int),('j',int)])
    pairs = np.unique(IJ.T.view(dtype=dt)).view(int).reshape(-1,2)

    return pairs

def test():
    X = np.random.rand(100,2)*20
    p = close_pairs(X,2)

    from matplotlib import pyplot as plt
    plt.clf()
    plt.plot(X[:,0],X[:,1],'.r')
    plt.plot(X[p,0].T,X[p,1].T,'-b')

但我认为这是矫枉过正(而且不是很可读),因为大部分工作只是为了消除与自我的距离和重复。

我的主要问题是:有更好的方法吗?

(注意:此时输出的类型(arrayset,...)并不重要)

我目前的想法是使用pdist,它返回一个仅包含正确对的压缩距离数组。但是,一旦我从压缩距离数组中找到合适的坐标k,我如何计算它相当于哪些i,j 对?

所以另一个问题是:有没有一种简单的方法来获取相对于 pdist 输出条目的坐标对列表:

  • 一个函数f(k)-&gt;i,j
  • 这样cdist(X,X)[i,j] = pdist(X)[k]

【问题讨论】:

  • 如果你可以添加一行代码来调用close_pairs 并带有一些正确形状和数据类型的随机数据,那就太棒了:)
  • 我在标签中添加了python,现在它会在此处突出显示语法。在您的实际问题中,您要处理多少点?而且这个计算需要一次,需要循环计算,需要计算多个距离还是多个点集的结果?我已经添加了一个答案,但最好的做法取决于用例......
  • 我目前的需要,应该做一次。
  • 点数通常在 [1000,2000] 中的某处
  • OK 将查看我回到工作机器时的时间。就我个人而言,我认为它在时间上没有太多意义,并且会采用最有可能具有可读性和无错误的任何方法。如果是我的代码,我会使用 kdtree 方法。如果您的 scipy 已过期,我强烈建议您安装 Anaconda Python 发行版。

标签: python numpy scipy coordinates distance


【解决方案1】:

根据我的经验,有两种最快的方法可以在 3D 中找到邻居列表。一种是使用用 C++ 或 Cython 编写的最简单的双循环代码(在我的例子中,两者都是)。它在 N^2 中运行,但对于小型系统来说非常快。另一种方法是使用线性时间算法。 Scipy ckdtree 是一个不错的选择,但有局限性。来自分子动力学软件的邻居列表查找器功能最强大,但很难包装,而且初始化时间可能很慢。

下面我比较四种方法:

  • 朴素的 cython 代码
  • OpenMM 周围的包装器(很难安装,见下文)
  • Scipy.spatial.ckdtree
  • scipy.spatial.distance.pdist

测试设置:n 点分散在一个矩形框中,体积密度为 0.2。系统大小从 10 到 1000000(一百万)个粒子不等。联系半径取自0.5, 1, 2, 4, 7, 10。请注意,因为密度为 0.2,所以在接触半径为 0.5 时,每个粒子平均有大约 0.1 个接触,在 1 = 0.8,在 2 = 6.4,在 10 - 大约 800!对于小型系统,接触发现重复了几次,对于 >30k 粒子的系统进行了一次。如果每次调用的时间超过 5 秒,则运行中止。

设置:双至强 2687Wv3、128GB RAM、Ubuntu 14.04、python 2.7.11、scipy 0.16.0、numpy 1.10.1。没有任何代码使用并行优化(OpenMM 除外,尽管并行部分运行得如此之快,以至于在 CPU 图上甚至都没有注意到,但大部分时间都花在了从 OpenMM 传输数据上)。

结果:请注意,下面的图是对数尺度的,分布在 6 个数量级上。即使是很小的视觉差异实际上也可能是 10 倍。 对于少于 1000 个粒子的系统,Cython 代码总是更快。但是,1000 个粒子后的结果取决于接触半径。 pdist 实现总是比 cython 慢,并且占用更多内存,因为它显式地创建了一个距离矩阵,由于 sqrt 而速度很慢。

  • 在小接触半径(每个粒子ckdtree 是所有系统尺寸的理想选择。
  • 在中等接触半径下,(每个粒子 5-50 个接触)naive cython 实现是最好的,最多 10000 个粒子,然后 OpenMM 开始以大约几个数量级获胜,但 ckdtree 的性能仅差 3-10 倍
  • 在高接触半径(每个粒子>200 个接触)的情况下,幼稚的方法最多可以处理 100k 或 1M 个粒子,那么 OpenMM 可能会胜出。

安装 OpenMM 非常棘手;您可以在http://bitbucket.org/mirnylab/openmm-polymer 文件“contactmaps.py”或自述文件中阅读更多内容。然而,下面的结果表明,对于 N>100k 个粒子,每个粒子只有 5-50 个接触是有利的。

Cython 代码如下:

import numpy as np
cimport numpy as np
cimport cython

cdef extern from "<vector>" namespace "std":
    cdef cppclass vector[T]:
        cppclass iterator:
            T operator*()
            iterator operator++()
            bint operator==(iterator)
            bint operator!=(iterator)
        vector()
        void push_back(T&)
        T& operator[](int)
        T& at(int)
        iterator begin()
        iterator end()

np.import_array() # initialize C API to call PyArray_SimpleNewFromData
cdef public api tonumpyarray(int* data, long long size) with gil:
    if not (data and size >= 0): raise ValueError
    cdef np.npy_intp dims = size
    #NOTE: it doesn't take ownership of `data`. You must free `data` yourself
    return np.PyArray_SimpleNewFromData(1, &dims, np.NPY_INT, <void*>data)

@cython.boundscheck(False)
@cython.wraparound(False)
def contactsCython(inArray, cutoff):
    inArray = np.asarray(inArray, dtype = np.float64, order = "C")
    cdef int N = len(inArray)
    cdef np.ndarray[np.double_t, ndim = 2] data = inArray
    cdef int j,i
    cdef double curdist
    cdef double cutoff2 = cutoff * cutoff  # IMPORTANT to avoid slow sqrt calculation
    cdef vector[int] contacts1
    cdef vector[int] contacts2
    for i in range(N):
        for j in range(i+1, N):
            curdist = (data[i,0] - data[j,0]) **2 +(data[i,1] - data[j,1]) **2 + (data[i,2] - data[j,2]) **2
            if curdist < cutoff2:
                contacts1.push_back(i)
                contacts2.push_back(j)
    cdef int M = len(contacts1)

    cdef np.ndarray[np.int32_t, ndim = 2] contacts = np.zeros((M,2), dtype = np.int32)
    for i in range(M):
        contacts[i,0] = contacts1[i]
        contacts[i,1] = contacts2[i]
    return contacts

Cython 代码的编译(或 makefile):

    cython --cplus fastContacts.pyx
    g++  -g -march=native -Ofast -fpic -c   fastContacts.cpp -o fastContacts.o `python-config --includes`
    g++  -g -march=native -Ofast -shared  -o fastContacts.so  fastContacts.o `python-config --libs`

测试代码:

from __future__ import print_function, division

import signal
import time
from contextlib import contextmanager

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial import ckdtree
from scipy.spatial.distance import pdist

from contactmaps import giveContactsOpenMM  # remove this unless you have OpenMM and openmm-polymer libraries installed
from fastContacts import contactsCython


class TimeoutException(Exception): pass


@contextmanager
def time_limit(seconds):
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)


matplotlib.rcParams.update({'font.size': 8})


def close_pairs_ckdtree(X, max_d):
    tree = ckdtree.cKDTree(X)
    pairs = tree.query_pairs(max_d)
    return np.array(list(pairs))


def condensed_to_pair_indices(n, k):
    x = n - (4. * n ** 2 - 4 * n - 8 * k + 1) ** .5 / 2 - .5
    i = x.astype(int)
    j = k + i * (i + 3 - 2 * n) / 2 + 1
    return np.array([i, j]).T


def close_pairs_pdist(X, max_d):
    d = pdist(X)
    k = (d < max_d).nonzero()[0]
    return condensed_to_pair_indices(X.shape[0], k)


a = np.random.random((100, 3)) * 3  # test set
methods = {"cython": contactsCython, "ckdtree": close_pairs_ckdtree, "OpenMM": giveContactsOpenMM,
           "pdist": close_pairs_pdist}

# checking that each method gives the same value
allUniqueInds = []
for ind, method in methods.items():
    contacts = method(a, 1)
    uniqueInds = contacts[:, 0] + 100 * contacts[:, 1]  # unique index of each contacts
    allUniqueInds.append(np.sort(uniqueInds))  # adding sorted unique conatcts
for j in allUniqueInds:
    assert np.allclose(j, allUniqueInds[0])

# now actually doing testing
repeats = [30,30,30, 30, 30, 20,  20,   10,   5,   3,     2 ,       1,     1,      1]
sizes =    [10,30,100, 200, 300,  500, 1000, 2000, 3000, 10000, 30000, 100000, 300000, 1000000]
systems = [[np.random.random((n, 3)) * ((n / 0.2) ** 0.333333) for k in range(repeat)] for n, repeat in
           zip(sizes, repeats)]

for j, radius in enumerate([0.5, 1, 2, 4, 7, 10]):
    plt.subplot(2, 3, j + 1)
    plt.title("Radius = {0}; {1:.2f} cont per particle".format(radius, 0.2 * (4 / 3 * np.pi * radius ** 3)))

    times = {i: [] for i in methods}

    for name, method in methods.items():
        for n, system, repeat in zip(sizes, systems, repeats):
            if name == "pdist" and n > 30000:
                break  # memory issues
            st = time.time()
            try:
                with time_limit(5 * repeat):
                    for ind in range(repeat):
                        k = len(method(system[ind], radius))
            except:
                print("Run aborted")
                break
            end = time.time()
            mytime = (end - st) / repeat
            times[name].append((n, mytime))
            print("{0} radius={1} n={2} time={3} repeat={4} contPerParticle={5}".format(name, radius, n, mytime,repeat, 2 * k / n))

    for name in sorted(times.keys()):
        plt.plot(*zip(*times[name]), label=name)
    plt.xscale("log")
    plt.yscale("log")
    plt.xlabel("System size")
    plt.ylabel("Time (seconds)")
    plt.legend(loc=0)

plt.show()

【讨论】:

  • 看起来 scipy 0.17 附带的 ckdtree 可以选择返回一个 numpy 数组。这应该使它在庞大的联系人列表上更快,因为它将避免从集合到列表到 numpy 数组的转换。
  • 到目前为止,带有 np.ndarray 返回选项的 scipy ckdtree 是我在 3D 中查找最近邻居的默认方法。我已弃用我们聚合物模拟库中的任何其他联系人查找器。
【解决方案2】:

以下是使用 cKDTree 模块的方法。见query_pairs

import numpy as np
from scipy.spatial.distance import cdist
from scipy.spatial import ckdtree


def close_pairs(X,max_d):
    d = cdist(X,X)

    I,J = (d<max_d).nonzero()
    IJ  = np.sort(np.vstack((I,J)), axis=0)

    # remove diagonal element
    IJ  = IJ[:,np.diff(IJ,axis=0).ravel()<>0]

    # remove duplicate
    dt = np.dtype([('i',int),('j',int)])
    pairs = np.unique(IJ.T.view(dtype=dt)).view(int).reshape(-1,2)

    return pairs

def close_pairs_ckdtree(X, max_d):
    tree = ckdtree.cKDTree(X)
    pairs = tree.query_pairs(max_d)
    return np.array(list(pairs))

def test():
    np.random.seed(0)
    X = np.random.rand(100,2)*20
    p = close_pairs(X,2)
    q = close_pairs_ckdtree(X, 2)

    from matplotlib import pyplot as plt
    plt.plot(X[:,0],X[:,1],'.r')
    plt.plot(X[p,0].T,X[p,1].T,'-b')
    plt.figure()
    plt.plot(X[:,0],X[:,1],'.r')
    plt.plot(X[q,0].T,X[q,1].T,'-b')

    plt.show()

t

【讨论】:

  • 用scipy 0.11,我得用KDtree版本,比较慢。 ckdtree 怎么样?
  • 我没有任何数字,但我从经验中知道它有很大的不同(如您所料)
【解决方案3】:

我终于自己找到了。将压缩距离数组中的索引k 转换为平方距离数组中的等效i,j 的函数是:

def condensed_to_pair_indices(n,k):
    x = n-(4.*n**2-4*n-8*k+1)**.5/2-.5
    i = x.astype(int)
    j = k+i*(i+3-2*n)/2+1
    return i,j

我不得不和sympy 玩了一会儿才能找到它。现在,计算所有小于给定距离的点对:

def close_pairs_pdist(X,max_d):
    d = pdist(X)
    k = (d<max_d).nonzero()[0]
    return condensed_to_pair_indices(X.shape[0],k)

正如所料,它比其他方法更有效(但我没有测试ckdtree)。我会更新timeit的答案。

【讨论】:

  • 很好,我找到了另一个方向的公式,但是将其反转组合数学已经有一段时间了;-)
【解决方案4】:

稍微快一点,没有彻底测试时差,但如果我运行它几次,我的方法给出的时间约为 0.0755529403687,而你的方法给出的时间约为 0.0928771495819。我使用 triu 方法去除数组的上三角形(重复项所在的位置),包括对角线(自距离所在的位置),我也不排序,因为如果你绘制它,我是否无关紧要是否按顺序绘制它们。所以我猜它加速了大约 15% 左右

import numpy as np
from scipy.spatial.distance import cdist
from scipy.misc import comb

def close_pairs(X,max_d):
    d = cdist(X,X)
    I,J = (d<max_d).nonzero()
    IJ  = np.sort(np.vstack((I,J)), axis=0)

    # remove diagonal element
    IJ  = IJ[:,np.diff(IJ,axis=0).ravel()<>0]

    # remove duplicate
    dt = np.dtype([('i',int),('j',int)])
    pairs = np.unique(IJ.T.view(dtype=dt)).view(int).reshape(-1,2)

    return pairs


def close_pairs1(X,max_d):
    d = cdist(X,X)
    d1 = np.triu_indices(len(X)) # indices of the upper triangle including the diagonal
    d[d1] = max_d+1 # value that will not get selected when doing d<max_d in the next line
    I,J = (d<max_d).nonzero()
    pairs = np.vstack((I,J)).T
    return pairs

def close_pairs3(X, max_d):
    d = pdist(X)
    n = len(X)
    pairs = np.zeros((0,2))
    for i in range(n):
        for j in range(i+1,n):
            # formula from http://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.squareform.html
            a=d[int(comb(n,2)-comb(n-i,2)+j-i-1+0.1)] # the +0.1 is because otherwise i get floating point trouble
            if(a<max_d):
                pairs = np.r_[pairs, np.array([i,j])[None,:]]
    return pairs

def close_pairs4(X, max_d):
    d = pdist(X)
    n = len(X)
    a = np.where(d<max_d)[0]
    i = np.arange(n)[:,None]
    j = np.arange(n)[None,:]
    b = np.array(comb(n,2)-comb(n-i,2)+j-i-1+0.1, dtype=int)
    d1 = np.tril_indices(n)
    b[d1] = -1

    pairs = np.zeros((0,2), dtype=int)

    # next part is the bottleneck: the np.where each time, 
    for v in a:
        i, j = np.where(v==b) 
        pairs = np.r_[pairs, np.array([i[0],j[0]])[None,:]]
    return pairs

def close_pairs5(X, max_d):
    t0=time.time()
    d = pdist(X)
    n = len(X)
    a = np.where(d<max_d)[0]
    i = np.arange(n)[:,None]
    j = np.arange(n)[None,:]
    t1 = time.time()
    b = np.array(comb(n,2)-comb(n-i,2)+j-i-1+0.1, dtype=int)
    d1 = np.tril_indices(n)
    b[d1] = -1
    t2 = time.time()
    V = b[:,:,None]-a[None,None,:] # takes a little time
    t3 = time.time()
    p = np.where(V==0) # takes most of the time, thought that removing the for-loop from the previous method might improve it, but it does not do that much. This method contains the formula you wanted though, but apparently it is still faster if you use the cdist methods
    t4 = time.time()
    pairs = np.vstack((p[0],p[1])).T
    print t4-t3,t3-t2, t2-t1, t1-t0
    return pairs





def test():
    X = np.random.rand(1000,2)*20
    import time
    t0 = time.time()
    p = close_pairs(X,2)
    t1 = time.time()
    p2 = close_pairs1(X,2)
    t2 = time.time()
    print t2-t1, t1-t0

    from matplotlib import pyplot as plt
    plt.figure()
    plt.clf()
    plt.plot(X[:,0],X[:,1],'.r')
    plt.plot(X[p,0].T,X[p,1].T,'-b')
    plt.figure()
    plt.clf()
    plt.plot(X[:,0],X[:,1],'.r')
    plt.plot(X[p2,0].T,X[p2,1].T,'-b')
    plt.show()



test()

注意:如果你为 1K 点绘制滞后,但它需要 1K 点来比较速度,但如果用 100 点绘制它,我检查它是否正常工作 速度差异大约是百分之十到二十,我认为它不会比这更好,因为我摆脱了所有排序和独特元素的东西,所以大部分时间花费的部分可能是@987654322 @行

编辑:一些更多的测试表明,在那些时候,该 cdist 行占用大约 0.065 秒,而您的方法的其余部分大约是 0.02,而对我来说大约是 0.015 秒左右。结论:您的代码的主要瓶颈是d = cdist(X, X) 行,我更改的内容加速了您获得的其余代码,但主要瓶颈仍然存在

编辑:添加了 close_pairs3 方法,它为您提供了公式,但是速度很慢,(仍然需要弄清楚如何反转该公式,而且它会超快,明天会这样做 - 将使用 np.where( pdist(X)

编辑:添加了方法 close_pairs4,比方法 3 略好,并解释了发生了什么,但速度非常慢,与方法 5 相同,没有那个 for 循环,但仍然很慢

【讨论】:

  • 我的解决方案没有那么慢,所以时间增益并不是最重要的。但是,更具可读性(并且速度更快)是一件好事。
  • @Juh_ 好吧,我想我的代码更短更易读?
  • 我就是这个意思:-)
  • 我认为您应该在不同的答案中发布close_pairs3。现在,在每一步执行 2 个循环和连接数组只会很慢(至少您应该使用 set 而不是连接,或者在最后进行一次连接)。但是感谢这个公式。
  • 但是,您可以从 i,j 计算 k。为了提高效率,应该做的恰恰相反。
【解决方案5】:

我编写了一些代码来比较建议的解决方案。

注意:我使用 scipy 0.11 并且不能使用我预计会更慢的 ckdtree 解决方案(仅 kdtree)。使用 scipy v0.12+ 的任何人都可以运行此代码吗?

import numpy as np
from scipy.spatial.distance import cdist, pdist
from scipy.spatial import ckdtree
from scipy.spatial import kdtree


def close_pairs(X,max_d):
    d = cdist(X,X)

    I,J = (d<max_d).nonzero()
    IJ  = np.sort(np.vstack((I,J)), axis=0)

    # remove diagonal element
    IJ  = IJ[:,np.diff(IJ,axis=0).ravel()<>0]

    # remove duplicate
    dt = np.dtype([('i',int),('j',int)])
    pairs = np.unique(IJ.T.view(dtype=dt)).view(int).reshape(-1,2)

    return pairs


def condensed_to_pair_indices(n,k):
    x = n-(4.*n**2-4*n-8*k+1)**.5/2-.5
    i = x.astype(int)
    j = k+i*(i+3-2*n)/2+1
    return i,j

def close_pairs_pdist(X,max_d):
    d = pdist(X)
    k = (d<max_d).nonzero()[0]
    return condensed_to_pair_indices(X.shape[0],k)


def close_pairs_triu(X,max_d):
    d = cdist(X,X)
    d1 = np.triu_indices(len(X)) # indices of the upper triangle including the diagonal
    d[d1] = max_d+1 # value that will not get selected when doing d<max_d in the next line
    I,J = (d<max_d).nonzero()
    pairs = np.vstack((I,J)).T
    return pairs

def close_pairs_ckdtree(X, max_d):
    tree = ckdtree.cKDTree(X)
    pairs = tree.query_pairs(max_d)
    return pairs       # remove the conversion as it is not required

def close_pairs_kdtree(X, max_d):
    tree  = kdtree.KDTree(X)
    pairs = tree.query_pairs(max_d)
    return pairs       # remove the conversion as it is not required


methods = [close_pairs, close_pairs_pdist, close_pairs_triu, close_pairs_kdtree] #, close_pairs_ckdtree]

def time_test(n=[10,50,100], max_d=[5,10,50], iter_num=100):
    import timeit

    for method in methods:
        print '-- time using ' + method.__name__ + ' ---'
        for ni in n:
            for d in max_d:
                setup = '\n'.join(['import numpy as np','import %s' % __name__,'np.random.seed(0)','X = np.random.rand(%d,2)*100'%ni])
                stmt  = 'close_pairs.%s(X,%f)' % (method.__name__,d)
                time  = timeit.timeit(stmt=stmt, setup=setup, number=iter_num)/iter_num
                print 'n=%3d, max_d=%2d: \t%.2fms' % (ni, d,time*1000)

time_test(iter_num=10,n=[20,100,500],max_d=[1,5,10]) 的输出是:

-- time using close_pairs ---
n= 20, max_d= 1:    0.22ms
n= 20, max_d= 5:    0.16ms
n= 20, max_d=10:    0.21ms
n=100, max_d= 1:    0.41ms
n=100, max_d= 5:    0.53ms
n=100, max_d=10:    0.97ms
n=500, max_d= 1:    7.12ms
n=500, max_d= 5:    12.28ms
n=500, max_d=10:    33.41ms
-- time using close_pairs_pdist ---
n= 20, max_d= 1:    0.11ms
n= 20, max_d= 5:    0.10ms
n= 20, max_d=10:    0.11ms
n=100, max_d= 1:    0.19ms
n=100, max_d= 5:    0.19ms
n=100, max_d=10:    0.19ms
n=500, max_d= 1:    2.31ms
n=500, max_d= 5:    2.82ms
n=500, max_d=10:    2.49ms
-- time using close_pairs_triu ---
n= 20, max_d= 1:    0.17ms
n= 20, max_d= 5:    0.16ms
n= 20, max_d=10:    0.16ms
n=100, max_d= 1:    0.83ms
n=100, max_d= 5:    0.80ms
n=100, max_d=10:    0.80ms
n=500, max_d= 1:    23.64ms
n=500, max_d= 5:    22.87ms
n=500, max_d=10:    22.96ms
-- time using close_pairs_kdtree ---
n= 20, max_d= 1:    1.71ms
n= 20, max_d= 5:    1.69ms
n= 20, max_d=10:    1.96ms
n=100, max_d= 1:    34.99ms
n=100, max_d= 5:    35.47ms
n=100, max_d=10:    34.91ms
n=500, max_d= 1:    253.87ms
n=500, max_d= 5:    255.05ms
n=500, max_d=10:    256.66ms

结论:

但是,ckdtree 方法需要测试。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-28
    • 1970-01-01
    • 2021-12-24
    • 2015-12-02
    相关资源
    最近更新 更多