在 Beam 中,PCollection 的元素是无序的。我会将结果存储在数据库中并在那里执行排序。
不确定您的用例是否确实需要在 Beam 中进行排序,但一种解决方法是将所有行分组到一个虚构的键上,使用 GroupByKey,并对分组数据执行排序,如下所示:
word_count_list = [
('itself', 16),
('grey', 1),
('senses', 4),
('repair', 1),
('me', 228),
]
def addKey(row):
return (1, row)
def sortGroupedData(row):
(keyNumber, sortData) = row
sortData.sort(key=lambda x: x[1], reverse=True)
return sortData[0:3]
word_count = (p
| 'CreateWordCountColl' >> beam.Create(word_count_list)
| 'AddKey' >> beam.Map(addKey)
| 'GroupByKey' >> beam.GroupByKey()
| 'SortGroupedData' >> beam.Map(sortGroupedData)
| 'Write' >> WriteToText('./sorting_results')
)
这将返回单行列表中的前 3 个。
[('me', 228), ('itself', 16), ('senses', 4)]
但是,考虑到您将放弃数据集的并行处理。