【问题标题】:Is it possible consumer Kafka messages after arrival?到达后是否有可能消费者 Kafka 消息?
【发布时间】:2019-01-15 15:30:10
【问题描述】:

我想在 kafka 主题到达后使用它们。我希望使用事件的时间在消息的有效负载中。是否有可能在 Kafka 中实现类似的目标?它的缺点是什么?

实际示例:消息 M 在 12:10 产生,在 12:11 到达我的 kafka 主题,我希望消费者在 12:41(到达后 30 分钟)轮询它

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    Kafka 的所有主题的默认保留期为 7 天。因此,您可以随时消耗最多一周的数据,如果您经常这样做,缺点是网络饱和。

    如果你想消费不是最新偏移量的数据,那么对于任何新的消费者组,你应该设置auto.offset.reset=earliest。否则,对于现有组,您需要使用 kafka-consumer-groups --reset 命令才能重新使用已使用的记录。

    有时您可能希望从主题的开头开始,例如,如果您有一个压缩主题,为了重建主题中数据的“增量” - 查找“流/表对偶性”

    我希望使用事件的时间在消息的有效负载中

    顺便说一下,由于KIP-32 每条消息在有效负载之外都有一个时间戳

    我希望消费者对其进行投票...(到达后 30 分钟)

    当然,您可以在任何时候启动消费者,只要数据在保留窗口内,您就会收到该事件。

    除了在那个时间(例如 30 分钟后)真正让您的消费者发生之外,没有其他方法可以精细地控制何时。你可以玩max.poll.recordsmax.poll.interval.ms,但我发现任何大于几秒的东西真的不是Kafka 的用例。

    例如,您可能宁愿在消费者线程周围使用TimerTask,或者使用读取最大记录量的 Oozie/Airflow 任务安排 Spark 或 MapReduce。

    【讨论】:

    • 我为什么要使用kafka-consumer-groups --reset?我不确定有效负载之外的时间戳是否适用于我的用例。我用一个实际的例子编辑了这个问题
    • 如果您已经有一个活跃的消费者组,并且您已经消费了消息,那么您需要将组重置为后退
    • 并且在 Producer API 中,您可以使 new ProducerRecord 带有或不带有时间戳。如果你不设置它,那么它将使用当前系统时间,这可能大约是你的有效负载时间
    • 我也不想稍后再启动消费者;消费者应该继续收听新消息,但我应该找到一种方法来检查消息的内容,以确定该消息何时被完全消费
    • 那肯定有可能,所以我不明白这个问题
    猜你喜欢
    • 2020-10-16
    • 1970-01-01
    • 2020-04-13
    • 2018-09-12
    • 1970-01-01
    • 2017-09-23
    • 1970-01-01
    • 2018-06-04
    • 2017-12-21
    相关资源
    最近更新 更多