【问题标题】:Throttle in Kafka Streams SessionWindowsKafka Streams SessionWindows 中的节流
【发布时间】:2019-06-20 06:41:21
【问题描述】:

默认情况下,.windowedBy(SessionWindows.with(...)) 将返回每条新的传入记录。那么,如何在返回当前会话窗口的最后一个结果之前等待至少 1 秒?

我正在尝试字数统计示例:

        final KStream<String, String> source = builder.stream("streams-plaintext-input");

        final KStream<String, Long> wordCounts = source

                // Split each text line, by whitespace, into words.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))

                // Group the stream by word to ensure the key of the record is the word.
                .groupBy((key, word) -> word)

                .windowedBy(SessionWindows.with(Duration.ofSeconds(10)))

                // Count the occurrences of each word (message key).
                .count(Materialized.with(Serdes.String(), Serdes.Long()))

                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(1), Suppressed.BufferConfig.unbounded()))

                // Convert to KStream<String, Long>
                .toStream((windowedId, count) -> windowedId.key());

        wordCounts.foreach((word, count) -> {
            System.out.println(word + " : " + count);
        });

这是producer的input和client的result,其实是错误的:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello kafka stream

(nothing)

>hello kafka stream

hello : 1
kafka : 1
stream : 1

>hello kafka stream

hello : null
kafka : 1
stream : 1

我该如何解决这个问题?非常感谢您阅读我的问题:)

【问题讨论】:

  • 您知道如何实现您的用例吗?我也在努力实现它。
  • 顺便说一句,你的问题更像是“debounce”而不是“throttle”:)

标签: java apache-kafka apache-kafka-streams


【解决方案1】:

【讨论】:

  • @matthias-j-sax - 我已经阅读了所有这 3 个文档,但我无法弄清楚 pro 如何实现上述用例。你能告诉我吗?提前致谢
  • “无法弄清楚”不是很具有描述性——您观察到什么行为?你设置了什么参数?
猜你喜欢
  • 1970-01-01
  • 2021-01-25
  • 1970-01-01
  • 1970-01-01
  • 2019-01-12
  • 2018-10-06
  • 1970-01-01
  • 1970-01-01
  • 2019-05-19
相关资源
最近更新 更多