> 文章列表 > RocketMQ5.0.0顺序消息

RocketMQ5.0.0顺序消息

RocketMQ5.0.0顺序消息

目录

一、顺序消息概览

二、顺序消息实现机制

1. 消息队列负载

        1):消费端发送加锁请求

        2):Broker处理加锁

2. 消息拉取

3. 消息消费

        1):启动ConsumeMessageOrderlyService服务

        2):拉取消息提交到线程池

        3):消费线程池任务

三、参考资料


一、顺序消息概览

        RocketMQ支持局部消息顺序消费,可以确保同一个消费队列中的消息被顺序消费,如果做到全局顺序消费则可以将主题配置成一个消费队列。并发(默认)消息消费参考​ 《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》 ​、​ 《RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制》 ​。本章主要介绍消费队列的顺序消费。如下图所示,是消费者UML图。其中org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService是顺序消费的实现类。

        顺序消息实际上是消费者的负载均衡后的消费队列, 在Broker端给消费队列加锁。同时,消费端给MessageQueue、ProcessQueue加锁来消费消息

二、顺序消息实现机制

1. 消息队列负载

        1):消费端发送加锁请求

        消费队列负载均衡参考​ 《RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制》 ​,其中RebalanceImpl#rebalanceByTopic(final String topic, final boolean isOrder)是根据消费者订阅主题下消费队列重新负载均衡的核心方法。方法形参isOrder来定义是否是顺序消息,默认false,则是并发消息,其中调用updateProcessQueueTableInRebalance()就使用该参数,部分代码如下所示。

        遍历负载均衡后的每一个消费队列,若新增消费队列时,需要判定是不是顺序消息,若是则向Broker端发送锁定该消费队列(避免其他消费者消费),锁定失败后需要延迟重新负载均衡

/* 消费者对应的分配消息队列是否变化* step1:消费队列缓存表中不在本次均衡分配的消费队列时,则暂停消费并移除,且持久化待移除消费队列的消费进度;* step2:本次均衡分配的消费队列不在消费队列缓存表中,则新增:*         1):删除内存中该消费队列的消费进度;*         2):创建broker的消费队列;*         3):从磁盘中获取该消费队列的消费进度(若进度<0时,则根据配置矫正消费进度),创建拉取消息请求*              {@link RebalanceImpl#computePullFromWhere}* step3: 新增消费队列,则创建{@link PullRequest}加入到{@link PullMessageService},唤醒该线程拉取消息*              {@link RebalanceImpl#dispatchPullRequest}* step4:顺序消息时,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载* @param topic 主题* @param mqSet 本次均衡分配的消费队列* @param isOrder 是否顺序* @return true变化;false未改变*/
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {......// 若是顺序消息,则尝试向Broker请求锁定该消费队列,锁定失败延迟重新负载if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);allMQLocked = false;continue;}......// 锁定消费队列失败,延迟重新负载if (!allMQLocked) {mQClientFactory.rebalanceLater(500);}......
}

        2):Broker处理加锁

        org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager是顺序消息Broker端给消费者分配到的消费队列加锁的核心类,请求码是RequestCode.LOCK_BATCH_MQ。该类的关键属性如下。注意事项:

  • 锁容器mqLockTable:当前消费队列被消费组中的哪个消费者持有,即持有锁对象LockEntry
  • tryLockBatch()与unlockBatch()方法:给消费队列加锁或释放锁的方法。
// 锁存活时间,默认60s,可配置
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
private final Lock lock = new ReentrantLock();
// 锁容器:当前消费队列被消费组中的哪个消费者持有
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

2. 消息拉取

        消费队列负载均衡后,便从重新负载后的消费队列拉取消息,参考​ 《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取》 ​,其中拉取消息DefaultMQPushConsumerImpl#pullMessage方法中,关于顺序消息的代码如下所示。

        消费端的ProcessQueue消息处理队列是否被锁定,若是没有锁定则延迟3s再次将拉取消息请求PullRequest放入到拉取任务中再次拉取;若是被锁定,则向Broker拉取消息成功后,提交到消费线程池中供消费者消费。

