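RocketMQ Message Sending: A Source Code Walkthrough
RocketMQ supports four ways of sending messages: reliable synchronous, reliable asynchronous, one-way, and batch.
Message sending has to address the following three questions.
1) How is load balanced across message queues?
2) How does message sending achieve high availability?
3) How does batch message sending achieve consistency?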
org.apache.rocketmq.common.message.Message
public class Message implements Serializable {
    private String topic;                   // topic
    private int flag;                       // message flag (RocketMQ does not process it)
    private Map<String, String> properties; // extended properties
    private byte[] body;                    // message body
    private String transactionId;           // transaction ID

    public Message() {
    }
}
Extended message properties and system flags:
org.apache.rocketmq.common.sysflag.MessageSysFlag
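Question: what do these constants mean? They are bit masks that are OR-ed together into a message's sysFlag.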
public class MessageSysFlag {
    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    public final static int BORNHOST_V6_FLAG = 0x1 << 4;
    public final static int STOREHOSTADDRESS_V6_FLAG = 0x1 << 5;
    // Mark the flag for batch to avoid conflict
    public final static int NEED_UNWRAP_FLAG = 0x1 << 6;
    public final static int INNER_BATCH_FLAG = 0x1 << 7;
    // COMPRESSION_TYPE
    public final static int COMPRESSION_LZ4_TYPE = 0x1 << 8;
    public final static int COMPRESSION_ZSTD_TYPE = 0x2 << 8;
    public final static int COMPRESSION_ZLIB_TYPE = 0x3 << 8;
    public final static int COMPRESSION_TYPE_COMPARATOR = 0x7 << 8;
}
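To make the flag layout concrete, here is a minimal sketch of how such bit masks can be combined and decoded. The constant values are copied from the listing above; the class name SysFlagDemo and the three helper methods are hypothetical and exist only for this illustration.

```java
public class SysFlagDemo {
    // Values copied from the MessageSysFlag listing above.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
    static final int COMPRESSION_ZLIB_TYPE = 0x3 << 8;
    static final int COMPRESSION_TYPE_COMPARATOR = 0x7 << 8;

    // Hypothetical helper: bit 0 says whether the body is compressed.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    // Hypothetical helper: bits 2-3 hold the transaction type; 0x3 << 2 doubles as the mask.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Hypothetical helper: bits 8-10 hold the compression type.
    static int compressionType(int sysFlag) {
        return sysFlag & COMPRESSION_TYPE_COMPARATOR;
    }

    public static void main(String[] args) {
        int sysFlag = 0;
        sysFlag |= COMPRESSED_FLAG;            // body is compressed
        sysFlag |= COMPRESSION_ZLIB_TYPE;      // ... with zlib
        sysFlag |= TRANSACTION_PREPARED_TYPE;  // half (prepared) transactional message

        System.out.println(isCompressed(sysFlag));
        System.out.println(transactionType(sysFlag) == TRANSACTION_PREPARED_TYPE);
        System.out.println(compressionType(sysFlag) == COMPRESSION_ZLIB_TYPE);
    }
}
```

Because each group of flags occupies its own bit range, independent facts (compressed or not, transaction state, compression algorithm) fit into a single int.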
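tags: the message tag, used for message filtering.
keys: message index keys, separated by spaces; RocketMQ can use these keys to look up messages quickly.
waitStoreMsgOK: whether the send waits until the message has been stored before returning.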
DefaultMQProducer
DefaultMQProducer implements MQProducer, which extends the MQAdmin interface.
public interface MQAdmin {
    // create a topic
    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes)
        throws MQClientException;

    // find the offset that corresponds to a timestamp
    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;

    // maximum offset
    long maxOffset(final MessageQueue mq) throws MQClientException;

    // minimum offset
    long minOffset(final MessageQueue mq) throws MQClientException;

    // store time of the earliest message
    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;

    // look up a message by its offsetMsgId
    MessageExt viewMessage(final String offsetMsgId)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

    // query messages by key within a time range
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end)
        throws MQClientException, InterruptedException;
}
org.apache.rocketmq.client.producer.MQProducer
Its main methods are:
public interface MQProducer extends MQAdmin {
    // fetch the message queues the topic can publish to
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
    // synchronous send
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    // asynchronous send
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
    // one-way send
    void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException;
    // synchronous send to the specified message queue
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    // asynchronous send to the specified message queue
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
    // one-way send to the specified message queue
    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException;
    // synchronous send; the target queue is chosen by the MessageQueueSelector
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
    // asynchronous send; the target queue is chosen by the MessageQueueSelector
    void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;
    // one-way send; the target queue is chosen by the MessageQueueSelector
    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException;
    // send a transactional message
    TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
    // synchronous batch send
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
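The selector-based overloads above let the caller decide which queue a message goes to. As a rough illustration of the idea, the sketch below maps a business key to a queue by hashing. QueueSelector is a hypothetical stand-in for MessageQueueSelector, and queues are represented as plain strings so that the example runs without the RocketMQ client on the classpath.

```java
import java.util.Arrays;
import java.util.List;

public class SelectorDemo {
    // Hypothetical stand-in for MessageQueueSelector: pick one queue from the list using arg.
    interface QueueSelector {
        String select(List<String> queues, Object arg);
    }

    // The same key always maps to the same queue, as long as the queue count does not change.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static final QueueSelector HASH_SELECTOR =
        (queues, arg) -> queues.get(Math.floorMod(arg.hashCode(), queues.size()));

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a:0", "broker-a:1", "broker-b:0", "broker-b:1");
        Integer orderId = 10086; // hypothetical business key
        System.out.println(HASH_SELECTOR.select(queues, orderId));
    }
}
```

Routing every message for one key to one queue is the usual basis for per-key ordering; the trade-off is that load is only as even as the key distribution.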
Core properties of DefaultMQProducer
public class DefaultMQProducer extends ClientConfig implements MQProducer {
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // the implementation that producer calls are delegated to
    private String producerGroup;                             // producer group
    private volatile int defaultTopicQueueNums = 4;           // default number of queues per topic: 4
    private int sendMsgTimeout = 3000;                        // send timeout, in milliseconds
    private int compressMsgBodyOverHowmuch = 1024 * 4;        // compression threshold for the message body (4 KB)
    private int retryTimesWhenSendFailed = 2;                 // number of retries for synchronous sends
    private int retryTimesWhenSendAsyncFailed = 2;            // number of retries for asynchronous sends
    private boolean retryAnotherBrokerWhenNotStoreOK = false; // whether to retry on another broker when the store result is not OK
    private int maxMessageSize = 1024 * 1024 * 4;             // maximum message size: 4 MB
}
Producer startup flow
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
// validate the configuration
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    // change the instance name to the process ID
    this.defaultMQProducer.changeInstanceNameToPID();
}
// get or create the MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// register this producer with the client instance
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}
// on success, cache an empty TopicPublishInfo under the create-topic key
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// start the client instance
if (startFactory) {
    mQClientFactory.start();
}
org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // client ID of this instance
    String clientId = clientConfig.buildMQClientId();
    MQClientInstance instance = this.factoryTable.get(clientId);
    // create a new instance if none exists yet
    if (null == instance) {
        instance = new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) {
            // another thread registered first; use its instance
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
        } else {
            log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        }
    }
    return instance;
}
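The get-then-putIfAbsent pattern in getOrCreateMQClientInstance guarantees one instance per clientId even when several threads race. A minimal sketch of the same pattern in isolation follows; Client and ClientRegistryDemo are hypothetical names, and Client merely stands in for MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClientRegistryDemo {
    // Hypothetical stand-in for MQClientInstance.
    static final class Client {
        final String clientId;
        Client(String clientId) { this.clientId = clientId; }
    }

    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(clientId);
            // putIfAbsent returns the existing value if another thread won the race
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientRegistryDemo registry = new ClientRegistryDemo();
        Client a = registry.getOrCreate("10.0.0.1@1234");
        Client b = registry.getOrCreate("10.0.0.1@1234");
        System.out.println(a == b); // same clientId, same instance
    }
}
```

The losing thread's freshly built object is simply discarded, which is acceptable here because construction has no side effects until the instance is started.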
Basic message sending flow
Send flow diagram
The full RocketMQ flow: send, store, and consume.
The main send code in RocketMQ:
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // check the producer state and validate the message
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    // look up the route info for the topic
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        ...
        // main send logic
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    }
    // no usable route: verify that a NameServer address is configured, then fail
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null)
        .setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
Finding route information
tryToFindTopicPublishInfo looks up the route information for a topic.
The route information determines which Broker the message is sent to.
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // look up the cached route info
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // not cached yet: ask the NameServer for this topic's route
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // still nothing: fall back to the default topic's route info from the NameServer
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
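The lookup order in tryToFindTopicPublishInfo (local cache, then the topic's own route, then the default topic's route) can be sketched as below. This is a simplified model under stated assumptions: routes are plain strings, fetchFromNameServer is a hypothetical function that returns null for an unknown topic, and defaultTopicRoute is a hypothetical non-null fallback value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class RouteLookupDemo {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> fetchFromNameServer; // hypothetical: null for an unknown topic
    private final String defaultTopicRoute;                     // hypothetical: route of the default topic
    int remoteCalls = 0;                                        // counts simulated NameServer round trips

    RouteLookupDemo(Function<String, String> fetchFromNameServer, String defaultTopicRoute) {
        this.fetchFromNameServer = fetchFromNameServer;
        this.defaultTopicRoute = defaultTopicRoute;
    }

    String find(String topic) {
        String route = cache.get(topic);
        if (route != null) {
            return route;                         // 1) cache hit
        }
        remoteCalls++;
        route = fetchFromNameServer.apply(topic); // 2) ask for this topic's own route
        if (route == null) {
            route = defaultTopicRoute;            // 3) fall back to the default topic's route
        }
        cache.put(topic, route);
        return route;
    }

    public static void main(String[] args) {
        RouteLookupDemo demo = new RouteLookupDemo(
            t -> "orders".equals(t) ? "broker-a" : null, "broker-default");
        System.out.println(demo.find("orders"));
        System.out.println(demo.find("new-topic"));
        System.out.println(demo.find("orders") + ", remote calls: " + demo.remoteCalls);
    }
}
```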
Class structure of TopicPublishInfo:
public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    // It could be null or empty
    private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;
}
MQClientInstance#updateTopicRouteInfoFromNameServer fetches route information from the NameServer.
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    // fetch the route info from the NameServer
    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout());
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }
        }
    } else {
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
    }
    if (topicRouteData != null) {
        // the route info cached locally
        TopicRouteData old = this.topicRouteTable.get(topic);
        // compare it with the freshly fetched route info
        boolean changed = topicRouteData.topicRouteDataChanged(old);
        if (!changed) {
            // nothing changed on the NameServer side; check whether a local producer or consumer still needs an update
            changed = this.isNeedUpdateTopicRouteInfo(topic);
        } else {
            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
        }
        if (changed) {
            // update the broker address cache
            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
            }
            // Update endpoint map
            {
                ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
                if (!mqEndPoints.isEmpty()) {
                    topicEndPointsTable.put(topic, mqEndPoints);
                }
            }
            // Update Pub info
            {
                // convert topicRouteData into a TopicPublishInfo
                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                publishInfo.setHaveTopicRouterInfo(true);
                for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
                    MQProducerInner impl = entry.getValue();
                    if (impl != null) {
                        impl.updateTopicPublishInfo(topic, publishInfo);
                    }
                }
            }
            // Update sub info
            if (!consumerTable.isEmpty()) {
                // update the consumers' subscription info
                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                    MQConsumerInner impl = entry.getValue();
                    if (impl != null) {
                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                    }
                }
            }
            TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);
            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
            this.topicRouteTable.put(topic, cloneTopicRouteData);
            return true;
        }
    }
    // locking and exception handling in the original method are omitted from this excerpt
    return false;
}
Selecting a message queue
The queues are first laid out in order, by broker and then by queue ID:
[{"brokerName": "broker-a", "queueId": 0},
 {"brokerName": "broker-a", "queueId": 1},
 {"brokerName": "broker-a", "queueId": 2},
 {"brokerName": "broker-a", "queueId": 3},
 {"brokerName": "broker-b", "queueId": 0},
 …]
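The ordering shown above can be reproduced with a short sketch: iterate over the brokers in name order and emit one entry per write queue. The types and the buildQueueList helper are hypothetical simplifications; in the client the list is built from TopicRouteData, whose details are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class QueueListDemo {
    // Hypothetical, simplified stand-in for MessageQueue.
    static final class Queue {
        final String brokerName;
        final int queueId;
        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
        @Override
        public String toString() {
            return brokerName + "/" + queueId;
        }
    }

    // Builds the flat queue list: brokers sorted by name, queue IDs ascending within a broker.
    static List<Queue> buildQueueList(Map<String, Integer> writeQueueNumsByBroker) {
        List<Queue> queues = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(writeQueueNumsByBroker).entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                queues.add(new Queue(e.getKey(), i));
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, Integer> nums = new TreeMap<>();
        nums.put("broker-b", 4);
        nums.put("broker-a", 4);
        System.out.println(buildQueueList(nums));
    }
}
```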
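Q: How does the retry mechanism work?
retryTimesWhenSendFailed sets the number of retries for synchronous sends.
retryTimesWhenSendAsyncFailed sets the number of retries for asynchronous sends.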
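For the synchronous case, the retry count translates into a bounded loop: one initial attempt plus retryTimesWhenSendFailed retries. The sketch below shows only that loop shape. The Attempt interface and sendWithRetry are hypothetical; the real loop also switches brokers between attempts and tracks the remaining timeout, which is not modeled here.

```java
public class SyncRetryDemo {
    // Hypothetical: one send attempt; returns true on success.
    interface Attempt {
        boolean trySend(int attemptNo);
    }

    // Returns the 1-based number of the attempt that succeeded, or -1 if every attempt failed.
    static int sendWithRetry(int retryTimesWhenSendFailed, Attempt attempt) {
        int timesTotal = 1 + retryTimesWhenSendFailed; // the first try plus the retries
        for (int times = 1; times <= timesTotal; times++) {
            if (attempt.trySend(times)) {
                return times;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // With the default of 2 retries there are at most 3 attempts.
        System.out.println(sendWithRetry(2, n -> n == 3));
        System.out.println(sendWithRetry(2, n -> false));
    }
}
```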
There are two ways to select a message queue:
1) sendLatencyFaultEnable=false: the Broker fault-latency mechanism is disabled. This is the default.
2) sendLatencyFaultEnable=true: the Broker fault-latency mechanism is enabled.
If sendLatencyFaultEnable=false, the fault-latency mechanism is skipped and TopicPublishInfo#selectOneMessageQueue is called.
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            // 1) walk the queues round-robin
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = index++ % tpInfo.getMessageQueueList().size();
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 2) check that the queue's broker is usable; latencyFaultTolerance.isAvailable(mq.getBrokerName()) is the key call
                if (!StringUtils.equals(lastBrokerName, mq.getBrokerName())
                    && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    return mq;
                }
            }
            // no available broker was found: pick a relatively good one from the fault table
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // take a round-robin queue and redirect it to that broker
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                // the broker has no write queues for this topic: drop its fault entry
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // fault-latency mechanism disabled
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

// TopicPublishInfo#selectOneMessageQueue(): plain round-robin over the queue list
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.incrementAndGet();
    int pos = index % this.messageQueueList.size();
    return this.messageQueueList.get(pos);
}
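When the fault-latency mechanism is disabled, selection reduces to round-robin that tries to avoid the broker used by the previous failed attempt. The sketch below illustrates that behavior under stated assumptions: Queue and RoundRobinDemo are hypothetical types, and the counter starts at a fixed value for reproducibility, whereas the client keeps a per-thread index.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDemo {
    // Hypothetical, simplified stand-in for MessageQueue.
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
        @Override
        public String toString() { return broker + "/" + id; }
    }

    private final List<Queue> queues;
    // Assumption: fixed start so the output is reproducible.
    private final AtomicInteger index = new AtomicInteger(-1);

    RoundRobinDemo(List<Queue> queues) { this.queues = queues; }

    // Plain round-robin; floorMod keeps the position valid if the counter overflows.
    Queue selectOne() {
        return queues.get(Math.floorMod(index.incrementAndGet(), queues.size()));
    }

    // Round-robin that skips queues on the broker used by the previous (failed) attempt.
    Queue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = selectOne();
            if (!q.broker.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne(); // every queue is on the same broker; nothing to avoid
    }

    public static void main(String[] args) {
        RoundRobinDemo rr = new RoundRobinDemo(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        System.out.println(rr.selectOne(null));       // first send
        System.out.println(rr.selectOne("broker-a")); // retry after broker-a failed
    }
}
```

Skipping the last broker only helps for the current send; the next fresh send may land on the same failing broker again, which is the gap the fault-latency mechanism is meant to close.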
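Core classes of the RocketMQ fault-latency mechanism
Core classes:
1. LatencyFaultTolerance: the fault-tolerance interface.
2. LatencyFaultToleranceImpl: the implementation of the fault-tolerance interface.
3. FaultItem: the smallest unit tracked by the mechanism, one entry per Broker.
4. MQFaultStrategy: a wrapper class that operates on the LatencyFaultTolerance interface.
Details
TODO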
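Until the details are written up, the idea can be approximated with a small sketch: map the observed send latency to a period during which the broker is avoided, and treat the broker as available again once that period has passed. Everything here is a simplified model, not the library's code. The two threshold tables follow the defaults as I recall them from RocketMQ 4.x (other versions use different values), so treat them as assumptions, along with the class name and the explicit nowMs parameter added to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FaultTableDemo {
    // Assumed defaults (recalled from RocketMQ 4.x): latency thresholds and matching avoidance periods, in ms.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // broker name -> timestamp (ms) from which the broker counts as available again
    private final Map<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Find the highest threshold the latency reaches and return its avoidance period.
    static long computeNotAvailableDuration(long latencyMs) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMs >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // isolation=true models a failed send: the broker is penalized as if latency were 30 s.
    void updateFaultItem(String broker, long latencyMs, boolean isolation, long nowMs) {
        long duration = computeNotAvailableDuration(isolation ? 30000L : latencyMs);
        availableFrom.put(broker, nowMs + duration);
    }

    boolean isAvailable(String broker, long nowMs) {
        Long start = availableFrom.get(broker);
        return start == null || nowMs >= start;
    }

    public static void main(String[] args) {
        FaultTableDemo table = new FaultTableDemo();
        table.updateFaultItem("broker-a", 600, false, 1000); // slow send at t=1000 ms
        System.out.println(table.isAvailable("broker-a", 2000));
        System.out.println(table.isAvailable("broker-a", 31000));
    }
}
```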
Sending the message
DefaultMQProducerImpl#sendKernelImpl
Parameters:
1) Message msg: the message to send.
2) MessageQueue mq: the queue the message will be sent to.
3) CommunicationMode communicationMode: the send mode, one of SYNC, ASYNC, or ONEWAY.
4) SendCallback sendCallback: the callback for asynchronous sends.
5) TopicPublishInfo topicPublishInfo: the topic's route information.
6) long timeout: the send timeout.
Main send logic
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq,
    final CommunicationMode communicationMode, final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo, final long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // resolve the broker address, refreshing the route info if it is not cached
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }
    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            // for MessageBatch, ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }
            // set the compression flags
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }
            // mark transactional (prepared) messages
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // run the check-forbidden hooks, if any are registered
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                ...
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                ...
                // hook logic that runs before the send
                this.executeSendMessageHookBefore(context);
            }
            // build the send request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            ...
            // send the message: asynchronous, one-way, or synchronous
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    break;
                case ONEWAY:
                    break;
                case SYNC:
                    break;
            }
            // hook logic that runs after the send
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        } catch (RemotingException | InterruptedException | MQBrokerException e) {
            // if send hooks are registered, run their after logic with the exception
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // restore the original body and strip the namespace from the topic
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }
    throw new MQClientException("The broker[" + brokerName + "] not exist", null);
}
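One step in sendKernelImpl that is easy to overlook is body compression: the body is compressed once it reaches the compressMsgBodyOverHowmuch threshold, and the original body is restored in the finally block. The sketch below shows a threshold check and a compress/uncompress round trip using the JDK's java.util.zip classes. It is an illustration under assumptions: the class and method names are hypothetical, the inclusive threshold is my reading of the check, and the client has its own compression utilities that are not reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class CompressDemo {
    static final int COMPRESS_OVER = 1024 * 4; // same value as compressMsgBodyOverHowmuch

    // Assumption: the threshold is inclusive.
    static boolean shouldCompress(byte[] body) {
        return body != null && body.length >= COMPRESS_OVER;
    }

    static byte[] compress(byte[] src) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(bos)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray(); // read after close() so the stream is fully flushed
    }

    static byte[] uncompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = new byte[8192]; // highly compressible: all zeros
        if (shouldCompress(body)) {
            byte[] wire = compress(body);
            System.out.println(body.length + " -> " + wire.length + " bytes");
        }
    }
}
```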
Three send modes:
1. Synchronous
2. Asynchronous
3. One-way
// dispatch on the send mode
switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback,
            topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}
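Both the ASYNC and SYNC branches subtract the time already spent from the caller's timeout before going to the network, and fail fast if the budget is used up. A minimal sketch of that bookkeeping follows; the class name, the remaining helper, and the use of IllegalStateException in place of RemotingTooMuchRequestException are assumptions made for illustration.

```java
public class TimeoutBudgetDemo {
    // Returns the time left for the network call, mirroring the "timeoutMillis < cost" check above.
    static long remaining(long timeoutMillis, long costMillis) {
        if (timeoutMillis < costMillis) {
            // stand-in for RemotingTooMuchRequestException
            throw new IllegalStateException("sendMessage call timeout");
        }
        return timeoutMillis - costMillis;
    }

    public static void main(String[] args) {
        long sendMsgTimeout = 3000; // default from DefaultMQProducer
        long spentBeforeSend = 120; // hypothetical: time spent on route lookup, hooks, and so on
        System.out.println(remaining(sendMsgTimeout, spentBeforeSend));
    }
}
```

Passing the remaining budget down, rather than the full timeout, keeps the total wall-clock time of a send close to the configured limit.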
The core retry logic first records the outcome in the fault item (updateFaultItem) and then resends through the callback path (onExceptionImpl).
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync
try {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            long cost = System.currentTimeMillis() - beginStartTime;
            RemotingCommand response = responseFuture.getResponseCommand();
            // no user callback: process the response, run the after hook, record the latency
            if (null == sendCallback && response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }
            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    // retry
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo,
                        instance, retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    // retry
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo,
                        instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException(
                        "wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
                    // retry
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo,
                        instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    // retry
                    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo,
                        instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
} catch (Exception ex) {
    long cost = System.currentTimeMillis() - beginStartTime;
    producer.updateFaultItem(brokerName, cost, true);
    // retry
    onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo,
        instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
}
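The asynchronous path cannot loop the way the synchronous one does, so it threads a shared counter (the times argument) through each callback and resends until the limit is reached. The sketch below models that control flow with a simulated, synchronous transport so that it runs deterministically. All names are hypothetical, and the "increment, then compare with the limit" rule is my reading of onExceptionImpl rather than a copy of it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

public class AsyncRetryDemo {
    // Hypothetical, simplified stand-in for SendCallback.
    interface Callback {
        void onSuccess(int attempts);
        void onException(Throwable e);
    }

    // transport.test(n) simulates the broker's answer to the attempt made after n retries.
    static void sendAsync(IntPredicate transport, int timesTotal, AtomicInteger times, Callback cb) {
        if (transport.test(times.get())) {
            cb.onSuccess(times.get() + 1);
        } else {
            onException(transport, timesTotal, times, cb, new RuntimeException("send failed"));
        }
    }

    // Retry while the shared counter stays within the limit; otherwise report the failure.
    static void onException(IntPredicate transport, int timesTotal, AtomicInteger times,
                            Callback cb, Exception e) {
        int tmp = times.incrementAndGet();
        if (tmp <= timesTotal) {
            sendAsync(transport, timesTotal, times, cb);
        } else {
            cb.onException(e);
        }
    }

    // Convenience wrapper that returns the outcome as text.
    static String run(IntPredicate transport, int timesTotal) {
        final String[] outcome = new String[1];
        sendAsync(transport, timesTotal, new AtomicInteger(), new Callback() {
            @Override
            public void onSuccess(int attempts) {
                outcome[0] = "ok after " + attempts + " attempt(s)";
            }
            @Override
            public void onException(Throwable e) {
                outcome[0] = "failed: " + e.getMessage();
            }
        });
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(run(n -> n == 2, 2)); // succeeds on the second retry
        System.out.println(run(n -> false, 2));  // gives up after the retries are used
    }
}
```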
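Batch sending
DefaultMQProducer#send(Collection<Message> msgs)
Batch sending follows much the same path as sending a single message: the messages are packed into one MessageBatch, which is then sent like any other message.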
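Because a batch travels as one message, it is subject to the same maxMessageSize limit, so a large list has to be split by the caller before sending. The sketch below shows one way to do that. It is an illustration under assumptions: the class and method are hypothetical, and only body bytes are counted, whereas a real size estimate would also include the topic, the properties, and per-message overhead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSplitDemo {
    // Splits bodies into sub-batches whose total body size stays within maxBytes, preserving order.
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            if (body.length > maxBytes) {
                throw new IllegalArgumentException("a single message exceeds the size limit");
            }
            // close the current batch if adding this body would overflow it
            if (!current.isEmpty() && currentSize + body.length > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = Arrays.asList(new byte[3], new byte[3], new byte[3]);
        List<List<byte[]>> batches = split(bodies, 6);
        System.out.println(batches.size() + " batches, first holds " + batches.get(0).size());
    }
}
```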
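Summary
Key points
1. The fault-latency tolerance mechanism
2. The retry mechanism
Questions:
- How is load balanced across message queues?
- How does message sending achieve high availability?
- How does batch message sending achieve consistency?
- What is the message sending flow?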