【发布时间】:2019-01-15 15:30:10
【问题描述】:
我想在 kafka 主题到达后使用它们。我希望使用事件的时间在消息的有效负载中。是否有可能在 Kafka 中实现类似的目标?它的缺点是什么?
实际示例:消息 M 在 12:10 产生,在 12:11 到达我的 kafka 主题,我希望消费者在 12:41(到达后 30 分钟)轮询它
【问题讨论】:
标签: apache-kafka kafka-consumer-api
我想在 kafka 主题到达后使用它们。我希望使用事件的时间在消息的有效负载中。是否有可能在 Kafka 中实现类似的目标?它的缺点是什么?
实际示例:消息 M 在 12:10 产生,在 12:11 到达我的 kafka 主题,我希望消费者在 12:41(到达后 30 分钟)轮询它
【问题讨论】:
标签: apache-kafka kafka-consumer-api
Kafka 的所有主题的默认保留期为 7 天。因此,您可以随时消耗最多一周的数据,如果您经常这样做,缺点是网络饱和。
如果你想消费不是最新偏移量的数据,那么对于任何新的消费者组,你应该设置auto.offset.reset=earliest。否则,对于现有组,您需要使用 kafka-consumer-groups --reset 命令才能重新使用已使用的记录。
有时您可能希望从主题的开头开始,例如,如果您有一个压缩主题,为了重建主题中数据的“增量” - 查找“流/表对偶性”
我希望使用事件的时间在消息的有效负载中
顺便说一下,由于KIP-32 每条消息在有效负载之外都有一个时间戳
我希望消费者对其进行投票...(到达后 30 分钟)
当然,您可以在任何时候启动消费者,只要数据在保留窗口内,您就会收到该事件。
除了在那个时间(例如 30 分钟后)真正让您的消费者发生之外,没有其他方法可以精细地控制何时。你可以玩max.poll.records 和max.poll.interval.ms,但我发现任何大于几秒的东西真的不是Kafka 的用例。
例如,您可能宁愿在消费者线程周围使用TimerTask,或者使用读取最大记录量的 Oozie/Airflow 任务安排 Spark 或 MapReduce。
【讨论】:
kafka-consumer-groups --reset?我不确定有效负载之外的时间戳是否适用于我的用例。我用一个实际的例子编辑了这个问题
new ProducerRecord 带有或不带有时间戳。如果你不设置它,那么它将使用当前系统时间,这可能大约是你的有效负载时间