【问题标题】:Group an iterable by a predicate in Python在 Python 中通过谓词对可迭代对象进行分组
【发布时间】:2012-10-08 04:43:37
【问题描述】:

我正在解析这样的文件:

--标题-- 数据1 数据2 --标题-- 数据3 数据4 数据5 --标题-- --标题-- ...

我想要这样的组:

[ [header, data1, data2], [header, data3, data4, data5], [header], [header], ... ]

所以我可以像这样迭代它们:

for grp in group(open('file.txt'), lambda line: 'header' in line):
    for item in grp:
        process(item)

并将检测组逻辑与处理组逻辑分开。

但我需要一个可迭代的可迭代对象,因为这些组可以任意大,我不想存储它们。也就是说,每次遇到“哨兵”或“标题”项目时,我都想将可迭代对象拆分为子组,如谓词所示。似乎这将是一项常见任务,但我找不到有效的 Pythonic 实现。

这是一个愚蠢的追加到列表的实现:

def group(iterable, isstart=lambda x: x):
    """Group `iterable` into groups starting with items where `isstart(item)` is true.

    Start items are included in the group.  The first group may or may not have a 
    start item.  An empty `iterable` results in an empty result (zero groups)."""
    items = []
    for item in iterable:
        if isstart(item) and items:
            yield iter(items)
            items = []
        items.append(item)
    if items:
        yield iter(items) 

感觉必须有一个不错的itertools 版本,但它让我望而却步。 '明显' (?!) groupby 解决方案似乎不起作用,因为可能存在相邻的标题,并且它们需要分成不同的组。我能想到的最好的方法是(ab)使用groupby 和一个保持计数器的关键功能:

def igroup(iterable, isstart=lambda x: x):
    def keyfunc(item):
        if isstart(item):
            keyfunc.groupnum += 1       # Python 2's closures leave something to be desired
        return keyfunc.groupnum
    keyfunc.groupnum = 0
    return (group for _, group in itertools.groupby(iterable, keyfunc))

但我觉得 Python 可以做得更好——遗憾的是,这比哑列表版本还要慢:

#ipython %time deque(group(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0) CPU 时间:用户 4.20 秒,系统:0.03 秒,总计:4.23 秒 %time deque(igroup(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0) CPU 时间:用户 5.45 秒,系统:0.01 秒,总计:5.46 秒

为了方便您,这里有一些单元测试代码:

class Test(unittest.TestCase):
    def test_group(self):
        MAXINT, MAXLEN, NUMTRIALS = 100, 100000, 21
        isstart = lambda x: x == 0
        self.assertEqual(next(igroup([], isstart), None), None)
        self.assertEqual([list(grp) for grp in igroup([0] * 3, isstart)], [[0]] * 3)
        self.assertEqual([list(grp) for grp in igroup([1] * 3, isstart)], [[1] * 3])
        self.assertEqual(len(list(igroup([0,1,2] * 3, isstart))), 3)        # Catch hangs when groups are not consumed
        for _ in xrange(NUMTRIALS):
            expected, items = itertools.tee(itertools.starmap(random.randint, itertools.repeat((0, MAXINT), random.randint(0, MAXLEN))))
            for grpnum, grp in enumerate(igroup(items, isstart)):
                start = next(grp)
                self.assertTrue(isstart(start) or grpnum == 0)
                self.assertEqual(start, next(expected))
                for item in grp:
                    self.assertFalse(isstart(item))
                    self.assertEqual(item, next(expected))

那么:如何在 Python 中通过谓词优雅高效地对可迭代对象进行子分组?

【问题讨论】:

  • 您的“附加到列表”版本与您所说的不一致。它将源迭代中的每个项目生成为一个项目列表。你能澄清你想要做什么吗?为什么不举一个例子说明你打算如何使用结果(即,你打算用嵌套的 for 循环或什么来迭代它)?
  • @BrenBarn:生成器将[1, 0, 0, 0, 1, 0, 0, 1, 0, 1, 0] 转换为[[1, 0, 0, 0], [1, 0, 0], [1, 0], [1, 0]]
  • 啊,我明白了,我没有注意到默认的isstart 在做什么。但是最好有一个你希望如何使用它的例子。
  • @BrenBarn:当元素表示一个部分时,第二个参数返回True,因此对于那个特定示例,我使用了igroup(l, lambda x: x == 1))。我想列表版本的行为相同。
  • 你说得对,我不是很清楚;我添加了示例用法,也使示例更加困难。 :)

标签: python performance iterator


【解决方案1】:

如何在 Python 中通过谓词优雅高效地对可迭代对象进行子分组?

这是一个简洁、节省内存的实现,与您的问题非常相似:

from itertools import groupby, imap
from operator import itemgetter

def igroup(iterable, isstart):
    def key(item, count=[False]):
        if isstart(item):
           count[0] = not count[0] # start new group
        return count[0]
    return imap(itemgetter(1), groupby(iterable, key))

它支持无限组。

基于tee 的解决方案稍微快一些,但它会消耗当前组的内存(类似于问题中基于list 的解决方案):

