DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个

1、DefaultMQPushConsumer启动后,会马上触发一个deRebalance动作;

      1.1、DefaultMQPushConsumerImpl.start()

 1     public synchronized void start() throws MQClientException {
 2         switch (this.serviceState) {
 3             case CREATE_JUST:
 4                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
 5                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
 6                 this.serviceState = ServiceState.START_FAILED;
 7 
 8                 this.checkConfig();
 9 
10                 this.copySubscription();
11 
12                 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
13                     this.defaultMQPushConsumer.changeInstanceNameToPID();
14                 }
15 
16                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
17 
18                 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
19                 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
20                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
21                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
22 
23                 this.pullAPIWrapper = new PullAPIWrapper(
24                     mQClientFactory,
25                     this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
26                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
27 
28                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {
29                     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
30                 } else {
31                     switch (this.defaultMQPushConsumer.getMessageModel()) {
32                         case BROADCASTING:
33                             this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
34                             break;
35                         case CLUSTERING:
36                             this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
37                             break;
38                         default:
39                             break;
40                     }
41                     this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
42                 }
43                 this.offsetStore.load();
44 
45                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
46                     this.consumeOrderly = true;
47                     this.consumeMessageService =
48                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
49                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
50                     this.consumeOrderly = false;
51                     this.consumeMessageService =
52                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
53                 }
54 
55                 this.consumeMessageService.start();
56 
57                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
58                 if (!registerOK) {
59                     this.serviceState = ServiceState.CREATE_JUST;
60                     this.consumeMessageService.shutdown();
61                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
62                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
63                         null);
64                 }
65 
66                 mQClientFactory.start();
67                 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
68                 this.serviceState = ServiceState.RUNNING;
69                 break;
70             case RUNNING:
71             case START_FAILED:
72             case SHUTDOWN_ALREADY:
73                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
74                     + this.serviceState
75                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
76                     null);
77             default:
78                 break;
79         }

 

      1.2、MQClientInstance.start()

 1     public void start() throws MQClientException {
 2 
 3         synchronized (this) {
 4             switch (this.serviceState) {
 5                 case CREATE_JUST:
 6                     this.serviceState = ServiceState.START_FAILED;
 7                     // If not specified,looking address from name server
 8                     if (null == this.clientConfig.getNamesrvAddr()) {
 9                         this.mQClientAPIImpl.fetchNameServerAddr();
10                     }
11                     // Start request-response channel
12                     this.mQClientAPIImpl.start();
13                     // Start various schedule tasks
14                     this.startScheduledTask();
15                     // Start pull service
16                     this.pullMessageService.start();
17                     // Start rebalance service
18                     this.rebalanceService.start();
19                     // Start push service
20                     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
21                     log.info("the client factory [{}] start OK", this.clientId);
22                     this.serviceState = ServiceState.RUNNING;
23                     break;
24                 case RUNNING:
25                     break;
26                 case SHUTDOWN_ALREADY:
27                     break;
28                 case START_FAILED:
29                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
30                 default:
31                     break;
32             }
33         }
34     }

 

      1.3、org.apache.rocketmq.common.ServiceThread.start()

              RebalanceService.run()

 1     @Override
 2     public void run() {
 3         log.info(this.getServiceName() + " service started");
 4 
 5         while (!this.isStopped()) {
 6             this.waitForRunning(waitInterval);
 7             this.mqClientFactory.doRebalance();
 8         }
 9 
10         log.info(this.getServiceName() + " service end");
11     }

  1.4、MQClientInstance.doRebalance()‘

 1     public void doRebalance() {
 2         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
 3             MQConsumerInner impl = entry.getValue();
 4             if (impl != null) {
 5                 try {
 6                     impl.doRebalance();
 7                 } catch (Throwable e) {
 8                     log.error("doRebalance exception", e);
 9                 }
10             }
11         }
12     }

 

 

2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,

各个Consumer都会被触发doRebalance动作

ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand)

 1     @Override
 2     public RemotingCommand processRequest(ChannelHandlerContext ctx,
 3         RemotingCommand request) throws RemotingCommandException {
 4         switch (request.getCode()) {
 5             case RequestCode.CHECK_TRANSACTION_STATE:
 6                 return this.checkTransactionState(ctx, request);
 7             case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
 8                 return this.notifyConsumerIdsChanged(ctx, request);
 9             case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
10                 return this.resetOffset(ctx, request);
11             case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
12                 return this.getConsumeStatus(ctx, request);
13 
14             case RequestCode.GET_CONSUMER_RUNNING_INFO:
15                 return this.getConsumerRunningInfo(ctx, request);
16 
17             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
18                 return this.consumeMessageDirectly(ctx, request);
19             default:
20                 break;
21         }
22         return null;
23     }

 consumer负载均衡策略接口AllocateMessageQueueStrategy

 1 /**
 2  * Strategy Algorithm for message allocating between consumers
 3  */
 4 public interface AllocateMessageQueueStrategy {
 5 
 6     /**
 7      * Allocating by consumer id
 8      *
 9      * @param consumerGroup current consumer group
10      * @param currentCID current consumer id
11      * @param mqAll message queue set in current topic
12      * @param cidAll consumer set in current consumer group
13      * @return The allocate result of given strategy
14      */
15     List<MessageQueue> allocate(
16         final String consumerGroup,
17         final String currentCID,
18         final List<MessageQueue> mqAll,
19         final List<String> cidAll
20     );
21 
22     /**
23      * Algorithm name
24      *
25      * @return The strategy name
26      */
27     String getName();
28 }
View Code

相关文章:

  • 2023-01-10
  • 2021-06-11
  • 2022-02-19
  • 2022-02-09
  • 2021-07-20
  • 2021-07-09
  • 2021-12-21
  • 2021-11-18
猜你喜欢
  • 2022-12-23
  • 2021-06-12
  • 2021-06-08
  • 2022-12-23
  • 2021-11-05
  • 2021-09-07
  • 2021-04-19
相关资源
相似解决方案