【发布时间】:2017-08-03 14:01:36
【问题描述】:
我有 size = n 的样本。
我想为每个 i 计算:1 sample[:i] 的中位数。 例如,我计算了每个 i 的平均值:
cummean = np.cumsum(sample) / np.arange(1, n + 1)
我可以对没有循环和理解的中位数做类似的事情吗?
【问题讨论】:
标签: python numpy statistics vectorization
我有 size = n 的样本。
我想为每个 i 计算:1 sample[:i] 的中位数。 例如,我计算了每个 i 的平均值:
cummean = np.cumsum(sample) / np.arange(1, n + 1)
我可以对没有循环和理解的中位数做类似的事情吗?
【问题讨论】:
标签: python numpy statistics vectorization
有一个近似的解决方案。如果您将值列表arr 视为概率质量函数。您可以使用np.cumsum(arr) 获取累积分布函数。根据定义,中位数只是概率的一半。
这给了你一个近似的解决方案
arr[np.searchsorted(np.cumsum(arr), np.cumsum(arr)/2)]
【讨论】:
arr 已经排序,在这种情况下,您可以简单地查看每个 i 的索引 i//2。
知道 Python 有一个 heapq 模块可以让您保持一个可迭代的运行“最小值”,我在 heapq 和 median 上进行了搜索,并找到了 steaming medium 的各种项目。这个:
http://www.ardendertat.com/2011/11/03/programming-interview-questions-13-median-of-integer-stream/
有一个class streamMedian 维护两个heapq,一个是值的下半部分,另一个是上半部分。中位数是一个的“顶部”或两者的平均值。该类有一个insert 方法和一个getMedian 方法。大部分工作都在insert。
我将它复制到 Ipython 会话中,并定义:
def cummedian_stream(b):
S=streamMedian()
ret = []
for item in b:
S.insert(item)
ret.append(S.getMedian())
return np.array(ret)
测试:
In [155]: a = np.random.randint(0,100,(5000))
In [156]: amed = cummedian_stream(a)
In [157]: np.allclose(cummedian_sorted(a), amed)
Out[157]: True
In [158]: timeit cummedian_sorted(a)
1 loop, best of 3: 781 ms per loop
In [159]: timeit cummedian_stream(a)
10 loops, best of 3: 39.6 ms per loop
heapq 流方法更快。
@Uriel 给出的列表理解比较慢。但是如果我用np.median 代替statistics.median 它比@Divakar's 排序解决方案要快:
def fastloop(a):
return np.array([np.median(a[:i+1]) for i in range(len(a))])
In [161]: timeit fastloop(a)
1 loop, best of 3: 360 ms per loop
而且@Paul Panzer's 分区方法也不错,但与流类相比仍然很慢。
In [165]: timeit cummedian_partition(a)
1 loop, best of 3: 391 ms per loop
(如果需要,我可以将 streamMedian 类复制到此答案)。
【讨论】:
numpy 也无法使O(n^2)(或更糟)的算法比——它是什么——O(n log n) 更快甚至更好?
是否有延迟入场的空间?
def cummedian_partition(a):
n = len(a)
assert n%4 == 0 # for simplicity
mn = a.min() - 1
mx = a.max() + 1
h = n//2
N = n + h//2
work = np.empty((h, N), a.dtype)
work[:, :n] = a
work[:, n] = 2*mn - a[0]
i, j = np.tril_indices(h, -1)
work[i, n-1-j] = (2*mn - a[1:h+1])[j]
k, l = np.ogrid[:h, :h//2 - 1]
work[:, n+1:] = np.where(k > 2*l+1, mx, 2 * mn - mx)
out = np.partition(work, (N-n//2-1, N-n//2, h//2-1, h//2), axis=-1)
out = np.r_[2*mn-out[:, h//2: h//2-2:-1], out[::-1, N-n//2-1:N-n//2+1]]
out[::2, 0] = out[::2, 1]
return np.mean(out, axis=-1)
该算法使用具有线性复杂度的分区。需要一些体操,因为np.partition 不支持每行分割点。综合复杂度和所需的内存是二次方的。
与当前最佳时间相比:
for j in (100, 1000, 5000):
a = np.random.randint(0, 100, (j,))
print('size', j)
print('correct', np.allclose(cummedian_partition(a), cummedian_sorted(a)))
print('Divakar', timeit(lambda: cummedian_sorted(a), number=10))
print('PP', timeit(lambda: cummedian_partition(a), number=10))
# size 100
# correct True
# Divakar 0.0022412699763663113
# PP 0.002393342030700296
# size 1000
# correct True
# Divakar 0.20881508802995086
# PP 0.10222102201078087
# size 5000
# correct True
# Divakar 6.158387024013791
# PP 3.437395485001616
【讨论】:
这是一种沿行复制元素以提供2D 数组的方法。然后,我们将用一个大数字填充上三角区域,以便稍后当我们沿每一行对数组进行排序时,基本上会将所有元素排序到对角线元素,并模拟累积窗口。然后,按照median 的定义选择中间一个或两个中间的平均值(对于偶数个元素),我们将获得第一个位置的元素:(0,0),然后是第二行: (1,0) & (1,1) 的平均值,第三行:(2,1),第四行:(3,1) & (3,2) 的平均值,依此类推。因此,我们将从排序后的数组中提取出这些元素,从而得到我们的中值。
因此,实现将是 -
def cummedian_sorted(a):
n = a.size
maxn = a.max()+1
a_tiled_sorted = np.tile(a,n).reshape(-1,n)
mask = np.triu(np.ones((n,n),dtype=bool),1)
a_tiled_sorted[mask] = maxn
a_tiled_sorted.sort(1)
all_rows = a_tiled_sorted[np.arange(n), np.arange(n)//2].astype(float)
idx = np.arange(1,n,2)
even_rows = a_tiled_sorted[idx, np.arange(1,1+(n//2))]
all_rows[idx] += even_rows
all_rows[1::2] /= 2.0
return all_rows
运行时测试
方法-
# Loopy solution from @Uriel's soln
def cummedian_loopy(arr):
return [median(a[:i]) for i in range(1,len(a)+1)]
# Nan-fill based solution from @Nickil Maveli's soln
def cummedian_nanfill(arr):
a = np.tril(arr).astype(float)
a[np.triu_indices(a.shape[0], k=1)] = np.nan
return np.nanmedian(a, axis=1)
时间安排 -
设置#1:
In [43]: a = np.random.randint(0,100,(100))
In [44]: print np.allclose(cummedian_loopy(a), cummedian_sorted(a))
...: print np.allclose(cummedian_loopy(a), cummedian_nanfill(a))
...:
True
True
In [45]: %timeit cummedian_loopy(a)
...: %timeit cummedian_nanfill(a)
...: %timeit cummedian_sorted(a)
...:
1000 loops, best of 3: 856 µs per loop
1000 loops, best of 3: 778 µs per loop
10000 loops, best of 3: 200 µs per loop
设置#2:
In [46]: a = np.random.randint(0,100,(1000))
In [47]: print np.allclose(cummedian_loopy(a), cummedian_sorted(a))
...: print np.allclose(cummedian_loopy(a), cummedian_nanfill(a))
...:
True
True
In [48]: %timeit cummedian_loopy(a)
...: %timeit cummedian_nanfill(a)
...: %timeit cummedian_sorted(a)
...:
10 loops, best of 3: 118 ms per loop
10 loops, best of 3: 47.6 ms per loop
100 loops, best of 3: 18.8 ms per loop
设置#3:
In [49]: a = np.random.randint(0,100,(5000))
In [50]: print np.allclose(cummedian_loopy(a), cummedian_sorted(a))
...: print np.allclose(cummedian_loopy(a), cummedian_nanfill(a))
True
True
In [54]: %timeit cummedian_loopy(a)
...: %timeit cummedian_nanfill(a)
...: %timeit cummedian_sorted(a)
...:
1 loops, best of 3: 3.36 s per loop
1 loops, best of 3: 583 ms per loop
1 loops, best of 3: 521 ms per loop
【讨论】:
loopy 的时间更快,但我使用的是np.median,而不是统计数据。
使用statistics.median 和累积列表理解(请注意,奇数索引包含偶数长度列表的中位数 - 其中中位数是两个中位数元素的平均值,因此它通常以小数而不是整数产生):
>>> from statistics import median
>>> arr = [1, 3, 4, 2, 5, 3, 6]
>>> cum_median = [median(arr[:i+1]) for i in range(len(arr)-1)]
>>> cum_median
[1, 2.0, 3, 2.5, 3, 3.0]
【讨论】:
np.median 在这个角色中比statistics.median 快得多。