/* 拉取消息* step1:消息处理队列是否被丢弃{@link ProcessQueue};* step2:检查当前消费者状态:消费者是否被挂起;* step3:拉取消息流控:消息总条数、消息总大小、消息最大/最小间隔等流控,并每1000次打印流控信息;* step4:构建消息拉取的sysFlag;* step5:从Broker服务器拉取消息{@link PullAPIWrapper#pullKernelImpl};* step6:定义拉取成功后处理,即:异步拉取回调函数{@link PullCallback};*        异步回调函数{@link PullCallback}把拉取的消息提交消费消息{@link ConsumeMessageService#submitConsumeRequest)}* @param pullRequest 消息拉取请求{@link PullRequest}*/
public void pullMessage(final PullRequest pullRequest) {......// 并发消息if (!this.consumeOrderly) {......}// 顺序消息else {// 处理队列被锁住if (processQueue.isLocked()) {// 拉取请求是否锁定,默认不锁定falseif (!pullRequest.isPreviouslyLocked()) {long offset = -1L;try {// 获取消费队列的消费进度,若进度<0时,则根据配置矫正消费进度(DefaultMQPushConsumer.consumeFromWhere配置)offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());if (offset < 0) {throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);}} catch (Exception e) {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);return;}boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setPreviouslyLocked(true);pullRequest.setNextOffset(offset);}}// 处理队列未被锁住,则延迟else {this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);log.info("pull message later because not locked in broker, {}", pullRequest);return;}}......
}

3. 消息消费

        消息消费参考​ 《RocketMQ5.0.0消息消费<三> _ 消息消费》 ​,其中PullCallback处理拉取结果处理,拉取成功后,拉取的结果PullResult提交到线程池供消费者消费。根据是否是顺序消息,选择消费实现类,顺序消息的消费实现类是ConsumeMessageOrderlyService。

        1):启动ConsumeMessageOrderlyService服务

        ConsumeMessageOrderlyService#start是启动方法,其调用链如下所示。启动方法逻辑是,若是集群模式,则20s定时周期执行锁定分配的消费队列

        org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll是锁定分配的消费队列的核心逻辑方法,其代码如下。 注意事项:

  • brokerMqs:当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合。
  • findBrokerAddressInSubscribe():根据Broker名称获取主节点Broker
  • lockBatchMQ():向主Broker发送锁定消费队列请求,并返回锁定成功的消息队列:

                锁定成功的消费队列:锁定对应的ProcessQueue处理消费队列设置为锁定状态 + 更新加锁时间;

                锁定失败的消费队列:对应的ProcessQueue解锁,则暂停拉取消息与消息消费

/* 锁定消费者分配到的消费队列* step1:当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合* step2:根据Broker名称获取主节点Broker* step3:向主Broker发送锁定消费队列请求,并返回锁定成功的消息队列* step4:锁定成功的消费队列对应的ProcessQueue待处理消费队列设置为锁定状态 + 更新加锁时间* step5:锁定失败的消费队列对应的ProcessQueue待处理消费队列设置为解锁,暂停拉取消息与消息消费*/
public void lockAll() {// 当前消费者均衡的消息队列缓存表processQueueTable 转换成 按Broker组织的消费队列集合HashMap<String/* brokerName */, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<MessageQueue>> entry = it.next();final String brokerName = entry.getKey();final Set<MessageQueue> mqs = entry.getValue();if (mqs.isEmpty()) {continue;}// 根据Broker名称获取主节点BrokerFindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);if (findBrokerResult != null) {LockBatchRequestBody requestBody = new LockBatchRequestBody();requestBody.setConsumerGroup(this.consumerGroup);requestBody.setClientId(this.mQClientFactory.getClientId()); // 消费者IDrequestBody.setMqSet(mqs); // 消费者分配到的消息队列try {// 向Broker发送锁定消费队列请求,并返回锁定成功的消息队列Set<MessageQueue> lockOKMQSet =this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);// 锁定成功的消费队列对应的ProcessQueue待处理消费队列设置为锁定状态 + 更新加锁时间for (MessageQueue mq : lockOKMQSet) {// 锁定成功的消费队列对应的ProcessQueueProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {if (!processQueue.isLocked()) {log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);}// ProcessQueue设置为锁定状态 + 更新加锁时间processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}// 没有锁定成功的,则相应ProcessQueue解锁,则暂停拉取消息与消息消费for (MessageQueue mq : mqs) {if (!lockOKMQSet.contains(mq)) {ProcessQueue processQueue = this.processQueueTable.get(mq);if (processQueue != null) {processQueue.setLocked(false);log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);}}}} catch (Exception e) {log.error("lockBatchMQ exception, " + mqs, e);}}}
}

        2):拉取消息提交到线程池

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest是顺序消息拉取后提交到线程池的方法,代码如下。注意事项:

  • dispathToConsume:只有顺序消息使用该参数,并发消息忽略该参数
