【问题标题】:controlling kafka listeners consumption控制 kafka 听众的消费
【发布时间】:2018-11-09 00:06:33
【问题描述】:

除了之前问过的关于交易的问题,我想问一下关于控制消费的问题:我有一个监听器,它处理生产数据。现在发生了一些不好的事情,出于任何原因,我们希望我们的应用程序启动,但停止处理记录。所以我想选择手动(开始)停止消费者(我知道 ContainerStoppingErrorHandler)。问题解决后,最终重新定位它们并重新启动它们。

我想我找到了一种方法来做到这一点,但我希望有人可以向我确认这一点,因为可能有很多陷阱。整个过程似乎并不容易,我不确定我是否做对了,也许有更好的方法。

首先,为了能够暂停/停止消费者,我必须有权访问 MessageListenerContainer。这意味着,我将在配置中创建:ConcurrentKafkaListenerContainerFactory 并(从 2.2 开始)使用它来创建 ConcurrentMessageListenerContainer 的托管 bean。然后可以使用这个 bean 来启动/停止消费者。作品。一旦它是并发的......我假设我传递给 setupMessageListener 的内容必须是无状态类的实例,以便可以从多个线程/消费者对其进行操作。因此,如果我想进行 spring 依赖注入,就像我之前在带有 @KafkaListener 注释方法的 bean 上所做的那样,我可以在这里传递无状态单例 bean 的实例。

现在关于重新定位:这似乎很容易。只需在通过 setupMessageListener 注册的 messageListener 类中实现 ConsumerSeekAware 并存储回调。然后你可以自动装配 ConsumerSeekAware messageListener 单例,然后进行搜索。相关的 MessageListenerContainer,无论分区/并发设置如何,都应该进行查找。因为当 ConcurrentMessageListenerContainer 以大于 1 的并发开始时,它会启动多个 KafkaMessageListenerContainer a)在所有分区上(?),但由于所有分区共享相同的 groupId,因此只有一个消费者将消费消息,或者 b)每个 KafkaMessageListenerContainer 将有一些分区子集.但是在 seek 中,我们必须指定 topic+partition+offset,所以在任何一种情况下,seek 都应该由适当的 KafkaMessageListenerContainer 拾取。

对吗?

我知道这是令人生畏的文字/问题,但也许这对其他人也有用。

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    您不需要创建自己的容器

    @KafkaListener(id = "foo", topics = " ... "
    

    然后,获取对KafkaListenerEndpointRegistry 的引用(自动接线等)。

    然后

    registry.getListenerContainer("foo").stop();
    

    registery.stop();
    

    停止所有@KafkaListeners。

    当并发>1时;你至少需要这么多分区,否则一些消费者将处于空闲状态。

    分区由Kafka分布在消费者之间;回调将被每个人调用,并带有分配给它的分区列表。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-01
      • 1970-01-01
      • 2020-11-14
      • 2018-08-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多