> 文章列表 > Chapter11-最常用的消费类

Chapter11-最常用的消费类

Chapter11-最常用的消费类

11.1 整体流程

        我们使用 DefaultMQPushConsumer 的时候,一般流程是设置好 GroupName 、NameServer 地址 ,以及订阅的 Topic 名称, 然后填充Message 处理函数,最后调用 start () 。

        11.1.1 上层接口类

        DefaultMQPushConsumer 类在 org. apache.rocketmq. client. consumer 包中 ,这个类担任着上层接 口的角色,具体实现都在 DefaultMQPushConsumerlmpl 类中

    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {this.defaultMQPushConsumer = defaultMQPushConsumer;this.rpcHook = rpcHook;this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();}/* Constructor specifying consumer group. @param consumerGroup Consumer group.*/public DefaultMQPushConsumer(final String consumerGroup) {this(null, consumerGroup, null, new AllocateMessageQueueAveragely());}

        我们常用的是最后这个构造函数,只传入一个 consumer Group 名称作为参数,这个构造函数会把 RPCHook 设为空,把负载均衡策略设置成平均策略。在构造函数的实现中 ,主要工作是创建 DefaultMQPushConsumerlmpl 对象

        11.1.2 DefaultMQPushConsumer 的实现者 

         DefaultMQPushConsumerlmpl 具体实 现了 DefaultMQPushConsumer 的业务逻辑, DefaultMQPushConsumerlmpl.java 在 org. apache.rocketmq.client.impl. consumer 这个包里,本节接下来从 start 方法着手分析。首先是初始化 MQClientinstance ,并且设置好 rebalance 策略和 pullAPIWraper ,有这些结构后才能发送 pull 请求获取消息

//参见org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
//  switch 的CREATE_JUST 语句块// 初始化MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);// 初始化 PullAPIWrapper
if (this.pullAPIWrapper == null) {this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
}
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 确定 OffsetStore 。 OffsetStore 里存储的是当前消费者所消费的消息
在队列中的偏移量
if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();

        根据消费消息方式的不同 , OffsetStore 的类型也不同。 如果是 BROADCASTING模式,使用的是 LocalFileOffsetStore, Offset 存到本地;如果是 CLUSTERING模式,使用的是 RemoteBrokerOffsetStore, Offset 存到 Broker 机器上。

//然后是初始化 consumeMessageService ,根据对消息顺序需求的不同,使用不同的 Service 类型
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService =new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();// POPTODO
this.consumeMessagePopService.start();

        最后调用 MQClientlnstance 的 start 方法 ,开始获取数据。

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;

        11.1.3 获取消息逻辑

        获取消息的逻辑实现在 public void pullMessage ( finalPullRequestpullRequest) 函数中,这是一个很大的函数,前半部分是进行一些判断, 是进行流量控制的逻辑(见代码清单 11-5 );中间是对返回消息结果做处理的逻辑 ;后面是发送获取消息请求的逻辑。

        long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);// 检查个数if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}// 检查总大小if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}

通过判断未处理消息的个数和总大小来控制是否继续请求消息。 对于顺序消息还有一些特殊判断逻辑。

if (!this.consumeOrderly){}else{// 顺序消息
}

获取的消息返回后,根据返回状态,调用相应的回调处理方法

PullCallback pullCallback = new PullCallback() {... ...};
// 具体判断见代码块
switch (pullResult.getPullStatus()) {case FOUND:... ... break;case NO_NEW_MSG:case NO_MATCHED_MSG:

最后是发送获取消息请求,

    try {this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),this.defaultMQPushConsumer.getPullBatchSizeInBytes(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);} catch (Exception e) {log.error("pullKernelImpl exception", e);this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);}

这三个阶段不停地循环执行 ,直到程序被停止

11.2 消息的并发处理

        11.2.1 并发处理过程

         处理效率的高低是反应 Consumer 实现好坏的重要指标,本节以 Consume­MessageConcurrentlyService 类 为例来分析 RocketMQ 的实现方式。 Consume­MessageConcurrentlyService 类在 org.apache.rocketmq.client.impl.consumer 包中 。

