【发布时间】:2019-03-27 01:34:13
【问题描述】:
默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60)) 为每个传入记录返回一条记录。
结合.count() 和.filter() 可以轻松检索第一条记录。
使用
.suppress(Suppressed.untilWindowCloses(unbounded())) 检索最后一条记录也很容易。
所以……我做了两次处理,你可以看到改编后的字数示例:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
.filter((wk, v) -> v == 1)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
但我想知道是否有更简单、更漂亮的方式来做同样的事情。
【问题讨论】:
-
您想获得什么样的第一条和最后一条记录?您使用基于不同 keys 的分组,所以我认为它不会像您预期的那样工作。
-
不同的key用来说明我想要什么。我编辑了帖子以删除它们以使其更清晰。我只想从会话窗口中获取第一条和最后一条记录。
-
我认为你的代码仍然没有达到你想要的效果。
标签: java apache-kafka apache-kafka-streams