【发布时间】:2018-11-09 00:06:33
【问题描述】:
除了之前问过的关于交易的问题,我想问一下关于控制消费的问题:我有一个监听器,它处理生产数据。现在发生了一些不好的事情,出于任何原因,我们希望我们的应用程序启动,但停止处理记录。所以我想选择手动(开始)停止消费者(我知道 ContainerStoppingErrorHandler)。问题解决后,最终重新定位它们并重新启动它们。
我想我找到了一种方法来做到这一点,但我希望有人可以向我确认这一点,因为可能有很多陷阱。整个过程似乎并不容易,我不确定我是否做对了,也许有更好的方法。
首先,为了能够暂停/停止消费者,我必须有权访问 MessageListenerContainer。这意味着,我将在配置中创建:ConcurrentKafkaListenerContainerFactory 并(从 2.2 开始)使用它来创建 ConcurrentMessageListenerContainer 的托管 bean。然后可以使用这个 bean 来启动/停止消费者。作品。一旦它是并发的......我假设我传递给 setupMessageListener 的内容必须是无状态类的实例,以便可以从多个线程/消费者对其进行操作。因此,如果我想进行 spring 依赖注入,就像我之前在带有 @KafkaListener 注释方法的 bean 上所做的那样,我可以在这里传递无状态单例 bean 的实例。
现在关于重新定位:这似乎很容易。只需在通过 setupMessageListener 注册的 messageListener 类中实现 ConsumerSeekAware 并存储回调。然后你可以自动装配 ConsumerSeekAware messageListener 单例,然后进行搜索。相关的 MessageListenerContainer,无论分区/并发设置如何,都应该进行查找。因为当 ConcurrentMessageListenerContainer 以大于 1 的并发开始时,它会启动多个 KafkaMessageListenerContainer a)在所有分区上(?),但由于所有分区共享相同的 groupId,因此只有一个消费者将消费消息,或者 b)每个 KafkaMessageListenerContainer 将有一些分区子集.但是在 seek 中,我们必须指定 topic+partition+offset,所以在任何一种情况下,seek 都应该由适当的 KafkaMessageListenerContainer 拾取。
对吗?
我知道这是令人生畏的文字/问题,但也许这对其他人也有用。
【问题讨论】:
标签: spring-kafka