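【Posted】:2019-06-20 06:41:21
【Question】:
By default, .windowedBy(SessionWindows.with(...)) emits an updated result for every new incoming record. How can I wait at least 1 second and then emit only the last result of the current session window?
I am trying this with the word count example: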
final KStream<String, String> source = builder.stream("streams-plaintext-input");
final KStream<String, Long> wordCounts = source
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
    // Group the stream by word to ensure the key of the record is the word.
    .groupBy((key, word) -> word)
    .windowedBy(SessionWindows.with(Duration.ofSeconds(10)))
    // Count the occurrences of each word (message key).
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))
    // Convert to KStream<String, Long>
    .toStream((windowedId, count) -> windowedId.key());
wordCounts.foreach((word, count) -> {
    System.out.println(word + " : " + count);
});
Here is the producer input and the resulting client output, which is wrong:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka stream
(nothing)
>hello kafka stream
hello : 1
kafka : 1
stream : 1
>hello kafka stream
hello : null
kafka : 1
stream : 1
How can I fix this? Thank you very much for reading my question :)
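For readers unfamiliar with session windows, here is a minimal sketch of the grouping idea in plain Java, without Kafka. It is not the Kafka Streams implementation: the class name SessionWindowSketch, the method sessionize, and the choice to treat a gap exactly equal to the limit as still inside the session are all assumptions made for illustration. The count stored in each closed session is the "last result" the question wants to see emitted once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: groups timestamps into sessions.
final class SessionWindowSketch {

    /** One closed session: first and last event time in ms, plus the number of events. */
    static final class Session {
        final long startMs;
        final long endMs;
        final long count;

        Session(long startMs, long endMs, long count) {
            this.startMs = startMs;
            this.endMs = endMs;
            this.count = count;
        }
    }

    /**
     * Groups ascending event timestamps into sessions. An event joins the current
     * session if it arrives no later than gapMs after the session's last event
     * (assumption: the boundary is inclusive); otherwise it starts a new session.
     */
    static List<Session> sessionize(long[] timestampsMs, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        if (timestampsMs.length == 0) {
            return sessions;
        }
        long start = timestampsMs[0];
        long end = timestampsMs[0];
        long count = 1;
        for (int i = 1; i < timestampsMs.length; i++) {
            long ts = timestampsMs[i];
            if (ts - end <= gapMs) {
                // Still inside the inactivity gap: extend the current session.
                end = ts;
                count++;
            } else {
                // Gap exceeded: close the current session and open a new one.
                sessions.add(new Session(start, end, count));
                start = ts;
                end = ts;
                count = 1;
            }
        }
        sessions.add(new Session(start, end, count));
        return sessions;
    }

    public static void main(String[] args) {
        // Three events close together, then one after a long pause; gap of 10 s as in the question.
        long[] events = {0L, 200L, 400L, 20_000L};
        for (Session s : sessionize(events, 10_000L)) {
            System.out.println("[" + s.startMs + ", " + s.endMs + "] count=" + s.count);
        }
    }
}
```

With the timestamps in main, the first three events fall into one session with count 3, and the event at 20000 ms forms its own session with count 1.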
【Comments】:
-
Did you figure out how to implement your use case? I am trying to achieve the same thing.
-
By the way, your question is more about "debounce" than "throttle" :)
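To make the distinction in the comment above concrete, here is a small plain-Java sketch of the two policies applied to a list of event timestamps. It does not use Kafka, and the names DebounceVsThrottle, throttle, and debounce are hypothetical. It also assumes that an event arriving exactly at the end of the quiet period still suppresses the earlier one. Throttle lets through at most one event per interval; debounce lets through only the last event before a quiet period, which is what the question asks for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of throttle vs. debounce on ascending timestamps (ms).
final class DebounceVsThrottle {

    /** Throttle: emit an event only if at least intervalMs passed since the last emitted event. */
    static List<Long> throttle(long[] timestampsMs, long intervalMs) {
        List<Long> emitted = new ArrayList<>();
        for (long ts : timestampsMs) {
            if (emitted.isEmpty() || ts - emitted.get(emitted.size() - 1) >= intervalMs) {
                emitted.add(ts);
            }
        }
        return emitted;
    }

    /**
     * Debounce: emit an event only if no later event follows within quietMs.
     * The final event always qualifies because nothing follows it.
     */
    static List<Long> debounce(long[] timestampsMs, long quietMs) {
        List<Long> emitted = new ArrayList<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            boolean isLast = (i == timestampsMs.length - 1);
            if (isLast || timestampsMs[i + 1] - timestampsMs[i] > quietMs) {
                emitted.add(timestampsMs[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] events = {0L, 200L, 400L, 3000L, 3100L};
        System.out.println("throttle: " + throttle(events, 1000L)); // prints "throttle: [0, 3000]"
        System.out.println("debounce: " + debounce(events, 1000L)); // prints "debounce: [400, 3100]"
    }
}
```

Throttle reports the first event of each burst (0 and 3000), while debounce reports the last one (400 and 3100), that is, the final state after the burst has gone quiet.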
Tags: java apache-kafka apache-kafka-streams