from itertools import islice, tee

def group(iterable, isstart):
    it, it2 = tee(iterable)
    count = 0
    for item in it:
        if isstart(item) and count:
            gr = islice(it2, count)
            yield gr
            for _ in gr:  # skip to the next group
                pass
            count = 0
        count += 1
    if count:
       gr = islice(it2, count)
       yield gr
       for _ in gr:  # skip to the next group
           pass

groupby-solution 可以用纯 Python 实现:

def igroup_inline_key(iterable, isstart):
    it = iter(iterable)

    def grouper():
        """Yield items from a single group."""
        while not p[START]:
            yield p[VALUE]  # each group has at least one element (a header)
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

    p = [None]*2 # workaround the absence of `nonlocal` keyword in Python 2.x
    START, VALUE = 0, 1
    p[VALUE] = next(it)
    while True:
        p[START] = False # to distinguish EOF and a start of new group
        yield grouper()
        while not p[START]: # skip to the next group
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

为避免重复代码,while True 循环可以写成:

while True:
    p[START] = False  # to distinguish EOF and a start of new group
    g = grouper()
    yield g
    if not p[START]:  # skip to the next group
        for _ in g:
            pass
        if not p[START]:  # EOF
            break

虽然以前的变体可能更明确和可读。

我认为纯 Python 中的通用内存效率解决方案不会比基于 groupby 的解决方案快得多。

如果process(item)igroup() 快,并且可以在字符串中有效地找到标头(例如,对于固定的静态标头),则you could improve performance by reading your file in large chunks and splitting on the header value。它应该使您的任务受 IO 限制。

【讨论】:

  • 感谢您的调查。我只是想看看是否有一些聪明的 chain-zip-slice-groupby-whatever 咒语明显更好,但听起来好像没有。无论出于何种原因,对我来说,使用属性和生成器都比使用参数和 itemgetter 更快。
  • 请注意,与groupby 一样,如果您尝试列出外部可迭代对象,这将消耗源可迭代对象。因此,尽管您可以对其执行list() 以获得长度等于组数的列表,但您实际上不能在该列表中使用生成的子迭代,因为 groupby 已经用尽了它们以便对它们进行分组.我想这取决于你想做什么。
  • @BrenBarn:是正确的。 groupby docs mention it:«因为源是共享的,所以当 groupby() 对象前进时,之前的组不再可见。因此,如果以后需要该数据,则应将其存储为列表»。不使用groupby的解决方案直接有“跳到下一组”注释。
【解决方案2】:

我没有完全阅读您的所有代码,但我认为这可能会有所帮助:

from itertools import izip, tee, chain


def pairwise(iterable):
    a, b = tee(iterable)
    return izip(a, chain(b, [next(b, None)]))


def group(iterable, isstart):

    pairs = pairwise(iterable)

    def extract(current, lookahead, pairs=pairs, isstart=isstart):
        yield current
        if isstart(lookahead):
            return
        for current, lookahead in pairs:
            yield current
            if isstart(lookahead):
                return

    for start, lookahead in pairs:
        gen = extract(start, lookahead)
        yield gen
        for _ in gen:
            pass


for gen in group(xrange(4, 16), lambda x: x % 5 == 0):
    print '------------------'
    for n in gen:
        print n

print [list(g) for g in group([], lambda x: x % 5 == 0)]

结果:

$ python gen.py
------------------
4
------------------
5
6
7
8
9
------------------
10
11
12
13
14
------------------
15
[]

编辑:

这是另一种解决方案,与上述类似,但没有 pairwise() 和哨兵。不知道哪个更快:

def group(iterable, isstart):

    sentinel = object()

    def interleave(iterable=iterable, isstart=isstart, sentinel=sentinel):
        for item in iterable:
            if isstart(item):
                yield sentinel
            yield item

    items = interleave()

    def extract(item, items=items, isstart=isstart, sentinel=sentinel):
        if item is not sentinel:
            yield item
        for item in items:
            if item is sentinel:
                return
            yield item

    for lookahead in items:
        gen = extract(lookahead)
        yield gen
        for _ in gen:
            pass

现在两者都通过了测试用例,这要感谢 J.F.Sebastians 关于耗尽跳过子组生成器的想法。

