【发布时间】:2019-10-15 07:18:18
【问题描述】:
我对 KStreams 聚合和窗口有疑问。我想将一条记录聚合到一个具有相同键的记录列表中,只要它位于时间窗口内。 我选择 SessionWindows 是因为我必须在会话中使用移动窗口:假设记录 A 在 10:00:00 到达;然后具有相同键的所有其他记录到达 在 10 秒窗口时间内(直到 10:00:10)将进入同一会话,请记住,如果它在 10:00:03 到达,窗口将移动到 10:00:13(+10 秒)。
这导致我们从给定键收到的最后一条记录开始有一个 +10 秒的移动窗口。
现在的问题是:我想获得最后的聚合结果。我使用 .suppress() 表示我不想要任何中间结果,我只想要窗口关闭时的最后一个结果。这 工作不正常,因为虽然它没有发送任何中间聚合结果,但当时间窗口结束时,我没有得到任何结果。我注意到,为了接收它,我需要发布另一个 消息进入主题,这在我的情况下是不可能的。
阅读 .suppress() 我得出的结论是,它可能不是实现我想要的方式,这就是为什么我的问题是:如何强制关闭窗口并发送最新的聚合计算结果?
@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES)
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) {
input.map(this::getRecord)
.groupBy(keyOfElement)
.windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
.aggregate(...do stuff...)
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map(this::createAggregatedResult);
}
【问题讨论】:
标签: java apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream