【问题标题】:Process item in a window with Kafka streams在带有 Kafka 流的窗口中处理项目
【发布时间】:2020-03-16 00:20:52
【问题描述】:

我正在尝试使用 kafka 流在滑动窗口中处理一些事件,但我认为我不了解 kafka 流的一些细节,所以我无法做我想做的事。

我有什么:

  • 输入带有键/值的事件主题,例如 (Int, Person)

我想要什么:

  • 在 10 分钟的滑动窗口内阅读这些事件
  • 处理滑动窗口中的每个元素
  • 过滤和计数一些元素,向另一个 kafka 触发一些事件 主题(比如检测到错误值)

简单来说:在 10 分钟的滑动窗口中获取所有事件,对它们进行 foreach,在窗口的上下文中计算一些统计信息/事件,转到下一个窗口...

我尝试了什么: 我尝试混合 Stream 和处理器 API,例如:

    val streamBuilder = new StreamsBuilder()
    streamBuilder.stream[Int, Person](topic)
      .groupBy((_, value) => PersonWrapper(value.id, value.name))
      .windowedBy(TimeWindows.of(10 * 60 * 1000L).advanceBy(1 * 60 * 1000L))
// now I have a window of (PersonWrapper, Person) right ?
    streamBuilder.build().addProcessor(....)

现在我要在这个拓扑中添加一个处理器来处理滑动窗口的每个事件。 我不明白什么是 TimeWindowStream 以及为什么我们应该有一个 KGroupedStream 来在事件上应用一个窗口。如果有人可以启发我关于 Kafka 流以及我正在尝试做的事情。

【问题讨论】:

    标签: scala apache-kafka apache-kafka-streams


    【解决方案1】:

    您是否阅读过文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

    1. 加窗是一种特殊的分组形式(基于时间的分组)
    2. 在 Kafka Streams 中计算聚合总是需要分组
    3. 拥有分组和窗口化流后,您调用aggregate() 进行实际处理(无需手动附加Processor;对aggregate() 的调用将为您隐式添加Processor)。李>

    顺便说一句:Kafka Streams 并不真正支持聚合的“滑动窗口”。您定义的窗口称为跳跃窗口

    KGroupedStreamTimeWindowedKStreams 基本上只是帮助类和允许流畅 API 设计的中间表示。

    教程也是入门的好方法:https://docs.confluent.io/current/streams/quickstart.html

    您还应该查看示例:https://github.com/confluentinc/kafka-streams-examples

    【讨论】:

    • 你好马蒂亚斯,谢谢你,我找到了答案。我需要的是一个计算元素数量的聚合。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-03-23
    • 2018-08-21
    • 1970-01-01
    • 2017-01-08
    相关资源
    最近更新 更多