【发布时间】:2017-12-01 08:24:03
【问题描述】:
在我们的项目中,我们使用 spring-cloud-stream-binder-kafka 1.1.2 版与 kafka 集成。最近我们遇到了一种情况,我们的一项服务在启动后消耗了来自主题的旧消息(已经消耗)。该主题有 2 个分区和 2 个消费者分组在一个消费者组下。我们不确定偏移量是否正确提交给zookeeper。在启动过程中,每条消息都会抛出以下错误消息。
[-kafka-listener-2] ERROR o.s.k.listener.LoggingErrorHandler.handle - Error while processing: ConsumerRecord(topic = statemachine_deal_notification, partition = 1, offset = 926, key = null, value = [B@6fab0a32)
为了确保不再发生这种情况,我们一直希望只阅读主题中的最新消息。我发现将resetOffsets 设置为true 和startOffset 设置为latest 就可以了。但这些属性不会影响消费者。后来发现这个功能已经下架了。
有没有其他方法可以确保特定组中的消费者只消费最新消息???.
【问题讨论】:
标签: spring-integration spring-cloud-stream spring-kafka