【问题标题】:Weighted random sample in pythonpython中的加权随机样本
【发布时间】:2012-10-14 09:50:18
【问题描述】:

我正在寻找一个函数 weighted_sample 的合理定义,它不会只返回给定权重列表的一个随机索引(类似于

def weighted_choice(weights, random=random):
    """ Given a list of weights [w_0, w_1, ..., w_n-1],
        return an index i in range(n) with probability proportional to w_i. """
    rnd = random.random() * sum(weights)
    for i, w in enumerate(weights):
        if w<0:
            raise ValueError("Negative weight encountered.")
        rnd -= w
        if rnd < 0:
            return i
    raise ValueError("Sum of weights is not positive")

给出一个具有恒定权重的分类分布)但是其中的k 的随机样本,没有替换,就像random.samplerandom.choice 相比的行为。

正如weighted_choice可以写成

lambda weights: random.choice([val for val, cnt in enumerate(weights)
    for i in range(cnt)])

weighted_sample 可以写成

lambda weights, k: random.sample([val for val, cnt in enumerate(weights)
    for i in range(cnt)], k)

但我想要一个不需要我将权重分解为(可能很大)列表的解决方案。

编辑:如果有任何不错的算法可以返回直方图/频率列表(与参数weights 格式相同)而不是索引序列,那也将非常有用。

【问题讨论】:

  • 谢谢,“堆”解决方案除了我正在寻找的一个小修改之外。 (我仍然需要正确理解它,但是在将h[i].w = 0 替换为h[i][2] -= 1 和相关内容之后,这看起来不错。)
  • 相关:Data structure for loaded dice?。它提到了 O(n) 初始化,然后是每个骰子方法 O(1)
  • @JFSebastian:只是远程相关,因为这里的重点是我正在寻找一个权重每一步都改变的解决方案,因为我“在绘制 没有 替换”,因此每个时间步都需要重新初始化数据结构,其中一个权重下降了一个。 (如果您知道如何使问题更清楚,请编辑。)

标签: python algorithm random


【解决方案1】:

来自您的代码:..

weight_sample_indexes = lambda weights, k: random.sample([val 
        for val, cnt in enumerate(weights) for i in range(cnt)], k)

.. 我假设权重是正整数,并且“不替换”是指不替换解开的序列。

这是一个基于 random.sample 和 O(log n) __getitem__ 的解决方案:

import bisect
import random
from collections import Counter, Sequence

def weighted_sample(population, weights, k):
    return random.sample(WeightedPopulation(population, weights), k)

class WeightedPopulation(Sequence):
    def __init__(self, population, weights):
        assert len(population) == len(weights) > 0
        self.population = population
        self.cumweights = []
        cumsum = 0 # compute cumulative weight
        for w in weights:
            cumsum += w   
            self.cumweights.append(cumsum)  
    def __len__(self):
        return self.cumweights[-1]
    def __getitem__(self, i):
        if not 0 <= i < len(self):
            raise IndexError(i)
        return self.population[bisect.bisect(self.cumweights, i)]

示例

total = Counter()
for _ in range(1000):
    sample = weighted_sample("abc", [1,10,2], 5)
    total.update(sample)
print(sample)
print("Frequences %s" % (dict(Counter(sample)),))

# Check that values are sane
print("Total " + ', '.join("%s: %.0f" % (val, count * 1.0 / min(total.values()))
                           for val, count in total.most_common()))

输出

['b', 'b', 'b', 'c', 'c']
Frequences {'c': 2, 'b': 3}
Total b: 10, c: 2, a: 1

