【发布时间】:2016-12-20 12:38:38
【问题描述】:
我想做的是:
- 使用数字主题 (Long's) 中的记录
- 聚合(计数)每个 5 秒窗口的值
- 将最终聚合结果发送到另一个主题
我的代码如下所示:
KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");
看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?
【问题讨论】:
标签: apache-kafka apache-kafka-streams