【发布时间】:2019-03-13 06:49:51
【问题描述】:
我正在处理一个 Apache Beam 项目,该项目遇到了与自定义时间戳属性相关的 Dataflow 服务和 PubsubIO 问题。 Beam SDK 的当前版本是 2.7.0。
在项目中,我们有 2 个 Dataflow 作业通过 PubSub 主题和订阅进行通信:
第一个管道(将数据下沉到 PubSub)
此管道适用于基于消息的消息,因此除了GlobalWindows(Beam 默认)之外,它没有应用自定义窗口策略。在这个管道的末端,我们将所有已经分配了一个属性映射(包括它们的事件时间戳,例如“published_at”)的消息下沉(写入)到一个使用 PubsubIO.writeMessages() 的 PubSub 主题。
注意:如果我们使用PubsubIO.writeMessages().withTimestampAttribute(),这个方法会告诉PubsubIO.ShardFn、PubsubIO.WriteFn和PubsubClient去write/overwrite下沉管道的处理时间 到地图中的此属性。
第二个管道(从 PubSub 读取数据)
在第二个流水线(读取流水线)中,我们尝试了PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at")和PubsubIO.readStrings().withTimestampAttribute("published_at")作为源。
- 使用 DirectRunner 运行时,一切正常。消息
从 PubSub 订阅中读取并输出到
ProcessContext.timestamp()的下游阶段等于它们的 事件时间戳"published_at"。 - 但是当使用 DataflowRunner 运行时,
ProcessContext.timestamp()始终设置为接近实时,接近下沉 管道的处理时间。我们检查并可以确认那些 时间戳不是来自 PubSub 的发布时间。所有的数据都是 然后分配到错误的窗口与其事件域相比 时间戳。我们预计迟交的数据不会被分配 进入无效的窗口。
注意:在我们打开第二个管道以获取某种历史/后期数据之前,我们已经让 Pubsub 主题填充了大量数据。
Pubsub messages with invalid context timestamp
假设的根本原因
深入研究DataflowRunner的源代码,我们可以看到Dataflow Service使用完全不同的Pubsub代码(在管道构建时覆盖PubsubIO.Read)来读取和接收到Pubsub .
所以如果我们想使用 Beam SDK 的 PubsubIO,我们必须使用实验选项"enable_custom_pubsub_source"。但到目前为止还没有运气,因为我们遇到了这个问题https://jira.apache.org/jira/browse/BEAM-5674 并且无法测试 Beam SDK 的 Pubsub 代码。
解决办法
我们当前的解决方法是,在将窗口分配给消息的步骤之后,我们实现了 DoFn 来检查他们的事件时间戳与他们的@987654337 @。 如果窗口无效,那么我们只需删除消息,然后运行每周或半周的作业以从历史来源纠正它们。最好有一些缺失的数据,而不是计算不正确的数据。
Messages dropped due to invalid windows
请与我们分享此案的经验。我们知道,从 Dataflow 水印管理的角度来看,如果摄取的数据是稀疏的(超时不够密集),水印会自我调整为当前实时。
我们还认为,我们对 Dataflow 服务维护 PubsubUnboundedSource 的输出时间戳的方式存在误解,因为我们对 Apache Beam 和 Google 的 Dataflow 还很陌生,所以有些事情我们还不知道。
非常感谢!
【问题讨论】:
标签: google-cloud-dataflow apache-beam google-cloud-pubsub