【问题标题】:Create consumer dynamically spring kafka动态创建消费者spring kafka
【发布时间】:2021-04-12 21:24:09
【问题描述】:

我正在创建一个与另一个服务对话的服务,以识别要收听的 kafka 主题。 kafka 主题可能有不同的键和值类型。因此,我想为每个配置(主题、键类型、值类型)动态创建不同的 kafka 消费者,其中配置仅在运行时才知道。但是在 spring kafka 中,我看不到动态传递所有这些参数的方法(至少我不知道有任何方法)。我该怎么做呢。

【问题讨论】:

  • 如何让一个消费者使用ByteArrayDeserializer,然后在消费方法中动态调用不同的反序列化方法?
  • 在 Kafka 中,消费者消费来自一个主题的所有消息,并且您的消费者是消费者组的一部分。你不可能说我想根据消息属性来消费消息。因此,如果您认为您的消息/事件是不同的类型,那么您应该考虑每个事件类型的主题。否则,您可以让一个消费者组和多个消息处理器在消息到达时获取运行时。我认为运行时多态性应该是答案。
  • @SanjuThomas 我认为我的问题造成了一些混乱。一个主题只有一种类型的数据,但不同的主题会有不同类型的数据。我的问题在于使用新确定的主题及其键类型值类型信息来启动新消费者。
  • @OneCricketeer 那么如何动态配置主题呢?
  • 您必须为每个消费者实例启动一个新线程。我的评论更多的是不需要为每种类型的数据更改配置

标签: apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

只需在运行时创建一个新的监听器容器。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#message-listener-container

如果你使用的是Spring Boot,你可以使用它自动配置的ConcurrentKafkaListenerContainerFactory;如果没有,只需手动创建和初始化容器。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-02
  • 1970-01-01
  • 2019-04-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多