【讨论】:

  • 1. [list(g) for g in group([0,0,0], lambda x: x == 0)] -> [[0, 0], [0]] 2. CPU times: user 8.16 s, sys: 0.04 s, total: 8.20 s 3. $ wc -l gen.py -> 18
  • @DoctorJ:你是对的。我想,我只是觉得有点懒散……我现在就修。
  • 我的代码中有一个奇怪的错误,例如。当第一个项目是 4 时。现在已修复。
  • 更近了!当iterable 为空时呢? [list(g) for g in group([], lambda x: x == 0)] -> [[]]。 :( 另外,你的表现已经倒退了:CPU times: user 13.15 s, sys: 0.01 s, total: 13.16 s.
  • @DoctorJ:这是我目前能做的最好的事情。我不认为它会起作用,除非你完全按照创建的顺序完全消耗每个组的每个项目。当您有两组并从第一个中读取一个项目,然后从第二个中读取一个,然后再从第一个中读取一个,顺序将被搞砸。那是因为这个东西是真正动态的,没有任何东西(除了前瞻项)被缓存,而且一切都在同一个迭代器上运行。
【解决方案3】:

关键是你必须编写一个生成子生成器的生成器。我的解决方案在概念上与@pillmuncher 的解决方案相似,但更加独立,因为它避免使用 itertools 机器制作辅助生成器。缺点是我必须使用一个有点不雅的临时列表。在 Python 3 中,使用nonlocal 可能会做得更好。

def grouper(iterable, isstart):
    it = iter(iterable)
    last = [next(it)]
    def subgroup():
        while True:
            toYield = last[0]
            try:
                last.append(next(it))
            except StopIteration, e:
                last.pop(0)
                yield toYield
                raise StopIteration
            else:
                yield toYield
                last.pop(0)
            if isstart(last[0]):
                raise StopIteration
    while True:
        sg = subgroup()
        yield sg
        if len(last) == 2:
            # subgenerator was aborted before completion, let's finish it
            for a in sg:
                pass
        if last:
            # sub-generator left next element waiting, next sub-generator will yield it
            pass
        else:
            # sub-generator left "last" empty because source iterable was exhausted
            raise StopIteration

>>> for g in grouper([0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0], lambda x: x==0):
...     print "Group",
...     for i in g:
...         print i,
...     print
Group 0 1 1
Group 0 1
Group 0 1 1 1 1
Group 0

我不知道这在性能方面是什么样的,我只是这样做是因为它只是一件有趣的事情。

编辑:我在你原来的两个和我的上运行了你的单元测试。看起来我的比你的 igroup 快一点,但仍然比基于列表的版本慢。在这里,您必须在速度和内存之间做出权衡,这似乎很自然。如果您知道组不会太大,请使用基于列表的版本来提高速度。如果组可能很大,请使用基于生成器的版本来降低内存使用量。

编辑:上面的编辑版本以不同的方式处理中断。如果你跳出子生成器但恢复外部生成器,它将跳过中止组的其余部分并从下一个组开始:

>>> for g in grouper([0, 1, 2, 88, 3, 0, 1, 88, 2, 3, 4, 0, 1, 2, 3, 88, 4], lambda x: x==0):
...     print "Group",
...     for i in g:
...         print i,
...         if i==88:
...             break
...     print
Group 0 1 2 88
Group 0 1 88
Group 0 1 2 3 88

【讨论】:

  • 几乎!不幸的是,如果您没有完全消耗每个组,这将挂起。我没有提到这是一个要求,但这是一个很好的测试用例。 :) 此外,单元测试是随机的,不是一个好的性能测试。 1. %time deque(chain.from_iterable(grouper(xrange(10**7), lambda x: x % 1000 == 0)), maxlen=0) -> CPU times: user 10.11 s, sys: 0.05 s, total: 10.16 s 2. $ wc -l grouper.py -> 22.
  • @DoctorJ:“挂起”是什么意思?
  • @DoctorJ:你的意思是如果你在子生成器的中间中断,它也会结束外部生成器?我不会真正称其为“挂起”,但我看到我的解决方案确实做到了这一点。我用一个不同的版本编辑了我的答案:中止子生成器但继续使用外部生成器现在在下一组的开头恢复。
  • 我的意思是:list(grouper([0,1,2], lambda x: x == 0)) 永远循环,耗尽我的内存并破坏我的机器。 :( 尝试更新的单元测试。
  • @DoctorJ:你真的需要清楚你的要求。我认为这件事的全部意义在于你不会在这件事上打电话给list。如果你列出它,你想让它做什么?将子生成器耗尽到子列表中?
【解决方案4】:

所以这是另一个版本,它试图将来自groupbychain 的子组对拼接在一起。对于给定的性能测试,它明显更快,但在有许多小组时要慢得多(比如isstart = lambda x: x % 2 == 0)。它欺骗和缓冲重复的标题(你可以用 read-all-but-last 迭代器技巧来解决这个问题)。也是优雅系的退步,所以我觉得还是更喜欢原版的。

def group2(iterable, isstart=lambda x: x):
    groups = itertools.groupby(iterable, isstart)
    start, group = next(groups)
    if not start:                   # Deal with initial non-start group
        yield group
        _, group = next(groups)
    groups = (grp for _, grp in groups)
    while True:                     # group will always be start item(s) now      
        group = list(group)         
        for item in group[0:-1]:    # Back-to-back start items... and hope this doesn't get very big.  :)
            yield iter([item])      
        yield itertools.chain([group[-1]], next(groups, []))       # Start item plus subsequent non-start items
        group = next(groups)
%time deque(group2(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0) CPU 时间:用户 3.13 秒,系统:0.00 秒,总计:3.13 秒

【讨论】:

  • here's my entry 用于简单问题竞赛的拜占庭式解决方案;)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-12-20
  • 1970-01-01
相关资源
最近更新 更多