【问题标题】:Can a single Spring's KafkaConsumer listener listens to multiple messages from same/one partition?单个 Spring Kafka Consumer 侦听器可以侦听来自同一/一个分区的多条消息吗?
【发布时间】:2017-12-17 20:35:55
【问题描述】:

提供了一个具有 n 个分区的主题。 Spring 的 KafkaConsumer 监听器有什么方法可以一次监听来自同一个/一个分区的多条消息?

我通过设置setBatchListener(true); 尝试了ConcurrentKafkaListenerContainerFactory,但消费者已经开始使用来自不同分区而不是一个分区的多条消息。

public class BatchReceiverConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // maximum records per batch receive
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        // enable batch listeners
//        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    public BatchReceiver receiver() {
        return new BatchReceiver();
    }
}


/* Listener */
@KafkaListener(id = "batch-listener", topics = TOPIC_TEST_BATCH)
    public void receive(List<String> data,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        LOGGER.info("start of batch receive");
        for (int i = 0; i < data.size(); i++) {
            LOGGER.info("received message='{}' with partition-offset='{}'", data.get(i),
                    partitions.get(i) + "-" + offsets.get(i));
            // handle message

            latch.countDown();
        }
        LOGGER.info("end of batch receive");
    }

【问题讨论】:

  • 重新表述您的问题:“单个侦听器如何仅使用来自一个分区的消息?”。对吗?
  • 这是正确的,但在 spring-kafka 模块的上下文中。

标签: java spring apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

如果一个组中只有一个消费者,那么您可以将那个ConcurrentKafkaListener 中的concurrencyLevel 设置为n

【讨论】:

  • 它没有用,我相信 'max.poll.records' 是决定要获取的记录数的属性。
  • 告诉我如何配置 Spring Kafka 和 Kafka 消费者本身。
  • 已添加有问题的 sn-p。
【解决方案2】:

如果您使用组管理,Kafka 会为您的消费者分配分区。每个消费者(或使用并发时的线程)将获得 0、1 或更多分区,具体取决于有多少消费者和分区。

如果您希望特定消费者仅从一个分区获取消息,则必须自己分配分区,而不是让 Kafka 进行分配。

如果你使用并发容器;您必须分配与并发相同数量的分区,因此每个线程只能获得一个分区。

【讨论】:

  • 在有问题的 sn-p 中,分区数 = 并发,即使这样它也不起作用。此外,# of consumer = # of partitions。
  • 如果您使用的是组管理,则分配分区需要时间;最初,消费者会得到它们;唯一的保证是自己分配分区,而不是使用组管理。 props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch4");
  • 在集群环境中,很难定义这一点,因为相同的代码部署到所有节点。有什么办法可以动态配置吗?
  • 有一个new option in the 0.11.0.0 client来延迟分区分配。我们正在努力增加对该客户端的支持。但是,即使这样,仍然无法保证 - 如果您的一个实例死亡,它的分区将被重新分配。唯一的保证是使用手动赋值;您可以使用属性占位符;每个实例都有不同的属性。
猜你喜欢
  • 2023-04-06
  • 2019-06-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-18
  • 1970-01-01
相关资源
最近更新 更多