我们一直在使用测试工具为我们的应用程序流运行基准测试,但随后又恢复为对当前开箱即用的 Google 提供的 PubSub 到 PubSub 模板流进行基准测试(请参阅:https://cloud.google.com/dataflow/docs/templates/overview,尽管此处未列出 -您可以从控制台创建它)。
我们的测试工具生成并发送了数百万条带有时间戳的数百字节的 JSON 格式消息,并比较了两端的延迟。
很简单:
测试发布者 -> PubSub -> 数据流 -> PubSub -> 测试订阅者。
对于单实例发布者和订阅者,我们改变了消息速率并试验了窗口和触发策略,看看我们是否可以改善平均延迟,但通常无法改善超过 1.7 秒每秒 1,500 - 2000 条消息的端到端(我们的典型工作负载)。
然后,我们从等式中删除了 Dataflow,只是将发布者直接连接到订阅者,发现相同消息速率的延迟通常约为 20-30 毫秒。
恢复使用标准 PubSub 到 PubSub 数据流模板,我们看到端到端延迟与我们的应用程序数据流相似,约为 1.5 - 1.7 秒。
我们在管道中的不同点对时间戳进行采样,并将值写入多个自定义指标,并发现将消息添加到 PubSubIO.Read 的初始 PCollection 的平均延迟约为 380 毫秒,但最小值低至 25msec,由于启动开销,我们忽略了较高的值。但似乎有一个我们无法影响的开销。
我们尝试的窗口策略如下所示:
Pipeline p = Pipeline.create(options);
/*
* Attempt to read from PubSub Topic
*/
PCollectionTuple feedInputResults =
p.apply(feedName + ":read", PubsubIO.readStrings().fromTopic(inboundTopic))
.apply(Window.<String>configure()
.triggering(Repeatedly
.forever(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.millis(windowDelay)))
// Fire on any late data
.withLateFirings(AfterPane.elementCountAtLeast(windowMinElementCount))))
.discardingFiredPanes())
.apply(feedName + ":parse", ParDo.of(new ParseFeedInputFn())
.withOutputTags(validBetRecordTag,
// Specify the output with tag startsWithBTag, as a TupleTagList.
TupleTagList.of(invalidBetRecordTag)));