【问题标题】:Python random sample with a generator / iterable / iterator带有生成器/可迭代/迭代器的 Python 随机样本
【发布时间】:2015-02-10 18:09:11
【问题描述】:

你知道是否有办法让 python 的random.sample 与生成器对象一起工作。我正在尝试从一个非常大的文本语料库中获取随机样本。问题是random.sample() 引发了以下错误。

TypeError: object of type 'generator' has no len()

我在想也许有某种方法可以使用来自 itertools 的东西来做到这一点,但通过一些搜索找不到任何东西。

一个虚构的例子:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )


更新


根据MartinPieters 的要求,我对目前提出的三种方法做了一些时间安排。结果如下。

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

所以事实证明,array.insert 在处理大样本时有一个严重的缺点。我用来计时方法的代码

from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

我还进行了一项测试,以检查所有方法确实确实采用了生成器的无偏样本。因此,对于所有方法,我从10000100000 次中采样了1000 元素,并计算了总体中每个项目的平均出现频率,结果为~.1,正如人们对所有三种方法所期望的那样。

【问题讨论】:

  • 你试过random.sample(list(gen), 20) - 它可能不会太慢​​!
  • 你究竟从语料库中采样了什么?除了生成器之外,还有什么方法可以将其表示为其他东西?
  • @larsmans 单词和句子 - 我试图通过使用生成器对象来降低内存消耗。

标签: python random generator


【解决方案1】:

虽然 Martijn Pieters 的答案是正确的,但当 samplesize 变大时它确实会变慢,因为在循环中使用 list.insert 可能具有二次复杂度。

在我看来,这是一种在提高性能的同时保持均匀性的替代方案:

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results

对于高于10000samplesize 值,差异慢慢开始显现。拨打(1000000, 100000)的次数:

  • iterSample:5.05 秒
  • iter_sample_fast:2.64 秒

【讨论】:

  • 使用results = list(itertools.islice(iterator, samplesize)) 会产生任何进一步的改进吗?
  • @larsmans:应该是if len(results) &lt; samplesize:,而不是try:/except StopIteration:。如果list(islice()) 比重复的.append() 更快,那是值得的。
  • +1 表示此算法更新。我是 iterSample 的原作者(在 MartijnPieters 链接的较早答案中),虽然使用 list.insert 的初始化代码的复杂性问题发生在我身上,但我从来没有自己解决它。
  • @larsmans:Python 的 random.sample 返回打乱的结果(来自文档:“结果列表按选择顺序排列,因此所有子切片也将是有效的随机样本。”)如果你不这样做需要对结果进行洗牌(例如,对于len(iterable) == samplesize,它们将按照它们进入的确切顺序),然​​后您可以跳过最初的洗牌。
【解决方案2】:

你不能。

您有两种选择:将整个生成器读入一个列表,然后从该列表中采样,或者使用一种方法逐个读取生成器并从中挑选样本:

import random

def iterSample(iterable, samplesize):
    results = []

    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

此方法根据可迭代到目前为止中的项目数调整下一个项目是样本一部分的机会。它不需要在内存中保存超过samplesize 个项目。

解决方案不是我的;它是作为another answer here on SO 的一部分提供的。

【讨论】:

  • 我担心可能是这种情况,但似乎应该在标准库中。
  • @MattiLyra:请随意提议将其添加到标准库中。
  • 所以只是为了检查我是否理解代码的逻辑。它是来自整个生成器的统一样本,因为如果在生成器结束之前达到samplesize,则结果集中的项目被替换,从而允许选择后面的项目?
  • @larsmans:不!插入有助于确保样品均匀。
  • @MattiLyra:当项目很大时,将项目添加到 python 列表不会产生额外费用。见Python Time Complexity;追加是 O(1) 常量成本。
【解决方案3】:

只是为了它,这是一个单行,它从 O(n 中生成的 n 个项目中采样 k 元素而不替换> lg k) 时间:

from heapq import nlargest

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))

【讨论】:

  • 所以当您将it 中的每个元素传递到堆时,您会为每个元素提供一个随机键?
  • @MattiLyra:是的。将key=random.random() 传递给nlargest 会更容易,但我担心这会破坏堆不变量。这确实假设您的值在随机键之间具有可比性的情况下具有可比性。
  • @MartijnPieters:它从 2.6 开始。如果您正在查看 heapq.py 源代码,请向下滚动,因为 nlargest 在文件末尾重新定义。
  • 如果您要使用key,则分布将不是正确随机的。对于 random.random() 产生完全相同的浮点数的迭代中的任何值,将始终选择迭代的两个值中的 第一个(因为 nlargest(.., key) 使用 (key(value), [decreasing counter starting at 0], value) 元组)。在您的方法中,在这种情况下,两个值中的 较大的 将是首选。因此,在这两种方法中都存在(永远如此)轻微的偏差。
  • @MartijnPieters:嗯,我想你是对的。但是,通过让random.random 从更大的范围内采样,可以使偏差任意小,所以我认为分布是渐近均匀的:)
