【问题标题】:Apache Beam GroupByKey outputting duplicate elements with PubSubIOApache Beam GroupByKey 使用 PubSubIO 输出重复元素
【发布时间】:2020-09-24 22:30:06
【问题描述】:

我们需要按消息中的字段之一对 PubSub 消息进行分组。我们使用 15 分钟的固定窗口来对这些消息进行分组。

在数据流上运行时,用于消息分组的 GroupByKey 引入了太多重复元素,管道远端的另一个 GroupByKey 失败并出现“KeyCommitTooLargeException:阶段 P27 的提交请求和密钥 abc#123 的大小为 225337153,即超过了..的限制。'

我浏览了下面的链接,发现建议是使用 Reshuffle,但 Reshuffle 内部有 GroupByKey。 Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?

我的管道代码:

PCollection<String> messages = getReadPubSubSubscription(options, pipeline);

PCollection<String> windowedMessages = messages
    .apply(
        Window
            .<String>into(
                FixedWindows.of(Duration.standardMinutes(15)))
            .discardingFiredPanes());
            
PCollectionTuple objectsTuple = windowedMessages
    .apply(
        "UnmarshalStrings",
        ParDo
            .of(new StringUnmarshallFn())
            .withOutputTags(
                StringUnmarshallFn.mainOutputTag,
                TupleTagList.of(StringUnmarshallFn.deadLetterTag)));

PCollection<KV<String, Iterable<ABCObject>>> groupedObjects =
    objectsTuple.get(StringUnmarshallFn.mainOutputTag)
        .apply(
            "GroupByObjects",
            GroupByKey.<String, ABCObject>create());

PCollection results = groupedObjects
    .apply(
        "FetchForEachKey",
        ParDo.of(SomeFn())).get(SomeFn.tag)
    .apply(
        "Reshuffle",
        Reshuffle.viaRandomKey());
                
results.apply(...)

...

PubSub 肯定不会复制消息,也没有其他故障,GroupByKey 正在创建这些重复项,我使用的 Windowing 有问题吗?

一个观察结果是 GroupBy 产生的元素数量与下一步产生的元素数量相同。我附上了两张截图,一张用于 GroupByKey,另一张用于 Fetch Function。

GroupByKey 步骤 获取步骤

进一步分析后更新

Stage P27 实际上是第一个 GroupByKey,它输出的元素比预期的要多。我不能将它们视为实际输出元素的副本,因为下一个 Fetch 步骤不会处理所有这些百万元素。我不确定这些是数据流引入的一些虚拟元素还是数据流中的错误度量。

我仍在进一步分析为什么会抛出这个 KeyCommitTooLargeException,因为我只有一个输入元素并且分组应该只产生一个可迭代的元素。我也用 Google 开了一张票。

【问题讨论】:

  • 您设置了discardingFiredPanes,但您没有更改触发器或允许延迟,所以这没有效果。
  • 从代码中,objectsTuple 不用于获取以下转换,这很奇怪,因为图表显示并非如此。另外,您提到了两个 GroupByKey。据我了解,第一个是groupedObjects,一个返回重复项,另一个有错误在Reshuffle 内。你不觉得priceChangesTuple.get(StringUnmarshallFn.mainOutputTag) 可能在做一些重复数据的事情吗?
  • @rsantiago,感谢您指出变量名。我不得不掩盖一些细节来隐藏域名,我错过了重命名它。我现在已经更新了变量名。当我继续研究时,我发现 P27 阶段实际上是第一组。发现这一点有点棘手。我正在添加其他发现作为问题的更新

标签: java google-cloud-dataflow apache-beam apache-beam-io


【解决方案1】:

GroupByKey 按按键和窗口分组。在没有触发器的情况下,每个键和窗口只输出一个元素,每个输入元素也最多输出一个元素。

如果您发现任何其他行为,则可能是错误,您可以报告。您可能需要提供更多步骤来重现问题,包括示例数据和整个可运行管道。

【讨论】:

    【解决方案2】:

    由于在 UPDATE 中您澄清没有重复,而是以某种方式添加了虚拟记录(这真的很奇怪),this old thread 报告了类似的问题,答案很有趣,因为指出了由以下原因引起的 protobuf 序列化问题将大量数据分组到一个窗口中。

    我建议使用可用的故障排除步骤(例如 12)来确定问题是从代码的哪一部分开始的。例如,我仍然认为new StringUnmarshallFn() 可能正在执行有助于生成虚拟记录的任务。您可能希望在步骤中实现counters,以尝试确定每个步骤生成多少记录。

    如果您没有找到解决方案,最好的选择是联系GCP Support,也许他们可以解决。

    【讨论】:

      猜你喜欢
      • 2021-05-10
      • 1970-01-01
      • 2019-02-12
      • 2018-11-06
      • 2018-05-30
      • 2018-01-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多