【讨论】:

    【解决方案2】:

    您要创建的是非均匀随机分布。这样做的一种不好的方法是创建一个巨大的数组,其中输出符号与权重成比例。因此,如果 a 的可能性是 b 的 5 倍,那么您将创建一个数组,其中 a 的数量是 b 的 5 倍。这适用于权重甚至是彼此的倍数的简单分布。如果你想要 99.99% a 和 0.01% b 怎么办?您必须创建 10000 个插槽。

    有更好的方法。所有具有 N 个符号的非均匀分布都可以分解为一系列 n-1 个二元分布,其中每一个的概率均等。

    因此,如果您有这样的分解,您首先会通过从 1 - N-1 生成均匀随机数来随机选择二进制分布

    u32 dist = randInRange( 1, N-1 ); // generate a random number from 1 to N;
    

    然后说选择的分布是具有两个符号 a 和 b 的二元分布,a 的概率为 0-alpha,b 的概率为 alpha-1:

    float f = randomFloat();
    return ( f > alpha ) ? b : a;
    

    如何分解任何非均匀随机分布有点复杂。本质上,您创建了 N-1 个“桶”。选择概率最低的符号和概率最高的符号,并将它们的权重按比例分配到第一个二进制分布中。然后删除最小的符号,并删除用于创建此二进制分布的较大符号的权重。并重复此过程,直到没有符号为止。

    如果您想使用此解决方案,我可以为此发布 c++ 代码。

    【讨论】:

    • 这个答案的一个问题是我需要无替换地采样,这意味着概率(或绝对频率,它们都是非负整数)在每个选择步骤。
    • 通过将概率分布分解为二进制分布的解决方案,您可以在恒定时间内更新任何单个概率的变化。因此,如果您每一步只有一个概率变化,您可以在每一步中以恒定时间更新分布。
    【解决方案3】:

    如果你为random.sample()构造了正确的数据结构来操作,你根本不需要定义一个新的函数。只需使用random.sample()

    这里,__getitem__() 是 O(n),其中 n 是您拥有的不同重量的物品的数量。但它在内存中很紧凑,只需要存储(weight, value) 对。我在实践中使用过类似的课程,对于我的目的来说它非常快。请注意,此实现采用整数权重。

    class SparseDistribution(object):
        _cached_length = None
    
        def __init__(self, weighted_items):
            # weighted items are (weight, value) pairs
            self._weighted_items = []
            for item in weighted_items:
                self.append(item)
    
        def append(self, weighted_item):
            self._weighted_items.append(weighted_item)
            self.__dict__.pop("_cached_length", None)
    
        def __len__(self):
            if self._cached_length is None:
                length = 0
                for w, v in self._weighted_items:
                    length += w
                self._cached_length = length
            return self._cached_length
    
        def __getitem__(self, index):
            if index < 0 or index >= len(self):
                raise IndexError(index)
            for w, v in self._weighted_items:
                if index < w:
                    return v
            raise Exception("Shouldn't have happened")
    
        def __iter__(self):
            for w, v in self._weighted_items:
                for _ in xrange(w):
                    yield v
    

    那么,我们就可以使用它了:

    import random
    
    d = SparseDistribution([(5, "a"), (2, "b")])
    d.append((3, "c"))
    
    for num in (3, 5, 10, 11):
        try:
            print random.sample(d, num)
        except Exception as e:
            print "{}({!r})".format(type(e).__name__, str(e))
    

    导致:

    ['a', 'a', 'b']
    ['b', 'a', 'c', 'a', 'b']
    ['a', 'c', 'a', 'c', 'a', 'b', 'a', 'a', 'b', 'c']
    ValueError('sample larger than population')
    

    【讨论】:

      【解决方案4】:

      由于我目前对结果的直方图最感兴趣,因此我想到了使用numpy.random.hypergeometric 的以下解决方案(不幸的是,对于ngood &lt; 1nbad &lt; 1nsample &lt; 1 的边界情况,它的行为很糟糕,所以这些情况需要单独检查。)

      def weighted_sample_histogram(frequencies, k, random=numpy.random):
          """ Given a sequence of absolute frequencies [w_0, w_1, ..., w_n-1],
          return a generator [s_0, s_1, ..., s_n-1] where the number s_i gives the
          absolute frequency of drawing the index i from an urn in which that index is
          represented by w_i balls, when drawing k balls without replacement. """
          W = sum(frequencies)
          if k > W:
              raise ValueError("Sum of absolute frequencies less than number of samples")
          for frequency in frequencies:
              if k < 1 or frequency < 1:
                  yield 0
              else:
                  W -= frequency
                  if W < 1:
                      good = k
                  else:
                      good = random.hypergeometric(frequency, W, k)
                  k -= good
                  yield good
          raise StopIteration
      

      我很乐意接受任何关于如何改进这一点或为什么这可能不是一个好的解决方案的 cmet。

      实现这个(以及其他加权随机事物)的 python 包现在位于http://github.com/Anaphory/weighted_choice

      【讨论】:

        【解决方案5】:

        另一种解决方案

        from typing import List, Any
        import numpy as np
        
        def weighted_sample(choices: List[Any], probs: List[float]):
            """
            Sample from `choices` with probability according to `probs`
            """
            probs = np.concatenate(([0], np.cumsum(probs)))
            r = random.random()
            for j in range(len(choices) + 1):
                if probs[j] < r <= probs[j + 1]:
                    return choices[j]
        

        例子:

        aa = [0,1,2,3]
        probs = [0.1, 0.8, 0.0, 0.1]
        np.average([weighted_sample(aa, probs) for _ in range(10000)])
        
        Out: 1.0993
        

        【讨论】:

          【解决方案6】:

          采样速度非常快。所以除非你有很多兆字节要处理,否则 sample() 应该没问题。

          在我的机器上,从 10000000 个长度为 100 的样本中生成 1000 个样本需要 1.655 秒。 从 10000000 个元素中遍历 100000 个长度为 100 的样本耗时 12.98 秒。

          from random import sample,random
          from time import time
          
          def generate(n1,n2,n3):
              w = [random() for x in range(n1)]
          
              print len(w)
          
              samples = list()
              for i in range(0,n2):
                  s = sample(w,n3)
                  samples.append(s)
          
              return samples
          
          start = time()
          size_set = 10**7
          num_samples = 10**5
          length_sample = 100
          samples = generate(size_set,num_samples,length_sample)
          end = time()
          
          allsum=0
          for row in samples:
              sum = reduce(lambda x, y: x+y,row)
              allsum+=sum
          
          print 'sum of all elements',allsum
          
          print '%f seconds for %i samples of %i length %i'%((end-start),size_set,num_sam\
          ples,length_sample)
          

          【讨论】:

          • 下次请使用timeit模块计算速度,更值得信赖。另请注意,您在这里没有进行加权抽样,这是本问题的主题。
          • Re time(): 1ms 不够准确?
          • Re weighted_sampling:如果提问的人准确,我可以给出准确的答案。完全不清楚加权抽样在这里应该意味着什么。无论如何,对于大多数用途来说,sample 都足够快。
          • Re time():做一个样本是不够的。 zedshaw.com/essays/programmer_stats.html另外,帖子是准确的。
          猜你喜欢
          • 2017-09-18
          • 2011-09-19
          • 2012-01-31
          • 2016-03-11
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-03-10
          相关资源
          最近更新 更多