【解决方案4】:

我正在尝试从一个非常大的文本语料库中获取随机样本。

Your excellent synthesis answer 当前显示iter_sample_fast(gen, pop) 的胜利。不过,我尝试了 Katriel 推荐的 random.sample(list(gen), pop)——相比之下,它的速度快得惊人!

def iter_sample_easy(iterable, samplesize):
    return random.sample(list(iterable), samplesize)

Sampling 1000 from 10000
Using iter_sample_fast 0.0192 s
Using iter_sample_easy 0.0009 s

Sampling 10000 from 100000
Using iter_sample_fast 0.1807 s
Using iter_sample_easy 0.0103 s

Sampling 100000 from 1000000
Using iter_sample_fast 1.8192 s
Using iter_sample_easy 0.2268 s

Sampling 200000 from 1000000
Using iter_sample_fast 1.7467 s
Using iter_sample_easy 0.3297 s

Sampling 500000 from 1000000
Using iter_sample_easy 0.5628 s

Sampling 2000000 from 5000000
Using iter_sample_easy 2.7147 s

现在,随着您的语料库变得非常大,将整个可迭代对象具体化为 list 将使用大量内存。但是,如果我们能够将问题分块,我们仍然可以利用 Python 的超快性:基本上,我们选择一个“相当小”的CHUNKSIZE,然后在那个大小的块上做random.sample ,然后再次使用random.sample 将它们合并在一起。我们只需要正确设置边界条件即可。

如果list(iterable) 的长度是CHUNKSIZE 的整数倍并且不大于samplesize*CHUNKSIZE,我知道该怎么做:

def iter_sample_dist_naive(iterable, samplesize):
    CHUNKSIZE = 10000
    samples = []
    it = iter(iterable)
    try:
        while True:
            first = next(it)
            chunk = itertools.chain([first], itertools.islice(it, CHUNKSIZE-1))
            samples += iter_sample_easy(chunk, samplesize)
    except StopIteration:
        return random.sample(samples, samplesize)

但是,上面的代码在len(list(iterable)) % CHUNKSIZE != 0 时会产生非均匀采样,并且在len(list(iterable)) * samplesize / CHUNKSIZE 变得“非常大”时会耗尽内存。恐怕修复这些错误超出了我的薪酬等级,但this blog post 中描述了一个解决方案,对我来说听起来很合理。 (搜索词:“分布式随机抽样”、“分布式水库抽样”。)

Sampling 1000 from 10000
Using iter_sample_fast 0.0182 s
Using iter_sample_dist_naive 0.0017 s
Using iter_sample_easy 0.0009 s

Sampling 10000 from 100000
Using iter_sample_fast 0.1830 s
Using iter_sample_dist_naive 0.0402 s
Using iter_sample_easy 0.0103 s

Sampling 100000 from 1000000
Using iter_sample_fast 1.7965 s
Using iter_sample_dist_naive 0.6726 s
Using iter_sample_easy 0.2268 s

Sampling 200000 from 1000000
Using iter_sample_fast 1.7467 s
Using iter_sample_dist_naive 0.8209 s
Using iter_sample_easy 0.3297 s

samplesize 相对于len(list(iterable)) 非常小时,我们真正获胜。

Sampling 20 from 10000
Using iterSample 0.0202 s
Using sample_from_iterable 0.0047 s
Using iter_sample_fast 0.0196 s
Using iter_sample_easy 0.0001 s
Using iter_sample_dist_naive 0.0004 s

Sampling 20 from 100000
Using iterSample 0.2004 s
Using sample_from_iterable 0.0522 s
Using iter_sample_fast 0.1903 s
Using iter_sample_easy 0.0016 s
Using iter_sample_dist_naive 0.0029 s

Sampling 20 from 1000000
Using iterSample 1.9343 s
Using sample_from_iterable 0.4907 s
Using iter_sample_fast 1.9533 s
Using iter_sample_easy 0.0211 s
Using iter_sample_dist_naive 0.0319 s

Sampling 20 from 10000000
Using iterSample 18.6686 s
Using sample_from_iterable 4.8120 s
Using iter_sample_fast 19.3525 s
Using iter_sample_easy 0.3162 s
Using iter_sample_dist_naive 0.3210 s

Sampling 20 from 100000000
Using iter_sample_easy 2.8248 s
Using iter_sample_dist_naive 3.3817 s