这个类定义了三个线程池,一个主线程池用来正常执行收到的消息,用户可以自定义通过 consumeThreadMin 和 consumeThreadMax 来自定义线程个数。另外两个都是单线程的线程池,一个用来执行推迟消费的消息,另一个用来定期清理超时消息( 15 分钟)。

 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) {this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;this.messageListener = messageListener;this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue = new LinkedBlockingQueue<>();String consumerGroupTag = (consumerGroup.length() > 100 ? consumerGroup.substring(0, 100) : consumerGroup) + "_";this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_" + consumerGroupTag));this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_" + consumerGroupTag));this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_" + consumerGroupTag));}

        从 Broker 获取到一批消息以后,根据 BatchSize 的 设置 ,把一批消息封装到一个 ConsumeRequest 中 ,然后把这个 ConsumeRequest 提交到consumeExecutor 线程池中执行  。

    @Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {this.submitConsumeRequestLater(consumeRequest);}} else {for (int total = 0; total < msgs.size(); ) {List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {if (total < msgs.size()) {msgThis.add(msgs.get(total));} else {break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {for (; total < msgs.size(); total++) {msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}

        消息的处理结果可能有不同的值,主要 的两个是 CONSUME SUCCESS 和 RECONSUME LATER 。 如果消费不成功,要把消息提交到上面说的scheduledExecutorService 线程池中, 5 秒后再执行;如果消费模式是  CLUSTERING模式,未消费成功的消息会先被发送回 Broker ,供这个 ConsumerGroup 里的其他 Consumer 消费,如果发送回 Broker 失败 , 再调用阻CONSUME_LATER

    public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;// 消息的处理结果可能有不同的值,主要的两个是 CONSUME_SUCCESS 和 RECONSUME_LATERswitch (status) {case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);// Maybe message is expired and cleaned, just ignore it.if (!consumeRequest.getProcessQueue().containsMessage(msg)) {log.info("Message is not found in its process queue; skip send-back-procedure, topic={}, "+ "brokerName={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getBrokerName(),msg.getQueueId(), msg.getQueueOffset());continue;}boolean result = this.sendMessageBack(msg, context);if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}

        处理逻辑是用户自定义 的, 当消息量大的时候,处理逻辑执行效率的高低影 响系统的吞吐量。 可以把多条消息组合起来处理,或者提高线程数,以提高系统的吞吐量。

        11.2.2 ProcessQueue 对象

        在前面的源码中,有个 Process Queue 类型的对象,这个对象的功能是什么呢?从 Broker 获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态 , 比如如何获得当前消息堆积的数量,如何解决处理超时情况等 。RocketMQ 定义了一个快照类 Process Queue 来解决这些问题,在 Push Consumer 运行的时候,每个 Message Queue 都会有一个对应的 Process Queue 对象,保存了这个 Message Queue 消息处理状态的快照。Process Queue 对象里主要 的内容是一个 TreeMap 和 一个读写锁。 TreeMap里以 Message Queue 的 Offset 作为 Key,以消息内容的引用为 Value ,保存了所有从 MessageQueue 获取到但是还未被处理的消息,读写锁控制着多个线程对 TreeMap 对象的并发访问 。

        有了 ProcessQueue 对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息 。 顺序消息是通过 ConsumeMessage-OrderlyService 类实现的 ,主要流程和 ConsumeMessageConcurrentlyService 类似 ,区别只是在对并发消费的控制上 。

11.3 生产者消费者的底层类 

         MQClientinstance 是客户端各种类型的 Consumer 和 Producer 的底层类。这个类首先从 NameServer 获取并保存各种配置信息,比如 Topic 的 Route 信息 。 同时 MQClientlnstance 还会通过 MQClientAPIImpl 类实现消息的收发,也就是从 Broker 获取消息或者发送消息到 Broker 。既然 MQClientinstance 实现的是底层通信功能和获取并保存元数据的功能,就没必要每个 Consumer 或 Producer 都创建一个对象,一个 MQClientlnstance对象可以被多个 Consumer 或 Producer 公用 。 RocketMQ 通过一个工厂类达到共用 MQClientlnstance 的目的 。 

// org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start L737
//    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start L915
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

        注意, MQClientlnstance 是通过工厂类被创建的,并不是一个单例模式,有些情况下需要创建多个实例。 

    public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {// clientld 的 格式是“ clientlp ”+ @ +“ InstanceName ” InstanceName  有默认值String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;}

        普通情况下, 一个用到 RocketMQ 客户端的 Java 程序,或者说一个JVM 进程只要有 一个 MQClientlnstance 实例就够了 。 这时候创建一个或 多 个Consumer 或者 Producer , 底层使用的是同一个 MQClientlnstance 实例。

        在 quick start 文档中创建一个 DefaultMQPushConsumer 来接收消息,没有设置这个 Consumer 的 InstanceName 参数(通过 setlnstanceName 函数进行设置), 这个时候 InstanceName 的值是默认的 “ DEFAULT ” 。 实际创建 的MQClientlnstance 个数由设定的逻辑进行控制 。

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();// MQC!ientlnstance 个数由设定的逻辑if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);if (this.pullAPIWrapper == null) {this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());}this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());//POPTODO reuse Executor ?this.consumeMessagePopService =new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();// POPTODOthis.consumeMessagePopService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();}
    public void changeInstanceNameToPID() {if (this.instanceName.equals("DEFAULT")) {this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();}}

        从 InstanceName 的创建逻辑就可以看出,如果创建 Consumer 或者 Producer类型的时候不手动指定 InstanceName ,进程中只会有一个 MQClientlnstance对象。

        有些情况下只有一个 MQClientlnstance 对象是不够的,比如一个 Java 程序需要连接两个 RoceketMQ 集群 ,从一个集群读取消息,发送到另一个集群, 一个 MQClientlnstance 对象无法支持这种场景 。 这种情况下一定要手动指定不同的 InstanceName ,底层会创建两个 MQClientlnstance 对象。

        11.3.2 MQClientlnstance 类的功能 

        首先来看一下 MQClientlnstance 类的 Start 函数,从 Start 函数中的逻辑能大致了解 MQClientlnstance 类的功能 

        Start 函数中的 MQClientAPIImpl 对 象用来负责底层消息通信 , 然后启动pullMessageService 和 rebalanceService 。在类的成员变量中,用 topicRouteTable 、brokerAddrTable 等来存储从 NameServer 中获得的集群状态信息,并通过一个Scheduled Task 来维护这些信息 。

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask 

        从代码中可以看出, MQClientlnstance 会定时进行如下几个操作:获 取NameServer 地址 、 更新 TopicRoute 信息 、清理离线的 Broker 和保存消费者的Offset 。