【问题标题】:Spring Kafka multiple consumer for single topic consume different messages单个主题的Spring Kafka多个消费者消费不同的消息
【发布时间】:2019-01-10 04:47:31
【问题描述】:

在我的 Spring Boot Kafka 应用程序中,我有以下消费者配置:

@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}

和消费者:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

如果我理解正确的话,现在我只有一个消费者实例。我想增加帖子消费者的数量,假设有 5 个消费者将消费来自${kafka.topic.post.send} 的不同(不相同)消息,以加快消息消费。

是不是就这么简单,把factory.setConcurrency(5);加到我的postKafkaListenerContainerFactory()上,比如:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    factory.setConcurrency(5);

    return factory;
}

还是我需要做一些额外的工作才能实现它?

【问题讨论】:

  • 您需要多个group.id,因此消息会在同一主题的同一组中的各个消费者之间分发。你可以有 1 个微服务来完成这个过程并使用不同的 group.id 运行多个实例
  • 或者,您可以在同一个应用程序中定义多个具有不同 group.id 的侦听器,即:第一个 @KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") 第二个 @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
  • @Paizo 谢谢,所以我需要将我的sendPost 方法克隆为 number = 具有不同 ID 的所需听众编号,就这样?
  • 另外,为什么我们有factory.setConcurrency(5); 方法?这种方法的目的是什么?
  • 我的错我不知道;除了我提到的两种方法之外,并发性也应该可以解决问题,请参阅 docs.spring.io/spring-kafka/reference/htmlsingle 4.1.3 Receiving Messages: If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions

标签: java spring-boot apache-kafka spring-kafka


【解决方案1】:

这不是 Apache Kafka 的工作方式。一个想法总是在单个线程中的同一分区中存在进程记录。 factory.setConcurrency(5); 肯定是围绕一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有 10 个分区,那么 Spring Kafka 会产生 5 个线程,每个线程将处理 2 个分区。

我想说这在Reference Manual 中很清楚:

例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。 5个TopicPartition,2个容器分2个分区,第3个分1个。如果并发大于TopicPartition个数,则调低并发,每个容器分1个分区。

所以,如果你想要你描述的这样一个并发,你确实必须在你的主题中创建 5 个分区。只有在此之后,您才能并行处理同一主题中的记录。

【讨论】:

  • 如果我有一个主题和一组消费者在听这个主题(比如10个),是不是意味着10条消息会被并行处理?
  • 如果所有这些消息都分布在 10 个分区之间,那么是的:它是并行的。来自同一分区的记录按顺序处理。请阅读 Kafka 文档以了解架构
  • 感谢您的回答。我已经阅读了文档,但有一个快速的问题供我理解。我的理解是,如果我将消息放入主题而不提及分区,那么所有消息都会进入相同的默认分区。这种理解正确吗?
  • 不,那不是:默认情况下记录是均匀分布的
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2017-01-26
  • 1970-01-01
  • 2020-08-11
  • 1970-01-01
  • 2019-04-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多