DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个
1、DefaultMQPushConsumer启动后,会马上触发一个deRebalance动作;
1.1、DefaultMQPushConsumerImpl.start()
1 public synchronized void start() throws MQClientException { 2 switch (this.serviceState) { 3 case CREATE_JUST: 4 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), 5 this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); 6 this.serviceState = ServiceState.START_FAILED; 7 8 this.checkConfig(); 9 10 this.copySubscription(); 11 12 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { 13 this.defaultMQPushConsumer.changeInstanceNameToPID(); 14 } 15 16 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); 17 18 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); 19 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); 20 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); 21 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); 22 23 this.pullAPIWrapper = new PullAPIWrapper( 24 mQClientFactory, 25 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); 26 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); 27 28 if (this.defaultMQPushConsumer.getOffsetStore() != null) { 29 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); 30 } else { 31 switch (this.defaultMQPushConsumer.getMessageModel()) { 32 case BROADCASTING: 33 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); 34 break; 35 case CLUSTERING: 36 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); 37 break; 38 default: 39 break; 40 } 41 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); 42 } 43 this.offsetStore.load(); 44 45 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { 46 this.consumeOrderly = true; 47 this.consumeMessageService = 48 new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); 49 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { 50 this.consumeOrderly = false; 51 this.consumeMessageService = 52 new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); 53 } 54 55 this.consumeMessageService.start(); 56 57 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); 58 if (!registerOK) { 59 this.serviceState = ServiceState.CREATE_JUST; 60 this.consumeMessageService.shutdown(); 61 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() 62 + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), 63 null); 64 } 65 66 mQClientFactory.start(); 67 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); 68 this.serviceState = ServiceState.RUNNING; 69 break; 70 case RUNNING: 71 case START_FAILED: 72 case SHUTDOWN_ALREADY: 73 throw new MQClientException("The PushConsumer service state not OK, maybe started once, " 74 + this.serviceState 75 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), 76 null); 77 default: 78 break; 79 }
1.2、MQClientInstance.start()
1 public void start() throws MQClientException { 2 3 synchronized (this) { 4 switch (this.serviceState) { 5 case CREATE_JUST: 6 this.serviceState = ServiceState.START_FAILED; 7 // If not specified,looking address from name server 8 if (null == this.clientConfig.getNamesrvAddr()) { 9 this.mQClientAPIImpl.fetchNameServerAddr(); 10 } 11 // Start request-response channel 12 this.mQClientAPIImpl.start(); 13 // Start various schedule tasks 14 this.startScheduledTask(); 15 // Start pull service 16 this.pullMessageService.start(); 17 // Start rebalance service 18 this.rebalanceService.start(); 19 // Start push service 20 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 21 log.info("the client factory [{}] start OK", this.clientId); 22 this.serviceState = ServiceState.RUNNING; 23 break; 24 case RUNNING: 25 break; 26 case SHUTDOWN_ALREADY: 27 break; 28 case START_FAILED: 29 throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); 30 default: 31 break; 32 } 33 } 34 }
1.3、org.apache.rocketmq.common.ServiceThread.start()
RebalanceService.run()
1 @Override 2 public void run() { 3 log.info(this.getServiceName() + " service started"); 4 5 while (!this.isStopped()) { 6 this.waitForRunning(waitInterval); 7 this.mqClientFactory.doRebalance(); 8 } 9 10 log.info(this.getServiceName() + " service end"); 11 }
1.4、MQClientInstance.doRebalance()‘
1 public void doRebalance() { 2 for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { 3 MQConsumerInner impl = entry.getValue(); 4 if (impl != null) { 5 try { 6 impl.doRebalance(); 7 } catch (Throwable e) { 8 log.error("doRebalance exception", e); 9 } 10 } 11 } 12 }
2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,
各个Consumer都会被触发doRebalance动作
ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand)
1 @Override 2 public RemotingCommand processRequest(ChannelHandlerContext ctx, 3 RemotingCommand request) throws RemotingCommandException { 4 switch (request.getCode()) { 5 case RequestCode.CHECK_TRANSACTION_STATE: 6 return this.checkTransactionState(ctx, request); 7 case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: 8 return this.notifyConsumerIdsChanged(ctx, request); 9 case RequestCode.RESET_CONSUMER_CLIENT_OFFSET: 10 return this.resetOffset(ctx, request); 11 case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT: 12 return this.getConsumeStatus(ctx, request); 13 14 case RequestCode.GET_CONSUMER_RUNNING_INFO: 15 return this.getConsumerRunningInfo(ctx, request); 16 17 case RequestCode.CONSUME_MESSAGE_DIRECTLY: 18 return this.consumeMessageDirectly(ctx, request); 19 default: 20 break; 21 } 22 return null; 23 }
consumer负载均衡策略接口AllocateMessageQueueStrategy
1 /** 2 * Strategy Algorithm for message allocating between consumers 3 */ 4 public interface AllocateMessageQueueStrategy { 5 6 /** 7 * Allocating by consumer id 8 * 9 * @param consumerGroup current consumer group 10 * @param currentCID current consumer id 11 * @param mqAll message queue set in current topic 12 * @param cidAll consumer set in current consumer group 13 * @return The allocate result of given strategy 14 */ 15 List<MessageQueue> allocate( 16 final String consumerGroup, 17 final String currentCID, 18 final List<MessageQueue> mqAll, 19 final List<String> cidAll 20 ); 21 22 /** 23 * Algorithm name 24 * 25 * @return The strategy name 26 */ 27 String getName(); 28 }