【问题标题】:spring Kafka listening to regex春天卡夫卡听正则表达式
【发布时间】:2022-03-08 04:18:25
【问题描述】:

我正在尝试使用以下代码收听新创建的主题,但无法正常工作。请告诉我下面的代码是否正确?

public class KafkaMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    private final ProcessEventModel eventModel;

    @KafkaListener(topicPattern = "betsyncDataTopic*")
    public void receive(ConsumerRecord<String, String> consumerRecord) {
        LOGGER.info("received payload at '{}'", consumerRecord.timestamp());
        eventModel.process(consumerRecord.value());
    }

【问题讨论】:

    标签: java spring spring-kafka


    【解决方案1】:

    您的正则表达式无效;应该是betsyncDataTopic.*

    @KafkaListener(id = "xxx", topicPattern = "kbgh.*")
    public void listen(String in) {
        System.out.println(in);
    }
    

    ...

    partitions assigned: [kbgh290-0]
    

    编辑

    如果您稍后添加与该模式匹配的新主题,则在重新平衡之前会有延迟。根据KafkaConsumerjavadocs...

     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions.
     * The pattern matching will be done periodically against topic existing at the time of check.
     * <p>
     * As part of group management, the consumer will keep track of the list of consumers that
     * belong to a particular group and will trigger a rebalance operation if one of the
     * following events trigger -
     * <ul>
     * <li>Number of partitions change for any of the subscribed list of topics
     * <li>Topic is created or deleted
     * <li>An existing member of the consumer group dies
     * <li>A new member is added to an existing consumer group via the join API
     * </ul>
    

    我刚刚进行了测试;在12:13:32添加了一个新的匹配主题;结果:

    2018-02-12 12:17:30.394  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions revoked: [kbgh290-0]
    2018-02-12 12:17:30.450  INFO 88028 --- [      xxx-0-C-1] o.s.k.l.KafkaMessageListenerContainer    
    : partitions assigned: [kbgh290-0, kbghNew-0]
    

    所以默认需要 5 分钟。

    https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

    即使我们没有看到任何分区领导层更改以主动发现任何新的代理或分区,我们也会强制刷新元数据的时间段(以毫秒为单位)。

    【讨论】:

    • 嗨@Gary,它工作但没有从动态创建的主题中挑选消息。你认为有办法从新创建的主题“kbgh.1234”中获取这些消息吗?
    • 谢谢,Gary,我看到了。一开始我有1000个主题,后来又加了几百个,所以延迟很长,但是当我有40-50个主题,我增加了10-15个时,速度更快。
    • 默认元数据刷新时间为 5 分钟。
    【解决方案2】:

    topicPattern 应该是 betsyncDataTopic.*

    创建主题后,消费者必须等待刷新新的元数据。由于 metadata.max.age.ms 默认值需要 5 分钟:https://kafka.apache.org/documentation/#producerconfigs_metadata.max.age.ms

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-12-17
      • 2017-02-08
      • 1970-01-01
      • 2015-06-18
      • 2021-11-30
      相关资源
      最近更新 更多