【问题标题】:How to create multiple consumers in a consume group using Spring provided Kafka apis如何使用 Spring 提供的 Kafka api 在消费组中创建多个消费者
【发布时间】:2020-07-06 14:11:08
【问题描述】:

我正在尝试在一个消费者组中创建多个消费者以进行并行处理,因为我们有大量消息流入。我正在使用 spring boot 和 KafkTemplate。我们如何在 Spring Boot 应用程序的单个实例中创建属于单个消费者组的多个消费者? 使用@KafkaListener 注解的多个方法会创建多个消费者吗?

【问题讨论】:

    标签: java apache-kafka spring-kafka


    【解决方案1】:

    你必须使用ConcurrentMessageListenerContainer。它委托给一个或多个KafkaMessageListenerContainer 实例以提供多线程消费。

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    

    factory.setConcurrency(10) 创建 10 个KafkaMessageListenerContainer 实例。每个实例都有一定数量的分区。这取决于您在创建主题时配置的分区数。

    一些准备步骤:

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
    private final static String CONSUMER_GROUP = "consumer-group-1";
    private final static String TOPIC = "test-topic";
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    
    
    @KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
    public void listen(@Payload String message) {
        logger.info(message);
    }
    
    public void start() {
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
        }
        
        logger.info("All message are sent");
    }
    

    如果您运行上面的方法,您可以看到每个KafkaMessageListenerContainer 实例处理被放入该实例所服务的分区的消息。 添加 Thread.sleep() 等待消费者初始化。

    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo                 : Message 5
    2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo                 : Message 7
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo                 : Message 8
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo                 : Message 1
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo                 : Message 0
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo                 : Message 9
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo                 : Message 4
    2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo                 : Message 3
    2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo                 : Message 2
    2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo                 : Message 6
    

    【讨论】:

    • 谢谢@Salavat Yalalo!我可以创建多个消费者,但我们如何知道哪个消费者已经处理/读取了来自主题的消息?日志中是否会打印任何消费者 ID?我在您的回答中看到“[ntainer#0-5-C-1]”,并且我假设在日志中的某处打印了消费者 ID(如果存在)。如果是,你能告诉我吗?
    【解决方案2】:

    是的,@KafkaListener 将为您创建多个消费者。

    您可以将它们全部配置为使用相同的主题并属于同一组。 Kafka 协调器会将分区分配给您的消费者。

    虽然如果你的topic只有一个partition,并发是不会发生的:单个partition是在一个thread中处理的。

    另一种选择确实是配置concurrency,并且将再次根据concurrency &lt;-&gt; partition 状态创建多个消费者。

    【讨论】:

    • Yes, the @KafkaListener will create multiple consumers for you.。快速检查确实表明存在concurrency 元素,如果这就是您的意思。无论如何,谢谢 Artem!
    【解决方案3】:

    正如@Salavat Yalalo 建议的那样,我将我的 Kafka 容器工厂设置为 ConcurrentKafkaListenerContainerFactory。在@KafkaListenere 方法中,我添加了名为 concurrency 的选项,它接受一个整数作为字符串,表示要跨越的消费者数量,如下所示

    @KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values)
    public void topicConsumer(Message<MyObject> myObject){
    //.....
    }
    

    运行时,我看到在一个消费者组中创建了 4 个消费者。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-27
      相关资源
      最近更新 更多