【Question Title】: Messages not being distributed in round-robin order after increasing the number of partitions in Kafka
【Posted】: 2017-07-24 18:57:39
【Question Description】:

Hi, I am new to Kafka. I am using Kafka version 0.10.2 and ZooKeeper version 3.4.9. I have a topic with two partitions and two running consumers. To increase processing throughput, I decided to raise the number of partitions to 10 so that I could add more consumers. So I ran this command:

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic --partitions 10
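
The new partition count itself can be verified by describing the topic (same ZooKeeper address and topic name as above):

./kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic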

I then observed two strange things:

  1. My consumers are still connected to only two partitions, and the remaining partitions have no consumer at all. (Expected behavior: the two consumers should between them listen to all 10 partitions.)

  2. Messages are pushed only to the two old partitions; the new partitions receive no messages. (Expected behavior: messages should be distributed across all partitions in round-robin fashion.)

I am using this command to inspect the partition details:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group topic-group

My consumer code:

import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import com.mongodb.DBObject

// KafkaTopicConfigEntity, KafkaTopicConfigDBService and ConfigLoader are project-local classes.
class KafkaPollingConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
        private static final String TAG = "[KafkaPollingConsumer]"
        private final KafkaConsumer<String, byte []> kafkaConsumer
        private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
        List topicNameList
        Map kafkaTopicConfigMap = new HashMap<String,Object>()
        Map kafkaTopicMessageListMap = new HashMap<String,List>()

        public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex){
            logger.debug("{} [Constructor] [Enter] Thread Name {} serverType {} groupName {} topicNameRegex {}",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
            logger.debug("Populating properties for kafka consumer")
            Properties kafkaConsumerProperties = new Properties()
            kafkaConsumerProperties.put("group.id", groupName)
            kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
            kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer")
            switch(serverType){
                case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
                    kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
                    kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
                    kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
                    break
                case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
                    kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
                    kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
                    kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
                    kafkaConsumerProperties.put("max.poll.records",10)
                    kafkaConsumerProperties.put("max.poll.interval.ms",900000)
                    kafkaConsumerProperties.put("request.timeout.ms",900000)
                    break
                default :
                    throw new IllegalArgumentException("Invalid server type")
            }
            logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",TAG,kafkaConsumerProperties.toString())
            kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
            topicNameList = topicNameRegex.split(Pattern.quote('|'))
            logger.debug("{} [Constructor] Kafka topic List {}",TAG,topicNameList.toString())
            logger.debug("{} [Constructor] Exit",TAG)
        }

        private class HandleRebalance implements ConsumerRebalanceListener {
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                if(currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) {
                    logger.debug("{} In onPartitionsRevoked Rebalanced ",TAG)
                    kafkaConsumer.commitSync(currentOffsetsMap)
                }
            }
        }

        @Override
        void run() {
            logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
            populateKafkaConfigMap()
            initializeKafkaTopicMessageListMap()
            String topicName
            String consumerClassName
            String consumerMethodName
            Boolean isBatchJob
            Integer batchSize = 0
            final Thread mainThread = Thread.currentThread()
            Runtime.getRuntime().addShutdownHook(new Thread() {
                public void run() {
                    logger.error("{} gracefully shutting down thread {}",TAG,mainThread.getName())
                    kafkaConsumer.wakeup()
                    try {
                        mainThread.join()
                    } catch (InterruptedException exception) {
                        logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
                    }
                }
            })
            kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
            try{
                while(true){
                    logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
                    ConsumerRecords kafkaRecords = kafkaConsumer.poll(100)
                    for(ConsumerRecord record: kafkaRecords){
                        topicName = record.topic()
                        DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                        consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                        consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                        isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                        logger.debug("Details about Message")
                        logger.debug("Thread {}",mainThread.getName())
                        logger.debug("Topic {}",topicName)
                        logger.debug("Partition {}",record.partition().toString())
                        logger.debug("Offset {}",record.offset().toString())
                        logger.debug("clasName {}",consumerClassName)
                        logger.debug("methodName {}",consumerMethodName)
                        logger.debug("isBatchJob {}",isBatchJob.toString())
                        if(isBatchJob == true){
                            batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                            logger.debug("batchSize {}",batchSize.toString())
                        }
                        Object message = record.value()
                        logger.debug("message {}",message.toString())
                        publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                        Thread.sleep(60000)
                        currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                    }
                    logger.debug("{} Committing Messages to Kafka",TAG)
                    kafkaConsumer.commitSync(currentOffsetsMap)
                }
            }
            catch(InterruptException exception){
                logger.error("{} In InterruptException",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            catch (WakeupException exception) {
                logger.error("{} In WakeUp Exception",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            catch(Exception exception){
                logger.error("{} In Exception",TAG)
                logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
            }
            finally {
                logger.error("{} In finally committing remaining offset",TAG)
                publishAllKafkaTopicBatchMessages()
                kafkaConsumer.commitSync(currentOffsetsMap)
                kafkaConsumer.close()
                logger.error("{} Exiting Consumer",TAG)
            }
        }


private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, Object message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [initializeKafkaTopicMessageListMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName,messageList ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
        consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
        consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
        logger.debug("{} Pushing message in topic {} className {} methodName {} ",TAG,topicName,consumerClassName,consumerMethodName)
        if(messageList != null && messageList.size() > 0){
            executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
            messageList.clear()
            kafkaTopicMessageListMap.put(topicName,messageList)
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}
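
As an aside, observation 1 can be checked from inside the process by logging what the coordinator actually assigns after each rebalance. A minimal sketch of the HandleRebalance inner class above with extra logging added (only the logger.debug lines are new):

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Log the exact partitions this consumer instance now owns.
        logger.debug("{} Partitions assigned: {}", TAG, partitions.toString())
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.debug("{} Partitions revoked: {}", TAG, partitions.toString())
        if(currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) {
            kafkaConsumer.commitSync(currentOffsetsMap)
        }
    }
}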

The consumer properties are:

auto.commit.interval.ms = 5000 
auto.offset.reset = earliest 
bootstrap.servers = [localhost:9092] 
check.crcs = true 
client.id = consumer-1 
connections.max.idle.ms = 540000 
enable.auto.commit = false 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = t1 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 36000000 
max.poll.records = 10 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 
reconnect.backoff.ms = 50 
request.timeout.ms = 36000000 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
session.timeout.ms = 10000 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS 
ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
value.deserializer = class com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer
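
The dump above also shows partition.assignment.strategy = RangeAssignor, the default. With two consumers and ten partitions either strategy gives each consumer five partitions once a rebalance actually happens, but round-robin assignment of partitions to consumers can be requested explicitly if preferred (one line against the kafkaConsumerProperties object built in the constructor above):

kafkaConsumerProperties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor")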

Producer code:

Properties kafkaProducerProperties = getKafkaProducerProperties(topicName)
if(kafkaProducerProperties != null){
    priorityKafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(kafkaProducerProperties)
    ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topicName, messageMap)
    try {
        priorityKafkaProducer.send(record).get()
        priorityKafkaProducer.close()
    } catch (Exception e) {
        e.printStackTrace()
    }
}
else{
    throw new IllegalArgumentException("Invalid Producer Properties for " + topicName)
}
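
Because the record above is built without a key, the partition is chosen entirely by the configured partitioner. One way to see where each message actually lands is to inspect the RecordMetadata returned by send() (a small sketch reusing priorityKafkaProducer and record from the snippet above; it assumes an import of org.apache.kafka.clients.producer.RecordMetadata):

// send() returns a Future<RecordMetadata>; get() blocks until the broker acknowledges.
RecordMetadata metadata = priorityKafkaProducer.send(record).get()
// Print the partition and offset the broker actually assigned to this record.
println "Sent to topic ${metadata.topic()}, partition ${metadata.partition()}, offset ${metadata.offset()}"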

Producer configuration:

    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer
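
The partitioner.class above is the DefaultPartitioner. For context, in 0.10.x it only distributes keyless records round-robin across the partitions present in the producer's cached cluster metadata, which is refreshed every metadata.max.age.ms (300000 ms, i.e. 5 minutes, by default), so newly added partitions are invisible to it until the next refresh. A simplified sketch of that logic (illustration only, not the real source, which also special-cases partitions without a leader):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.common.Cluster
import org.apache.kafka.common.utils.Utils

// Simplified shape of the 0.10.x DefaultPartitioner, for illustration only.
class SketchPartitioner {
    private final AtomicInteger counter = new AtomicInteger(0)

    int partition(String topic, byte[] keyBytes, Cluster cluster) {
        // numPartitions comes from the producer's CACHED metadata, refreshed
        // only every metadata.max.age.ms, so new partitions appear late.
        int numPartitions = cluster.partitionsForTopic(topic).size()
        if (keyBytes == null) {
            // Keyless records: round-robin over the partitions known so far.
            return Utils.toPositive(counter.getAndIncrement()) % numPartitions
        }
        // Keyed records: murmur2 hash of the key, modulo known partitions.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
    }
}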

Is what I am seeing expected behavior, or am I missing something?

【Comments】:

    Tags: apache-kafka kafka-consumer-api kafka-producer-api


    【Solution 1】:

    Did you wait five minutes (or whatever metadata refresh interval you have configured)?
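
    If the producer's metadata cache is the culprit, the refresh interval can be shortened on the producer with metadata.max.age.ms (default 300000 ms, i.e. the five minutes mentioned above). A minimal sketch against the kafkaProducerProperties object from the question:

    // Hypothetical tuning: refresh topic metadata every 30 s instead of the
    // 5-minute default so newly added partitions are discovered sooner.
    kafkaProducerProperties.put("metadata.max.age.ms", "30000")

    Note this has to go into the producer's properties; the metadata.max.age.ms visible in the consumer property dump above has no effect on where messages are sent.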

    【Discussion】:

    • Hi Hans, I have already set the property metadata.max.age.ms = 5000, but it still takes 5 minutes for a newly added partition to start receiving messages. Did I set the wrong property?
    • Try setting the producer property topic.metadata.refresh.interval.ms, see stackoverflow.com/questions/27722871/…
    • Thanks for the reply. I believe topic.metadata.refresh.ms applies to Kafka version 0.8, and I am using Kafka 0.10. I added it to my producer.properties anyway, but it did not work.
    • Yes, you are right of course. Sorry for mixing up the old and new properties.