> 文章列表 > RocketMQ源码分析之监控指标分析

RocketMQ源码分析之监控指标分析

RocketMQ源码分析之监控指标分析

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

Rocketmq版本

  • version: 5.1.0

背景

继续上次的高可用topic二开已经有了一段时间,现在我们需要对我们的限流数据进行监控,所以现在我们来研究研究RocketMQ的监控源码

入口

这里我们源码的切入点还是以client为切入点

首先我们来看看比如我们要统计topic发送消息的数量是如何统计的。
入口代码我这里直接看的是rocketmq-exporter的源代码,我这里给出部分核心代码

@Resourceprivate MQAdminExt mqAdminExt;BrokerStatsData bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey)throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,InterruptedException;

可以看到核心方法就是通过viewBrokerStatsData方法
知道了入口后我们就去RocketMQ源码里面具体分析

客户端的监控指标获取

RocketMQ源码分析之监控指标分析

老规矩我们这里直接进去看看netty的通信code

RocketMQ源码分析之监控指标分析

这里可以看到的通信code就是RequestCode.VIEW_BROKER_STATS_DATA

所以我们直接去找到broker对应的业务handler

RocketMQ源码分析之监控指标分析

很快我们就锁定了方法ViewBrokerStatsData(ctx, request)
这里这个方法命名有点奇怪,竟然是大写开头,不过我们不用在意这些小问题

可以考虑给社区提一个pr,不过merge不merge就不知道了,毕竟现在社区的pr已经高达200多个没处理了

RocketMQ源码分析之监控指标分析

进入到ViewBrokerStatsData方法后,我们可以看到有有一个比较核心的方法

RocketMQ源码分析之监控指标分析

这里还记得requestHeader.getStatsName()requestHeader.getStatsKey()的值吗

没错就是我们开始传进来的这两个值

  • requestHeader.getStatsName():BrokerStatsManager.TOPIC_PUT_NUMS
  • requestHeader.getStatsKey(): topic

监控核心数据结构

在上面的分析我们就已经看到了RocketMQ的核心数据结构,没错就是
BrokerStatsManager

RocketMQ源码分析之监控指标分析

这里我们重点关注的属性就是

private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();

这里我简单理了一下他们的关系图

RocketMQ源码分析之监控指标分析

这里我可以简单给大家看看debug实际里面的数据结构

RocketMQ源码分析之监控指标分析

其中statsTable的可以全在Stats类中,大致有如下一些,我这里简单截个图

RocketMQ源码分析之监控指标分析

之前我们使用的BrokerStatsManager.TOPIC_PUT_NUMS实际是不推荐使用了,推荐使用Stats

所以如果我们想要自定义一些监控指标就可以在这里面加一些我们自己的key

StatsItem中的统计维度主要有三个:

  • 分钟:csListMinute
  • 小时:csListHour
  • 天:csListDay

我们在AdminBrokerProcessor中处理客户端的请求的时候拼装返回数据也可以看到

RocketMQ源码分析之监控指标分析

RocketMQ源码分析之监控指标分析

StatsItemSet的初始化

StatsItemSet的初始化主要是在BrokerStatsManagerinit方法

RocketMQ源码分析之监控指标分析

数据是如何写入

通过上面的从clientbroker我们大致知道了数据查询以及存储的数据结构,接下来我们就来看看数据是如何写入的

通过上面的分析我们知道数据是存储在BrokerStatsManagerstatsTable

RocketMQ源码分析之监控指标分析

所以我们看看statsTable中的调用关系

RocketMQ源码分析之监控指标分析

我们很快就定位到了写入TOPIC_PUT_NUMS的方法

    public void incTopicPutNums(final String topic) {this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, 1, 1);}public void incTopicPutNums(final String topic, int num, int times) {this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, num, times);}

再通过这两个方法的调用关系,我们发现业务处理器即发消息的处理器SendMessageProcessor有调用

RocketMQ源码分析之监控指标分析

我们可以看到发送消息成功后就会更新改值的内存值

RocketMQ源码分析之监控指标分析

    this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);public void incTopicPutNums(final String topic, int num, int times) {this.statsTable.get(Stats.TOPIC_PUT_NUMS).addValue(topic, num, times);}

最终将数据写入到了StatsItem中,那么我们的csListMinutecsListHourcsListDay是如何统计的呢?这里我们有一个多个定时任务,每隔10s会去统计一次

RocketMQ源码分析之监控指标分析

统计逻辑就是在samplingInSeconds();方法中,可以看到这里启动的定时任务是10s统计一次

    public void samplingInSeconds() {synchronized (this.csListMinute) {if (this.csListMinute.size() == 0) {this.csListMinute.add(new CallSnapshot(System.currentTimeMillis() - 10 * 1000, 0, 0));}this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.sum(), this.value.sum()));if (this.csListMinute.size() > 7) {this.csListMinute.removeFirst();}}}

这里是总共统计7个值,超过就将之前的移除掉

RocketMQ源码分析之监控指标分析
第一次会添加一个初始化,可以看到我们到第60s刚好会将初始值0移除掉

计算我们的统计指标sumtps则是在computeStatsData这个方法

    private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {StatsSnapshot statsSnapshot = new StatsSnapshot();synchronized (csList) {double tps = 0;double avgpt = 0;long sum = 0;long timesDiff = 0;if (!csList.isEmpty()) {CallSnapshot first = csList.getFirst();CallSnapshot last = csList.getLast();sum = last.getValue() - first.getValue();tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());timesDiff = last.getTimes() - first.getTimes();if (timesDiff > 0) {avgpt = (sum * 1.0d) / timesDiff;}}statsSnapshot.setSum(sum);statsSnapshot.setTps(tps);statsSnapshot.setAvgpt(avgpt);statsSnapshot.setTimes(timesDiff);}return statsSnapshot;}

总结

至此RocketMQ的一些监控指标的处理就分析完成了,我们从指标的获取->写入->统计都分析到了。
包括如何添加我们自己的监控指标,当然一些小细节就限于篇幅就没具体分析比如,统计计数使用的LongAdder而不是AtomicLong