【讨论】:

    【解决方案5】:

    如果迭代器中的项目数已知(通过其他地方计算项目),另一种方法是:

    def iter_sample(iterable, iterlen, samplesize):
        if iterlen < samplesize:
            raise ValueError("Sample larger than population.")
        indexes = set()
        while len(indexes) < samplesize:
            indexes.add(random.randint(0,iterlen))
        indexesiter = iter(sorted(indexes))
        current = indexesiter.next()
        ret = []
        for i, item in enumerate(iterable):
            if i == current:
                ret.append(item)
                try:
                    current = indexesiter.next()
                except StopIteration:
                    break
        random.shuffle(ret)
        return ret
    

    我发现这更快,尤其是当 sampsize 相对于 iterlen 较小时。但是,当要求提供整体或接近整体的样本时,就会出现问题。

    iter_sample (iterlen=10000, samplesize=100) 时间: (1, 'ms') iter_sample_fast (iterlen=10000, samplesize=100) 时间:(15, 'ms')

    iter_sample (iterlen=1000000, samplesize=100) 时间: (65, 'ms') iter_sample_fast (iterlen=1000000, samplesize=100) 时间:(1477, 'ms')

    iter_sample (iterlen=1000000, samplesize=1000) 时间: (64, 'ms') iter_sample_fast (iterlen=1000000, samplesize=1000) 时间:(1459, 'ms')

    iter_sample (iterlen=1000000, samplesize=10000) 时间: (86, 'ms') iter_sample_fast (iterlen=1000000, samplesize=10000) 时间:(1480, 'ms')

    iter_sample (iterlen=1000000, samplesize=100000) 时间: (388, 'ms') iter_sample_fast (iterlen=1000000, samplesize=100000) 时间:(1521, 'ms')

    iter_sample (iterlen=1000000, samplesize=1000000) 时间: (25359, 'ms') iter_sample_fast (iterlen=1000000, samplesize=1000000) 时间:(2178, 'ms')

    【讨论】:

      【解决方案6】:

      当您知道生成器有多长(并且将渐近均匀分布)时,最快的方法直到被证明不是这样:

      def gen_sample(generator_list, sample_size, iterlen):
          num = 0
          inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
          results = []
          iterator = iter(generator_list)
          gotten = 0
          while gotten < sample_size: 
              try:
                  b = iterator.next()
                  if inds[num]: 
                      results.append(b)
                      gotten += 1
                  num += 1    
              except: 
                  num = 0
                  iterator = iter(generator_list)
                  inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
          return results
      

      它在小型迭代和大型迭代中都是最快的(可能介于两者之间)

      # Huge
      res = gen_sample(xrange(5000000), 200000, 5000000)
      timing: 1.22s
      
      # Small
      z = gen_sample(xrange(10000), 1000, 10000) 
      timing: 0.000441    
      

      【讨论】:

      • except 应该捕获什么。一个迭代器结束了吗?
      【解决方案7】:

      如果人口规模 n 已知,这里有一些内存高效的代码循环生成器,只提取目标样本:

      from random import sample
      from itertools import count, compress
      
      targets = set(sample(range(n), k=10))
      for selection in compress(pop, map(targets.__contains__, count())):
          print(selection)
      

      这会按照人口生成器生成的顺序输出选择。

      该技术是使用标准库 random.sample() 随机选择目标索引进行选择。第二个like 确定给定索引是否在目标中,如果是,则从生成器中给出相应的值。

      例如,给定目标{6, 2, 4}

      0  1  2  3  4  5  6  7  8  9  10   ...  output of count()
      F  F  T  F  T  F  T  F  F  F  F    ...  is the count in targets?
      A  B  C  D  E  F  G  H  I  J  K    ...  output of the population generator
      -  -  C  -  E  -  G  -  -  -  -    ...  selections emitted by compress
      

      这种技术适用于循环一个太大而无法放入内存的语料库(否则,您可以直接在总体上使用 sample())。

      【讨论】:

        【解决方案8】:

        这是一个完全不同的变体,它使用一个集合作为一桶项目。它首先用 pool 项启动桶,然后从桶中产生样本,从迭代器中替换它们,最后它排出桶的剩余部分。

        HashWrapper 用于隐藏来自set 的不可散列类型。

        class HashWrapper(tuple):
            """Wrap unhashable type."""
            def __hash__(self):
                return id(self)
        
        
        def randomize_iterator(data: Iterator, pool=100) -> Iterator:
            """
            Randomize an iterator.
            """
        
            bucket = set()
            iterator = iter(data)
        
            # Prime the bucket
            for _ in range(pool):
                try:
                    bucket.add(HashWrapper(next(iterator)))
                except StopIteration:
                    # We've drained the iterator
                    break
        
            # Start picking from the bucket and replacing new items from the iterator
            for item in iterator:
                sample, = random.sample(bucket, 1)
                yield sample
                bucket.remove(sample)
                bucket.add(HashWrapper(item))
        
            # Drain the bucket
            yield from random.sample(bucket, len(bucket))
        

        【讨论】:

          猜你喜欢
          • 2018-05-16
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-06-29
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多