【问题标题】:How to read records from Kafka using Kafka-Stream by interval如何使用 Kafka-Stream 按间隔从 Kafka 读取记录
【发布时间】:2023-03-12 01:40:01
【问题描述】:

我想在 Kafka-Stream 消费者中使用从 Kafka 读取记录,是否可以选择在每个给定间隔读取记录?例如每 1 分钟?

【问题讨论】:

    标签: kafka-consumer-api apache-kafka-streams


    【解决方案1】:

    我不确定我是否正确理解了您的问题。但是,我认为有多种方法(取决于问题中不清楚的实际想要实现的目标)。

    1. 使用 Kafka Streams DSL (Kafka 0.10):使用 Kafka Streams(Java 流处理库),您可以将窗口聚合指定为任意大小的滚动窗口
    2. 利用时间戳 (Kafka 0.10):如果你想使用 KafkaConsumer,你可以读取消息并检查它们的时间戳以按时间间隔分块数据
    3. 基于系统时间(所有 Kafka 版本):只需从 Kafka 读取消息并将消息放入系统时间的间隔基础中。即,在处理下一条记录之前,请检查本地时钟以间隔放置消息。

    【讨论】:

    • 我的意思是,有一个选项可以每 1 分钟从 Kafka 拉取数据(例如:在 10:01 读取 10:00 到 10:01 的所有记录,在 10: 02 读取从 10:01 到 10:02 等的所有记录),而不是在运行时获取新记录?我想读取数据以处理每个给定的时间间隔,而不是在处理之前将数据保存在内存中。
    • 没有内置支持,因为 Kafka 是基于拉取的。您需要使用建议的方法之一将此逻辑放入客户端。如果我理解您的评论正确,您可能希望在开始 poll() 之前结合使用方法 (3) 来获取当前的日志结束偏移量,并且仅将消费者消息发送到获得的偏移量(以避免读取添加的记录开始消费后)
    • @MatthiasJ.Sax,我有类似的要求(连续处理最近 x 小时间隔内发布的记录)。是否可以使用带有跳窗的 Kafka Stream DSL API?
    • 是的,这属于 (1) - 根据您的要求,跳跃、翻滚或滑动窗口(在 v2.7.0 中添加)可能适合您。
    猜你喜欢
    • 2019-03-16
    • 1970-01-01
    • 2022-11-24
    • 2020-10-17
    • 2023-03-26
    • 1970-01-01
    • 1970-01-01
    • 2021-11-27
    • 1970-01-01
    相关资源
    最近更新 更多