【Question Title】: Kafka Consumer taking total control of partitions and offsets?
【Posted】: 2021-10-10 02:07:09
【Question】:

I have a use case where multiple end users connect to my Java-based Spring Boot microservice over websockets.

User A --> WebsocketSession 1
User B --> WebsocketSession 2

On the backend I have a set of Kafka listeners that receive data from a topic's partitions and broadcast it to all "active" websockets (in the example above, both User A and User B would receive the data). The "active" websockets are kept in a simple in-memory map.
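The in-memory map plus the broadcasting listener can be sketched roughly like this (the names `WsSession` and `SessionRegistry` are illustrative, not from the question; `WsSession` stands in for whatever session abstraction the service actually uses, e.g. Spring's `WebSocketSession`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the real websocket session type; not part of the question.
interface WsSession {
    void send(String message);
}

class SessionRegistry {
    // sessionId -> live session; ConcurrentHashMap because Kafka listener
    // threads and websocket connect/disconnect callbacks touch it concurrently
    private final Map<String, WsSession> active = new ConcurrentHashMap<>();

    void register(String sessionId, WsSession session) { active.put(sessionId, session); }

    void remove(String sessionId) { active.remove(sessionId); }

    // Called from the Kafka listener for every record: fan out to all users
    void broadcast(String payload) {
        active.values().forEach(s -> s.send(payload));
    }

    int activeCount() { return active.size(); }
}
```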

The challenge now is: if the service crashes and a user reconnects to a different instance, I need them to resume receiving messages from where they left off.

It looks like I need to take full control of partitions and offsets here (i.e. commit manually and record the offsets in persistent storage)?

Is this overkill (i.e. doing things imperatively rather than the recommended way)?

What is the recommended approach for this kind of use case?

【Comments】:

  • What "recommended way" are you referring to?

Tags: apache-kafka kafka-consumer-api spring-kafka


【Solution 1】:

"The challenge now is: if the service crashes and a user reconnects to a different instance, I need them to resume receiving messages from where they left off"

Isn't that already the default consumer behavior? Could you set up a consumer `group.id` per unique user, and get this solved out of the box?
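A sketch of what per-user group ids could look like. The `ws-user-` prefix and the helper class are assumptions for illustration; the property keys are spelled as plain strings so the idea stays visible (they correspond to the `ConsumerConfig` constants):

```java
import java.util.Properties;

// Sketch: one consumer group per end user, so Kafka itself tracks each
// user's committed offset and a reconnect on any instance resumes there.
class PerUserConsumerProps {

    static Properties build(String userId, String bootstrapServers) {
        Properties props = new Properties();
        // unique group per user => independent committed offsets per user
        props.put("group.id", "ws-user-" + userId);
        props.put("bootstrap.servers", bootstrapServers);
        // first-ever connect for a user: start from the earliest record;
        // after that, the group's committed offset wins
        props.put("auto.offset.reset", "earliest");
        // periodic auto-commit; switch to manual commit if "delivered to
        // the websocket" must be the commit point
        props.put("enable.auto.commit", "true");
        return props;
    }
}
```

You would then construct the consumer as usual, e.g. `new KafkaConsumer<>(PerUserConsumerProps.build(userId, servers), keyDeserializer, valueDeserializer)`, and `subscribe` to the topic.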

If you are already using a lot of Kafka listeners, chances are you would have to rework all of that code to manage offsets yourself.


That being said...

I wrote a class that does something similar for a "global" consumer, where I had to manage the consumer offsets and assign the partitions manually. It should give you an idea of what is involved.

The use case was: create a consumer that pulls all data from all partitions of a topic up until it hits the end offsets at time t.

So for example, if you have 3 partitions with end offsets 100, 200 and 150, the consumer reads each partition from offset 0 up to N = 100, 200, 150 respectively, and then returns the collected records.
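The loop's exit condition reduces to "every partition's position has reached its end offset". A broker-free sketch of that check, using plain numbers for partition ids and offsets (in the real consumer they come from `position()` and `endOffsets()`):

```java
import java.util.Map;

// Pure version of the loop's exit test: done once every partition's
// current position has caught up to the end offset captured at the start.
// endOffsets() returns the offset AFTER the last record, so
// position == endOffset means the last record has been consumed.
class OffsetTargets {
    static boolean allReached(Map<Integer, Long> positions, Map<Integer, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }
}
```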

Using this as a baseline, you could modify the behavior to start from the consumer group id's last commit (unique per user), reading and committing the group's offsets manually. The exit condition could be the end of the websocket session.
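Resuming "from where the user left off" then becomes: look up the group's committed offset for each partition and seek there, falling back to the beginning for a first-time user. The seek decision itself is pure and can be sketched without a broker (in real code the committed offset comes from `consumer.committed(...)`, which yields no entry when the group has never committed, and after pushing records to the websocket you would `commitSync()` the new position):

```java
// Sketch of the seek decision used when resuming a per-user group:
// a committed offset wins; a user with no commit history starts at the
// partition's beginning offset.
class ResumePosition {
    // committedOffset is null when the group has never committed for this partition
    static long resolve(Long committedOffset, long beginningOffset) {
        return committedOffset != null ? committedOffset : beginningOffset;
    }
}
```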

One other thing to note: when you assign partitions manually, as I do here, you lose the consumer group rebalancing behavior.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * 
 * This class pulls an entire Kafka topic and then exits based on the latest
 * offsets in the topic
 * 
 * @author Fermi-4
 *
 * @param <K>
 * @param <T>
 */
@Slf4j
public class KafkaRunnerImpl<K, T> implements KafkaRunner<T> {

    final Deserializer<K> _keySerde;
    final Deserializer<T> _valueSerde;
    final String _bootstrapServers;
    final String _topic;
    final int _requestTimeout;
    final int _sampleTimeout;
    
    public KafkaRunnerImpl(Deserializer<K> _keySerde, Deserializer<T> _valueSerde, String _bootstrapServers, String _topic, int _requestTimeout,
            int _sampleTimeout) {
        super();
        this._topic = _topic;
        this._keySerde = _keySerde;
        this._valueSerde = _valueSerde;
        this._bootstrapServers = _bootstrapServers;
        this._requestTimeout = _requestTimeout;
        this._sampleTimeout = _sampleTimeout;
    }
    
    private Map<TopicPartition, Long> _endOffsets;
    
    /**
     * 
     * https://stackoverflow.com/questions/52837302/strategies-to-ensure-that-each-kafka-consumer-has-a-different-unique-group-id
     * 
     * in order to ensure we get all of the data from the partitions of a topic we need 
     * to manually assign the partitions to the consumer.
     * 
     * There are tradeoffs to doing this and is not the intended use for kafka (see above)
     * 
     * 
     */
    public Collection<T> pull() {
        try(KafkaConsumer<K, T> consumer = new KafkaConsumer<K, T>(getConsumerProps())) {
        
            Collection<T> records = new ArrayList<T>();
        
            /*
             * fetch partition metadata for the topic
             */
            
            List<PartitionInfo> info = consumer.partitionsFor(_topic);
            
            /*
             * manually assign the partitions
             */
             consumer.assign(info.stream().map(partitionInfo -> new TopicPartition(_topic, partitionInfo.partition())).collect(Collectors.toList()));
            
            /*
             * partitions belonging to this consumer
             */
            Set<TopicPartition> partitions = consumer.assignment();
            
            /*
             * seek to the beginning of the assigned partitions
             */
            consumer.seekToBeginning(partitions);
            
            /*
             * the target partition end offsets
             */
            _endOffsets = consumer.endOffsets(partitions);
            log.info(_endOffsets.toString());

            
            /*
             * start loop
             */
            while(comparePartitionOffsets(consumer)) {
                /*
                 * poll for messages
                 */
                ConsumerRecords<K, T> sample = consumer.poll(Duration.ofMillis(_sampleTimeout));        
                
                for(TopicPartition partition : partitions) {
                    /*
                     * grab the records only from this partition
                     */
                    List<ConsumerRecord<K, T>> recordsForPartition = sample.records(partition);
                    
                    /*
                     * iterate through all the records and add to the collection 
                     * ONLY if the record offset is less than the target offset
                     * for current partition
                     */
                    for(ConsumerRecord<K, T> consumerRecord : recordsForPartition) {
                        log.debug("record offset: " + consumerRecord.offset() + " partition: " + consumerRecord.partition());
                        if(consumerRecord.offset() < _endOffsets.get(partition)) { // end offset is one past the last record
                            records.add(consumerRecord.value());
                        }
                    }
                }
                
            }
        
            return records;
        
        } catch (Exception e) {
            log.error("Error while pulling topic " + _topic, e);
        }
        
        // nothing pulled (or the pull failed)
        return new ArrayList<>();
    }
    
    // compare current positions to the target end offsets; returns true while any partition still has records to read
    private boolean comparePartitionOffsets(KafkaConsumer<K, T> consumer) {
        return !_endOffsets.keySet().stream().allMatch(tp -> {
            log.info("consumer: " + consumer.position(tp) + " target: " + _endOffsets.get(tp));
            // endOffsets() is one past the last record, so the target is
            // reached once the position has caught up to it
            boolean offsetsReached = consumer.position(tp) >= _endOffsets.get(tp);
            if(offsetsReached) log.info("Target offsets reached for partition: " + tp.partition());
            return offsetsReached;
        });
    }

    private Properties getConsumerProps() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "runner-group");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, _requestTimeout); // client-side timeout for broker requests
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, _valueSerde.getClass());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, _keySerde.getClass());
        return properties;
    }



}

【Discussion】:
