> 文章列表 > RocketMQ 消费者Rebalance 解析——图解、源码级解析

RocketMQ 消费者Rebalance 解析——图解、源码级解析

RocketMQ 消费者Rebalance 解析——图解、源码级解析

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南

📆 最近更新:2023年4月15日

🍊 新专栏筹备中,还是熟悉的源码,还是熟悉的感觉!

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

  • 什么是消息负载均衡?
  • Rebalance的触发条件
  • 负载策略使用方法
  • 消息消费默认策略

什么是消息负载均衡?

Rebalance机制: 将一个Topic下的多个队列在同一个消费者组下的多个消费者实例之间进行重新分配。该机制的目的是为了提高消息的并行处理能力

例如: 一个Topic下有5个队列,如果只有一个消费者的话,这个消费者就会处理所有队列的消息。如果有两个消费者的话,就可以两个消费者共同处理这5个队列

在这里插入图片描述


但Rebalance机制也存在明显的限制与危害:

  • 如果消费者组下的消费者实例数量大于队列数量时,多余的消费者将分配不到任何实例
  • 消费暂停: 只有Concumer 1时,其负责所有队列的消息处理;如果此时新增了Consumer 2,触发Rebalance时,需要给它分配几个队列,此时Concumer 1就需要停止这几个队列的消费
  • 重复消费: Concumer 2在消费分配到自己的队列时,必须从Concumer 1已经消费到的地方(offset)开始继续消费。默认情况下,offset是异步提交的,如果Concumer 1之前消费到了第10条数据,但此时Broker记录的offset还是第8条数据,如果Concumer 2从第8条数据开始消费的话,就会有两条消息重复。异步提交offset的间隔时间越久,可能造成的重复消费就越多
  • 消费洪峰: 如果因为Rebalance机制造成了需要重复消费的消息过多 或者 由于Rebalance导致的暂停时间过长,导致有消息积压,就有可能在Rebalance之后需要瞬间消费很多消息

Broker起到的作用: Broker端主要负责Rebalance相关的元数据的维护,通知机制,扮演协调者的角色

Rebalance的触发条件

从根本上来看,触发Rebalance的原因只有两个:

  1. Topic下的队列数量变化
  2. Consumer组信息变化

其中会导致这两项变化的典型场景为:

队列数量变化 典型场景:
1. Broker宕机
2. Broker停机维护
3. 队列扩容 / 缩容
Consumer组信息变化 典型场景:
1. 消费者机器启动 / 停止
2. 消费者宕机
3. 网络异常导致消费者与Broker断联
4. 消费者扩容 / 缩容
5. Topic订阅信息变化

无论发生上面的哪种情况,Broker都会主动通知消费者组下的所有实例进行Rebalance,相关源码如ConsumerManagerregisterConsumer方法所示:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {// 查找consumer组信息,如果没有则创建一个新的// consumerTable:维护所有的ConsumerConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) {ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);// put到map里ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);consumerGroupInfo = prev != null ? prev : tmp;}// 更新Consumer信息,客户端信息,返回消费者组下实例信息是否变化boolean r1 =consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);// 更新订阅Topic信息,返回消费者订阅信息是否变化boolean r2 = consumerGroupInfo.updateSubscription(subList);// 如果消费者实例 或 消费者订阅信息变化,则rebalanceif (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2;
}

负载策略使用方法

消费者使用不同的消费附在策略只需要简单地使用set方法即可,即直接使用consumer.setAllocateMessageQueueStrategy(策略对象)即可,源码如下所示

public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group Name");consumer.subscribe("Target Topic", "*");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 设置平均负载策略consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}

消息消费默认策略

如果没有手动指定负载均衡均衡策略算法,RocketMQ将会使用内部的默认策略,DefaultMQPushConsumer通过构造函数内设置AllocateMessageQueueAveragely来指定默认策略。

在这里插入图片描述

在RocketMQ的实现中,Producer会把消息发送给对应的Topic,同一个Topic下的所有消息会被负载均衡至多个Queue里,之后消息队列的Broker会将同一个Topic下的所有Queue再分配至订阅了该Topic的Consumer组里,再由组内所有消费者进行消费。

消费者机器与Queue的个数可能有以下几种情况:

  1. 消费者机器数大于Queue个数时:部分机器会处理不到消息
    在这里插入图片描述

  2. 消费者机器数等于Queue个数时:每个消费者负责消费一个Queue里的消息
    在这里插入图片描述

  3. 消费者机器数小于Queue个数时:每个消费者可能消费多个Queue里的消息
    在这里插入图片描述