// 提交消费请求,到消费线程池,供消费者消费
@Override
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {// 是否转发到消费线程池中(注意,并发消息忽略该参数)if (dispathToConsume) {// 构建消费任务ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);// 提交到消费线程池this.consumeExecutor.submit(consumeRequest);}
}

        3):消费线程池任务

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest是顺序消息的线程池任务,它是个线程。其run()方法,如下代码所示。注意事项:

  • fetchLockObject():获取MessageQueue消费队列的锁对象,并加synchronized锁,这样一个消费队列同一时间被一个消费线程消费消息
  • 根据集群模式消费:广播模式 或 ProcessQueue加锁且锁未过期 时,则直接消费;集群下未加锁 或 ProcessQueue加锁且锁过期 时,延迟100ms重新消费
  • takeMessages():顺序消息不是消费当前拉取消息,而是从ProcessQueue.msgTreeMap消息临时存储在ProcessQueue.consumingMsgOrderlyTreeMap,而后再去消费
/* 运行提交消费任务,即:消费线程消费任务* step1:待处理队列是否丢弃,若丢弃,则停止消费* step2:获取指定消费队列的锁对象,目的:一个消费队列同一时间被一个线程消费* step3:给锁对象加锁,进行消费* step4:广播模式 或 加锁且锁未过期 时,则直接消费;*       集群下未加锁 或 加锁且锁过期 时,延迟100ms重新消费* step5:顺序取出消息(顺序消息时,临时存储在ProcessQueue.consumingMsgOrderlyTreeMap中)*        {@link ProcessQueue#takeMessages(int)}* step6:获取消费锁,消费监听器调用消费逻辑*        {@link MessageListenerOrderly#consumeMessage}* step7:根据消费结果,判定是否重试消费:{@link ConsumeMessageOrderlyService#processConsumeResult}*        检查消费次数,判断是否进入DLQ队列(进入DLQ成功,则认为消费成功)*                    {@link ConsumeMessageOrderlyService#checkReconsumeTimes}*/
@Override
public void run() {// 待处理队列是否丢弃,若丢弃,则停止消费if (this.processQueue.isDropped()) {log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}// 获取指定消费队列的锁对象,目的:一个消费队列同一时间被一个线程消费final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);// 给消费队列的锁对象加锁,然后才消费消息synchronized (objLock) {// 广播模式 或 加锁且锁未过期 时,则直接消费if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {final long beginTime = System.currentTimeMillis();// continueConsume是否继续消费,不是根据消息条数,而是连续消费最大时间ConsumeMessageOrderlyService.MAX_TIME_CONSUME_CONTINUOUSLYfor (boolean continueConsume = true; continueConsume; ) {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}// 超出连续消费最大时间ConsumeMessageOrderlyService.MAX_TIME_CONSUME_CONTINUOUSLYlong interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}// 从ProcessQueue取出消息条数final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();// 顺序取出消息(顺序消息时,临时存储在ProcessQueue.consumingMsgOrderlyTreeMap中)List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);// 恢复重试消息主题名(消息重试机制决定)defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;// 执行消费前钩子函数ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false; // 消费是否有异常try {// 获取消费锁this.processQueue.getConsumeLock().lock();if (this.processQueue.isDropped()) {log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}// 消费监听器调用业务方,具体的消费逻辑status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;} finally {this.processQueue.getConsumeLock().unlock();}if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {if (hasException) {returnType = ConsumeReturnType.EXCEPTION;} else {returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}// 执行消费后(正常或异常)的钩子函数if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);// 处理消费结果,是否继续消费(注意:进入DLQ认为消费成功,继续消费)continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume = false;}}}// 集群下未加锁 或 加锁且锁过期 时,延迟100ms重新消费else {if (this.processQueue.isDropped()) {log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}// 延迟100ms重新消费ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}
}

三、参考资料

顺序消息 | RocketMQ

https://www.cnblogs.com/shanml/p/16909874.html

RocketMQ原理:RocketMQ顺序消息 - 墨天轮

RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息消费<三> _ 消息消费_爱我所爱0505的博客-CSDN博客

石膏模具网