【发布时间】:2020-09-24 22:30:06
【问题描述】:
我们需要按消息中的字段之一对 PubSub 消息进行分组。我们使用 15 分钟的固定窗口来对这些消息进行分组。
在数据流上运行时,用于消息分组的 GroupByKey 引入了太多重复元素,管道远端的另一个 GroupByKey 失败并出现“KeyCommitTooLargeException:阶段 P27 的提交请求和密钥 abc#123 的大小为 225337153,即超过了..的限制。'
我浏览了下面的链接,发现建议是使用 Reshuffle,但 Reshuffle 内部有 GroupByKey。 Why is GroupByKey in beam pipeline duplicating elements (when run on Google Dataflow)?
我的管道代码:
PCollection<String> messages = getReadPubSubSubscription(options, pipeline);
PCollection<String> windowedMessages = messages
.apply(
Window
.<String>into(
FixedWindows.of(Duration.standardMinutes(15)))
.discardingFiredPanes());
PCollectionTuple objectsTuple = windowedMessages
.apply(
"UnmarshalStrings",
ParDo
.of(new StringUnmarshallFn())
.withOutputTags(
StringUnmarshallFn.mainOutputTag,
TupleTagList.of(StringUnmarshallFn.deadLetterTag)));
PCollection<KV<String, Iterable<ABCObject>>> groupedObjects =
objectsTuple.get(StringUnmarshallFn.mainOutputTag)
.apply(
"GroupByObjects",
GroupByKey.<String, ABCObject>create());
PCollection results = groupedObjects
.apply(
"FetchForEachKey",
ParDo.of(SomeFn())).get(SomeFn.tag)
.apply(
"Reshuffle",
Reshuffle.viaRandomKey());
results.apply(...)
...
PubSub 肯定不会复制消息,也没有其他故障,GroupByKey 正在创建这些重复项,我使用的 Windowing 有问题吗?
一个观察结果是 GroupBy 产生的元素数量与下一步产生的元素数量相同。我附上了两张截图,一张用于 GroupByKey,另一张用于 Fetch Function。
进一步分析后更新
Stage P27 实际上是第一个 GroupByKey,它输出的元素比预期的要多。我不能将它们视为实际输出元素的副本,因为下一个 Fetch 步骤不会处理所有这些百万元素。我不确定这些是数据流引入的一些虚拟元素还是数据流中的错误度量。
我仍在进一步分析为什么会抛出这个 KeyCommitTooLargeException,因为我只有一个输入元素并且分组应该只产生一个可迭代的元素。我也用 Google 开了一张票。
【问题讨论】:
-
您设置了
discardingFiredPanes,但您没有更改触发器或允许延迟,所以这没有效果。 -
从代码中,
objectsTuple不用于获取以下转换,这很奇怪,因为图表显示并非如此。另外,您提到了两个 GroupByKey。据我了解,第一个是groupedObjects,一个返回重复项,另一个有错误在Reshuffle 内。你不觉得priceChangesTuple.get(StringUnmarshallFn.mainOutputTag)可能在做一些重复数据的事情吗? -
@rsantiago,感谢您指出变量名。我不得不掩盖一些细节来隐藏域名,我错过了重命名它。我现在已经更新了变量名。当我继续研究时,我发现 P27 阶段实际上是第一组。发现这一点有点棘手。我正在添加其他发现作为问题的更新
标签: java google-cloud-dataflow apache-beam apache-beam-io