【问题标题】:Kafka consumer picking up topics dynamicallyKafka 消费者动态获取主题
【发布时间】:2019-06-25 06:33:04
【问题描述】:

我在 Spring Boot 中配置了一个 Kafka 消费者。这是配置类:

@EnableKafka
@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, GenericData.Record> consumerFactory() {

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        dataRiverProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getProperty("schema.registry.url"));
        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericData.Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

这是消费者:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;

    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeAvro(GenericData.Record message) {
        messageProcessor.process();
    }

}

请注意,我使用 topics = "#{'${kafka.topics}'.split(',')}" 从属性文件中获取主题。 这就是我的 kafka.properties 文件的样子:

kafka.topics=pwdChange,pwdCreation
bootstrap.servers=aaa.bbb.com:37900
group.id=pwdManagement
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
schema.registry.url=http://aaa.bbb.com:37800

现在如果我要向订阅中添加一个新主题,比如 pwdExpire,然后修改 prop 文件如下:

kafka.topics=pwdChange,pwdCreation,pwdExpire

有没有办法让我的消费者在不重新启动服务器的情况下开始订阅这个新主题? 我找到了这篇帖子 Spring Kafka - Subscribe new topics during runtime,但文档中有关于 metadata.max.age.ms 的说明:

我们强制刷新的时间段(以毫秒为单位) 元数据,即使我们没有看到任何分区领导更改 主动发现任何新的代理或分区。

在我看来这行不通。感谢您的帮助!

【问题讨论】:

  • 更多信息:我有一个计划任务,每 5 分钟运行一次以检查数据库。那么如果我们在数据库中添加一些控制值,有没有办法强制程序从属性文件中获取并再次消费呢?谢谢!
  • 你可以查看 spring 云配置并使用注释 @refreshscope。我不确定这是否适用于配置注释类。
  • 所以基本上你可以结合 spring config @refresh 和这个关于为生产者和消费者工厂重新初始化 spring bean 的链接 stackoverflow.com/questions/51218086/… 可能对你有用
  • 我认为刷新范围在这里不起作用; AFAIK,刷新范围仅适用于被动 bean;侦听器容器是一个活动组件(实现 SmartLifecycle 并由应用程序上下文启动/停止。
  • 我没有在spring cloud上运行该应用。

标签: spring-boot configuration kafka-consumer-api spring-kafka


【解决方案1】:

没有;唯一的方法是使用主题模式;随着新主题的添加(与模式匹配),默认情况下,代理将在 5 分钟后将它们添加到订阅中。

但是,您可以在运行时为新主题添加新的侦听器容器。

另一种选择是将@KafkaListener bean 加载到子应用程序上下文中,并在每次主题更改时重新创建上下文。

编辑

查看KafkaConsumer.subscribe(Pattern pattern)的javadocs...

/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
 * The pattern matching will be done periodically against topics existing at the time of check.
 * <p>
 ...

【讨论】:

  • 谢谢!有没有我也可以参考的例子?
  • @GaryRussell 你介意添加一个模式匹配的例子。另外澄清一下,我猜你的意思是,一旦在代理上创建了一个主题,只要它与消费者正在寻找的模式相匹配,它就会每 5 分钟提取一次。这个 5 分钟计时器是消费者工厂默认设置的可配置属性。
  • @Matt 查看@KafkaListener 上的topicPattern 属性;这是一个正则表达式,例如pwd.* 用于以 pwd 开头的所有主题。 5 分钟是默认的metadata.max.age.ms,它是一个消费者属性(参见 kafka 文档)。 The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.@Hua 哪个选项的例子?
  • @GaryRussell 感谢您的回复!
  • 谢谢@GaryRussell 我对选项 1 很感兴趣。刚刚检查了 KafkaListener 的 API,老实说,我看不出“主题”和“主题模式”之间有什么区别。对于“主题”——此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。必须将表达式解析为主题名称。对于“topicPattern”——此侦听器的主题模式。条目可以是“主题名称”、“属性占位符键”或“表达式”。必须将表达式解析为主题模式。
猜你喜欢
  • 2016-07-09
  • 2015-08-16
  • 2017-01-28
  • 2017-10-17
  • 2019-09-20
  • 2017-01-26
  • 1970-01-01
  • 2018-07-01
  • 2018-07-27
相关资源
最近更新 更多