【问题标题】:MRJob - Iterating over valuesMRJob - 迭代值
【发布时间】:2026-01-13 00:05:02
【问题描述】:

输入(姓名;日期;花费):

Alice;01/01/2020;100
Alice;02/01/2020;30
Alice;24/01/2020;50
Bob;24/01/2020;1500
Bob;24/01/2020;12
Bob;25/01/2020;16
Bob;25/01/2020;83
Bob;25/01/2020;91
Alice;13/02/2020;10
Alice;25/02/2020;3

输出必须是在至少 5 个不同的日子里购买的人的姓名。所以对于这个输入只有 Alice,因为 Bob 在 2 天内购买了 5 次。

当我尝试计算这些值时,我的问题就出现了。我已经使用集合解决了它:

from mrjob.job import MRJob

class MR_ex2(MRJob):
    def mapper(self, _, line):
        person, day, spent_money = line.split(';')
        yield person, day

    def reducer(self, key, counts):
        counts = set(counts)
        if len(counts) >= 5:
            yield key, key

if __name__ == '__main__':
    MR_ex2.run()

但是当我尝试添加组合器时它会崩溃,我认为这不是最好的方法。因此,我尝试搜索示例,但找不到很多示例,而没有 MRJob 的示例带有一些变量来保存子问题的状态并迭代子问题的值(此问题中的 Alice 和 Bob)以加入它们,但我不知道如何在MRJob中使用这种方式。

简而言之,我的问题是如何以正确的方式解决这个问题?在reducer中加入所有key的不同值,然后检查它是否为5或更多?

【问题讨论】:

  • 有点固定,知道我可以通过values = [x for x in counts] 提取每个键的所有值。现在我可以继续前进了

标签: python mapreduce mrjob


【解决方案1】:

可能不是最好的答案,但万一有人来到这里并有类似的问题。我部分解决它的方法是从生成器中展开元素,在它创建集合之后,这样我可以避免错误,尽管不确定在 MapReduce 中使用集合是否是一种好技术。

但代码是:

from mrjob.job import MRJob

class MR_ex2(MRJob):

    def mapper(self, _, line):
        persona, dia, dinero_gastado = line.split(';')
        yield persona, dia

    def combiner(self, key, values):
        counts = set([item for sublist in values for item in sublist])
        if len(counts) >= 5:
            yield key, key
        yield key, tuple(values)

    def reducer(self, key, counts):

        counts = set([item for sublist in counts for item in sublist])
        if len(counts) >= 5:
            yield key, key

if __name__ == '__main__':
    MR_ex2.run()

【讨论】: