【问题标题】:Sorting apache beam wordcount_minimal output对 apache 束 wordcount_minimal 输出进行排序
【发布时间】:2018-09-19 03:36:35
【问题描述】:

我正在处理beams word count examples(在 python 中)。我能够在 DataflowRunner 上运行示例并接收输出。

当前的输出文件如下所示:

itself: 16
grey: 1
senses: 4
repair: 1
me: 228

是否有办法对 PCollection 进行排序,以便我的输出文件根据词频按降序排序?

在没有办法做到这一点的情况下,找到最常出现的单词的标准工作流程是什么?在将数据减少到字数之后,这是否会由一个单独的过程来处理?

【问题讨论】:

    标签: python apache-beam


    【解决方案1】:

    在 Beam 中,PCollection 的元素是无序的。我会将结果存储在数据库中并在那里执行排序。

    不确定您的用例是否确实需要在 Beam 中进行排序,但一种解决方法是将所有行分组到一个虚构的键上,使用 GroupByKey,并对分组数据执行排序,如下所示:

    word_count_list = [
        ('itself', 16),
        ('grey', 1),
        ('senses', 4),
        ('repair', 1),
        ('me', 228),
    ]
    
    def addKey(row):
        return (1, row)
    
    def sortGroupedData(row):
        (keyNumber, sortData) = row
        sortData.sort(key=lambda x: x[1], reverse=True)
        return sortData[0:3]
    
    word_count = (p 
                | 'CreateWordCountColl' >> beam.Create(word_count_list)
                | 'AddKey' >> beam.Map(addKey)
                | 'GroupByKey' >> beam.GroupByKey()
                | 'SortGroupedData' >> beam.Map(sortGroupedData)
                | 'Write' >> WriteToText('./sorting_results')
                )
    

    这将返回单行列表中的前 3 个。

    [('me', 228), ('itself', 16), ('senses', 4)]
    

    但是,考虑到您将放弃数据集的并行处理。

    【讨论】:

    • 感谢您的回复 :) 您提出的关于放弃并行处理的观点是有效的,值得思考。如果我要再次解决这个问题,我会将输出写入 BigQuery 表,然后可以在该表中查询数据。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-14
    • 1970-01-01
    • 1970-01-01
    • 2017-09-10
    • 2020-03-23
    相关资源
    最近更新 更多