> 文章列表 > 【RocketMQ】负载均衡源码分析

【RocketMQ】负载均衡源码分析

【RocketMQ】负载均衡源码分析

RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡。

img

接下来以集群模式下的消息推模式DefaultMQPushConsumerImpl为例,看一下负载均衡的过程。

消费者负载均衡

首先,消费者在启动时会做如下操作:

  1. 从NameServer更新当前消费者订阅主题的路由信息;
  2. 向Broker发送心跳,注册消费者;
  3. 唤醒负载均衡服务,触发一次负载均衡;
public class DefaultMQPushConsumerImpl implements MQConsumerInner {public synchronized void start() throws MQClientException {// ...// 更新当前消费者订阅主题的路由信息this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();// 向Broker发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 唤醒负载均衡服务this.mQClientFactory.rebalanceImmediately();}
}

更新主题路由信息

为了保证消费者拿到的主题路由信息是最新的(topic下有几个消息队列、消息队列的分布信息等),在进行负载均衡之前首先要更新主题的路由信息,在updateTopicSubscribeInfoWhenSubscriptionChanged方法中可以看到,首先获取了当前消费者订阅的所有主题信息(一个消费者可以订阅多个主题),然后进行遍历,向NameServer发送请求,更新每一个主题的路由信息,保证路由信息是最新的:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {private void updateTopicSubscribeInfoWhenSubscriptionChanged() {// 获取当前消费者订阅的主题信息Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {// 遍历订阅的主题信息for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();// 从NameServer更新主题的路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);}}}
}

注册消费者

发送心跳

由于Broker需要感知消费者数量的增减,所以每个消费者在启动的时候,会调用sendHeartbeatToAllBrokerWithLock向Broker发送心跳包,进行消费者注册:

public class MQClientInstance {public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {try {// 调用sendHeartbeatToAllBroker向Broker发送心跳this.sendHeartbeatToAllBroker();this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed. [{}]", this.clientId);}}
}

sendHeartbeatToAllBroker方法中,可以看到从brokerAddrTable中获取了所有的Broker进行遍历(主从模式下也会向从节点发送请求注册),调用MQClientAPIImplsendHearbeat方法向每一个Broker发送心跳请求进行注册:

public class MQClientInstance {// Broker路由表private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =new ConcurrentHashMap<String, HashMap<Long, String>>();// 发送心跳private void sendHeartbeatToAllBroker() {final HeartbeatData heartbeatData = this.prepareHeartbeatData();// ...if (!this.brokerAddrTable.isEmpty()) {long times = this.sendHeartbeatTimesTotal.getAndIncrement();// 获取所有的Broker进行遍历, key为 Broker Name, value为同一个name下的所有Broker实例(主从模式下Broker的name一致)Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();String brokerName = entry.getKey(); // broker name// 获取同一个Broker Name下的所有Broker实例HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {// 遍历所有的实例for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) { // 如果地址不为空// ...try {// 发送心跳int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());// ...} catch (Exception e) {// ...}}}}}}}
}

MQClientAPIImplsendHearbeat方法中,可以看到构建了HEART_BEAT请求,然后向Broker发送:

public class MQClientAPIImpl {public int sendHearbeat(final String addr, final HeartbeatData heartbeatData, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {// 创建HEART_BEAT请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);request.setLanguage(clientConfig.getLanguage());request.setBody(heartbeatData.encode());// 发送请求RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);// ...}
}

心跳请求处理

Broker在启动时注册了HEART_BEAT请求的处理器,可以看到请求处理器是ClientManageProcessor

public class BrokerController {public void registerProcessor() {ClientManageProcessor clientProcessor = new ClientManageProcessor(this);// 注册HEART_BEAT请求的处理器ClientManageProcessorthis.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);}
}

进入到ClientManageProcessorprocessRequest方法,如果请求是HEART_BEAT类型会调用heartBeat方法进行处理,这里也能看还有UNREGISTER_CLIENT类型的请求,从名字上可以看出是与取消注册有关的(这个稍后再说):

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.HEART_BEAT: // 处理心跳请求return this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT: // 取消注册请求return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null;}
}

进入到heartBeat方法,可以看到,调用了ConsumerManagerregisterConsumer注册消费者:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {// ...for (ConsumerData data : heartbeatData.getConsumerDataSet()) {// ...// 注册Consumerboolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(),data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable);// ...}// ...return response;}
}

进行注册

ConsumerManagerregisterConsumer方法的理逻辑如下:

  1. 根据组名称获取该消费者组的信息ConsumerGroupInfo对象。如果获取为空,会创建一个ConsumerGroupInfo,记录了消费者组的相关信息;
  2. 判断消费者是否发生了变更,如果如果发生了变化,会触发CHANGE变更事件(这个稍后再看);
  3. 触发REGISTER注册事件;
public class ConsumerManager {public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 根据组名称获取消费者组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo对象ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果有变更if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 通知变更this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// 注册Consumerthis.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2;}
}

进入到DefaultConsumerIdsChangeListenerhandle方法中,可以看到如果是REGISTER事件,会通过ConsumerFilterManagerregister方法进行注册,注册的详细过程这里先不展开讲解:

public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {@Overridepublic void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 如果是消费者变更事件// ...break;case UNREGISTER: // 如果是取消注册事件this.brokerController.getConsumerFilterManager().unRegister(group);break;case REGISTER: // 如果是注册事件if (args == null || args.length < 1) {return;}Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];// 进行注册this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);break;default:throw new RuntimeException("Unknown event " + event);}}
}

负载均衡

经过以上步骤之后,会调用MQClientInstance的rebalanceImmediately唤醒负载均衡服务进行一次负载均衡,为消费者分配消息队列,需要注意的是负载均衡是由消费者端执行

// MQClientInstance
public class MQClientInstance {private final RebalanceService rebalanceService;public void rebalanceImmediately() {// 唤醒负载均衡服务this.rebalanceService.wakeup();}
}// RebalanceService
public class RebalanceService extends ServiceThread {@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);// 负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
}

负载均衡的过程在【RocketMQ】消息的拉取一文中已经讲解,这里挑一些重点内容看一下。

在负载均衡的时候,首先会获取当前消费者订阅的主题信息,对订阅的主题进行遍历,对每一个主题进行负载均衡,重新分配:

public abstract class RebalanceImpl {public void doRebalance(final boolean isOrder) {// 获取订阅的主题信息Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {// 遍历所有订阅的主题for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {// 根据主题进行负载均衡this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}
}

根据主题进行负载均衡

rebalanceByTopic方法中根据消费模式进行了判断然后对主题进行负载均衡,这里我们关注集群模式下的负载均衡:

  1. topicSubscribeInfoTable中根据主题获取对应的消息队列集合,这一步可以得到主题下的所有消息队列信息

  2. 根据主题信息和消费者组名称,获取所有订阅了该主题的消费者ID集合,这一步得到了订阅该主题的所有消费者

  3. 如果主题对应的消息队列集合和消费者ID都不为空,对消息队列集合和消费ID集合进行排序,排序是为了接下来进行分配;

  4. 获取设置的分配策略,根据分配策略,为消费者分配对应的消费队列,以平均分配策略为例,它会根据消息队列的数量和消费者的个数计算每个消费者分配的队列个数:

    img

  5. 根据最新分配的消息队列信息,调用updateProcessQueueTableInRebalance更新当前消费者消费的处理队列ProcessQueue信息

public abstract class RebalanceImpl {// 根据主题进行负载均衡private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: { // 广播模式Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// ... break;}case CLUSTERING: { // 集群模式// 根据主题获取订阅的消息队列Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 获取所有订阅了该主题的消费者idList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);// ...if (mqSet != null && cidAll != null) { // 如果都不为空List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);// 对消息队列排序Collections.sort(mqAll);// 对消费者排序Collections.sort(cidAll);// 获取分配策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {// 根据分配策略,为消费者分配消费队列allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);} catch (Throwable e) {// ...}// 分配给当前消费的消费队列Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {// 将分配结果加入到结果集合中allocateResultSet.addAll(allocateResult);}// 根据分配信息更新处理队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);// ...}break;}default:break;}}
}

更新处理队列

负载均衡之后,消费者负责的消息队列有可能发生变化,一个消息队列MessageQueue对应一个处理队列ProcessQueueprocessQueueTable记录了消费者负责的队列信息,此时需要对其进行更新,处理逻辑如下:

  1. processQueueTable进行遍历,处理每一个消息队列,这一步主要是判断重新分配之后,processQueueTable中记录的某些消息队列是否已经不再由当前消费者负责,如果是需要将消息队列置为dropped,表示删除,之后消费者不再从此消费队列中拉取消息;

  2. 判断是否有新分配给当前消费者的消息队列,如果某个消息队列在最新分配给当前消费者的消息队列集合mqSet中,但是不在processQueueTable中,

    中,进行以下处理:

    • 计算消息拉取偏移量,也就是从哪个位置开始消费,如果消息拉取偏移量大于0,创建ProcessQueue,并放入处理队列表中processQueueTable
    • 构建PullRequest,设置消息的拉取信息,并加入到拉取消息请求集合pullRequestList

    经过这一步,如果分配给当前消费者的消费队列不在processQueueTable中,就会构建拉取请求PullRequest,然后调用dispatchPullRequest处理消息拉取请求,之后会从该消息队列拉取消息,详细过程可参考【RocketMQ】消息的拉取。

public abstract class RebalanceImpl {// 处理队列表,KEY为消息队列,VALUE为对应的处理信息protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);// 负载均衡,topic表示当前要进行负载均衡的主题,mqSet中记录了重新分配给当前消费者的消息队列private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;// 处理队列表Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();// 获取消息队列MessageQueue mq = next.getKey();// 获取处理队列ProcessQueue pq = next.getValue();// 主题是否一致if (mq.getTopic().equals(topic)) {// 如果队列集合中不包含当前的队列if (!mqSet.contains(mq)) {// 设置为droppedpq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) { // 是否过期switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true); // 设置为删除// ...break;default:break;}}}}// 创建拉取请求集合List<PullRequest> pullRequestList = new ArrayList<PullRequest>();// 遍历本次分配的消息队列集合for (MessageQueue mq : mqSet) {// 如果之前不在processQueueTable中if (!this.processQueueTable.containsKey(mq)) {// ...// 创建ProcessQueueProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {// 计算消息拉取偏移量nextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}// 如果偏移量大于等于0if (nextOffset >= 0) {// 放入处理队列表中ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);// 如果之前已经存在,不需要进行处理if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {// 如果之前不存在,构建PullRequest,之后会加入到阻塞队列中,进行消息拉取log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);// 设置消费组pullRequest.setNextOffset(nextOffset);// 设置拉取偏移量pullRequest.setMessageQueue(mq);// 设置消息队列pullRequest.setProcessQueue(pq);// 设置处理队列pullRequestList.add(pullRequest);// 加入到拉取消息请求集合changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}// 添加消息拉取请求this.dispatchPullRequest(pullRequestList);return changed;}
}

Rebalance的触发

消费者启动时触发

在文章开头已经讲过,消费者在启动时会进行一次负载均衡,这里便不再赘述。

消费者变更时触发

在消费者注册时讲到,如果发现消费者有变更会触发变更事件,当处于以下两种情况之一时会被判断为消费者发生了变化,需要进行负载均衡:

  • 当前注册的消费者对应的Channel对象之前不存在;

  • 当前注册的消费者订阅的主题信息发生了变化,也就是消费者订阅的主题有新增或者删除;

public class ConsumerManager {/***  注册消费者* @param group 消费者组名称* @param clientChannelInfo 注册的消费者对应的Channel信息* @param consumeType 消费类型* @param messageModel * @param consumeFromWhere 消费消息的位置* @param subList 消费者订阅的主题信息* @param isNotifyConsumerIdsChangedEnable 是否通知变更* @return*/public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 根据组名称获取消费者组信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) { // 如果为空新增ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}// 更新Channelboolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 更新订阅信息boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果有变更if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 通知变更,consumerGroupInfo中存储了该消费者组下的所有消费者的channelthis.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// 注册Consumerthis.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2;}
}

Channel变更

updateChannel方法中,首先将变更状态updated初始化为false,然后根据消费者的channel从channelInfoTable路由表中获取对应的ClientChannelInfo对象:

  • 如果ClientChannelInfo对象获取为空,表示之前不存在该消费者的channel信息,将其加入到路由表中,变更状态置为true,表示消费者有变化;

  • 如果获取不为空,判断clientid是否一致,如果不一致更新为最新的channel信息,但是变更状态updated不发生变化;

也就是说,如果注册的消费者之前不存在,那么将变更状态置为true,表示消费者数量发生了变化。

 
// key为消费者对应的channle,value为chanel信息private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =new ConcurrentHashMap<Channel, ClientChannelInfo>(16);public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated = false; // 变更状态初始化为falsethis.consumeType = consumeType;this.messageModel = messageModel;this.consumeFromWhere = consumeFromWhere;// 从channelInfoTable中获取对应的Channel信息, ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());if (null == infoOld) { // 如果为空// 新增ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);if (null == prev) { // 如果之前不存在log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,messageModel, infoNew.toString());// 变更状态置为trueupdated = true;}infoOld = infoNew;} else {// 如果之前存在,判断clientid是否一致,如果不一致更新为最新的channelif (!infoOld.getClientId().equals(infoNew.getClientId())) { log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ", this.groupName, infoOld.toString(), infoNew.toString());this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}this.lastUpdateTimestamp = System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated;}

主题信息订阅变更

updateSubscription方法中,主要判断了消费的主题订阅信息是否发生了变化,subscriptionTable中记录了之前记录的订阅信息:

  1. 判断是否有新增的主题订阅信息,主要是通过subscriptionTable是否存在某个主题进行判断的:
    • 如果不存在,表示之前没有订阅过某个主题的信息,将其加入到subscriptionTable中,并将变更状态置为true,表示主题订阅信息有变化;
    • 如果subscriptionTable中存在某个主题的订阅信息,表示之前就已订阅,将其更新为最新的,但是变更状态不发生变化;
  2. 判断是否有删除的主题,主要是通过subscriptionTable和subList的对比进行判断的,如果有删除的主题,将变更状态置为true;

如果消费者订阅的主题发生了变化,比如有新增加的主题或者删除了某个主题的订阅,会被判断为主题订阅信息发生了变化。

public class ConsumerGroupInfo {// 记录了订阅的主题信息,key为topic,value为订阅信息private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =new ConcurrentHashMap<String, SubscriptionData>();   public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;// 遍历订阅的主题信息for (SubscriptionData sub : subList) {//根据主题获取订阅信息SubscriptionData old = this.subscriptionTable.get(sub.getTopic());// 如果获取为空if (old == null) {// 加入到subscriptionTableSubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);if (null == prev) {updated = true; // 变更状态置为truelog.info("subscription changed, add new topic, group: {} {}", this.groupName, sub.toString());}} else if (sub.getSubVersion() > old.getSubVersion()) { // 如果版本发生了变化if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString());}// 更新为最新的订阅信息this.subscriptionTable.put(sub.getTopic(), sub);}}Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();// 进行遍历,这一步主要是判断有没有取消订阅的主题while (it.hasNext()) {Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;// 遍历最新的订阅信息for (SubscriptionData sub : subList) {// 如果在旧的订阅信息中存在就终止,继续判断下一个主题if (sub.getTopic().equals(oldTopic)) {exist = true;break;}}// 走到这里,表示有取消订阅的主题if (!exist) {log.warn("subscription changed, group: {} remove topic {} {}",this.groupName, oldTopic, next.getValue().toString());// 进行删除it.remove();// 变更状态置为trueupdated = true;}}this.lastUpdateTimestamp = System.currentTimeMillis();return updated;}
}

变更请求发送

上面讲解了两种被判定为消费者发生变化的情况,被判定为变化之后,会触调用DefaultConsumerIdsChangeListener中的handle方法触发变更事件,在方法中传入了消费者组下的所有消费者的channel对象,会发送变更请求通知该消费者组下的所有消费者,进行负载均衡。

DefaultConsumerIdsChangeListener中处理变更事件时,会对消费组下的所有消费者遍历,调用notifyConsumerIdsChanged方法向每一个消费者发送变更请求:

public class ConsumerManager {public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// ...// 更新Channelboolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 更新订阅信息boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果有变更if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {// 触发变更事件,consumerGroupInfo中存储了该消费者组下的所有消费者的channelthis.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}// ...}
}// DefaultConsumerIdsChangeListener
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {@Overridepublic void handle(ConsumerGroupEvent event, String group, Object... args) {if (event == null) {return;}switch (event) {case CHANGE:// 如果是消费者变更事件case CHANGE:if (args == null || args.length < 1) {return;}// 获取所有的消费者对应的channelList<Channel> channels = (List<Channel>) args[0];if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {for (Channel chl : channels) {// 向每一个消费者发送变更请求this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);}}break;// ...}}
}

请求的发送是在Broker2ClientnotifyConsumerIdsChanged方法中实现的,可以看到会创建NOTIFY_CONSUMER_IDS_CHANGED请求并发送:

    
public class Broker2Client {public void notifyConsumerIdsChanged(final Channel channel,final String consumerGroup) {if (null == consumerGroup) {log.error("notifyConsumerIdsChanged consumerGroup is null");return;}NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();requestHeader.setConsumerGroup(consumerGroup);// 创建变更通知请求RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);try {// 发送请求this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);} catch (Exception e) {log.error("notifyConsumerIdsChanged exception. group={}, error={}", consumerGroup, e.toString());}}
}

变更通知请求处理

消费者对Broker发送的NOTIFY_CONSUMER_IDS_CHANGED的请求处理在ClientRemotingProcessorprocessRequest方法中,它会调用notifyConsumerIdsChanged方法进行处理,在notifyConsumerIdsChanged方法中可以看到触发了一次负载均衡:

public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {case RequestCode.CHECK_TRANSACTION_STATE:return this.checkTransactionState(ctx, request);case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED: // NOTIFY_CONSUMER_IDS_CHANGED请求处理// 处理变更请求return this.notifyConsumerIdsChanged(ctx, request); // ...default:break;}return null;}public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {try {// ...// 触发负载均衡this.mqClientFactory.rebalanceImmediately();} catch (Exception e) {log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));}return null;}
}

消费者停止时触发

消费者在停止时,需要将当前消费者负责的消息队列分配给其他消费者进行消费,所以在shutdown方法中会调用MQClientInstanceunregisterConsumer方法取消注册:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {public synchronized void shutdown(long awaitTerminateMillis) {switch (this.serviceState) {case CREATE_JUST:break;case RUNNING:this.consumeMessageService.shutdown(awaitTerminateMillis);this.persistConsumerOffset();// 取消注册this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());this.mQClientFactory.shutdown();// ...break;case SHUTDOWN_ALREADY:break;default:break;}}
}

unregisterConsumer方法中,又调用了unregisterClient方法取消注册,与注册消费者的逻辑相似,它会向所有的Broker发送取消注册的请求:

public class MQClientInstance {public synchronized void unregisterConsumer(final String group) {this.consumerTable.remove(group);// 取消注册this.unregisterClient(null, group);}private void unregisterClient(final String producerGroup, final String consumerGroup) {// 获取所有的BrokerIterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();// 进行遍历while (it.hasNext()) {// ...if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {String addr = entry1.getValue();if (addr != null) {try {// 发送取消注册请求this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, clientConfig.getMqClientApiTimeout());// ...} // ...}}}}}
}

取消注册请求的发送是在MQClientAPIImplunregisterClient方法实现的,可以看到构建了UNREGISTER_CLIENT请求并发送:

public class MQClientAPIImpl { public void unregisterClient(final String addr, final String clientID, final String producerGroup,  final String consumerGroup, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {// ...requestHeader.setConsumerGroup(consumerGroup);// 构建UNREGISTER_CLIENT请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);// 发送请求RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);// ...}
}

与注册消费者的请求处理一样,Broker对UNREGISTER_CLIENT的请求同样是在ClientManageProcessorprocessRequest中处理的,对于UNREGISTER_CLIENT请求是调用unregisterClient方法处理的,里面又调用了ConsumerManagerunregisterConsumer方法进行取消注册:

public class ClientManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {case RequestCode.HEART_BEAT: // 处理心跳请求return this.heartBeat(ctx, request);case RequestCode.UNREGISTER_CLIENT: // 取消注册请求return this.unregisterClient(ctx, request);case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null;}public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// ...{final String group = requestHeader.getConsumerGroup();if (group != null) {// ...// 取消消费者的注册this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);}}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
}

ConsumerManagerunregisterConsumer方法中,可以看到触发了取消注册事件,之后如果开启了允许通知变更,会触发变更事件,变更事件在上面已经讲解过,它会通知消费者组下的所有消费者进行一次负载均衡:


public class ConsumerManager {    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null != consumerGroupInfo) {consumerGroupInfo.unregisterChannel(clientChannelInfo);if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {ConsumerGroupInfo remove = this.consumerTable.remove(group);if (remove != null) {// 触发取消注册事件this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);}}// 触发消费者变更事件if (isNotifyConsumerIdsChangedEnable) {this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}}
}

消费者定时触发

RebalanceService的run方法中,可以看到设置了等待时间,默认是20s,所以消费者本身也会定时执行负载均衡:

public class RebalanceService extends ServiceThread {private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval); // 等待// 负载均衡this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
}

总结
【RocketMQ】负载均衡源码分析

参考

田守枝-深入理解RocketMQ Rebalance机制

RocketMQ版本:4.9.3