【发布时间】:2021-05-10 00:19:30
【问题描述】:
我遇到了一个使用 beam.GroupByKey() 的情况,我已经加载了一个行数为 42.854 的文件。
由于业务规则,我需要执行 GroupByKey();然而,在完成它的执行后,我注意到我几乎得到了两倍的行。如下所示:
GroupByKey() 之前的步骤:
为什么我会有这种行为?
我没有在我的管道中做任何特别的事情:
with beam.Pipeline(runner, options=opts) as p:
#LOAD FILE
elements = p | 'Load File' >> beam.Create(fileRNS.values)
#PREPARE VALUES (BULK INSERT)
Script_Values = elements | 'Prepare Bulk Insert' >> beam.ParDo(Prepare_Bulk_Insert())
Grouped_Values = Script_Values | 'Grouping values' >> beam.GroupByKey()
#BULK INSERT INTO POSTGRESQL
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(ExecuteInsert)
2021-02-09
当我调试时,Prepare_Bulk_Insert() 有以下内容:
如您所见,元素的数量是正确的,我不明白为什么如果我发送正确的数量,GroupByKey() 的输入会包含更多的元素。
Grouped_Values | 'Insert PostgreSQL' >> beam.ParDo(funcaoMap) 的输入如下:
双倍金额。 =(
亲切的问候, 朱利亚诺·梅代罗斯
【问题讨论】:
标签: python google-cloud-platform google-cloud-dataflow apache-beam