RocketMQ 消息发送源码解读

RocketMQ 消息发送需要考虑以下3个问题。


/**
 * A message as seen by the producer API.
 */
public class Message implements Serializable {
    /** Topic the message belongs to. */
    private String topic;
    /** Message flag (not processed). */
    private int flag;
    /** Extended properties. */
    private Map<String, String> properties;
    /** Message body. */
    private byte[] body;
    /** Transaction id. */
    private String transactionId;

    public Message() {
    }
}



/**
 * Integer constants that are combined into a message's system flag.
 * Single-bit flags occupy bits 0, 1 and 4-7; the transaction type is a
 * two-bit value stored in bits 2-3; the compression type is a value stored
 * starting at bit 8.
 */
public class MessageSysFlag {
    // Single-bit flags (bits 0 and 1).
    public static final int COMPRESSED_FLAG = 1;
    public static final int MULTI_TAGS_FLAG = 1 << 1;

    // Transaction type values (bits 2-3).
    public static final int TRANSACTION_NOT_TYPE = 0;
    public static final int TRANSACTION_PREPARED_TYPE = 1 << 2;
    public static final int TRANSACTION_COMMIT_TYPE = 2 << 2;
    public static final int TRANSACTION_ROLLBACK_TYPE = 3 << 2;

    // Single-bit flags (bits 4 and 5).
    public static final int BORNHOST_V6_FLAG = 1 << 4;
    public static final int STOREHOSTADDRESS_V6_FLAG = 1 << 5;

    // Mark the flag for batch to avoid conflict.
    public static final int NEED_UNWRAP_FLAG = 1 << 6;
    public static final int INNER_BATCH_FLAG = 1 << 7;

    // COMPRESSION_TYPE values (from bit 8 upwards).
    public static final int COMPRESSION_LZ4_TYPE = 1 << 8;
    public static final int COMPRESSION_ZSTD_TYPE = 2 << 8;
    public static final int COMPRESSION_ZLIB_TYPE = 3 << 8;
    // Covers bits 8-10; presumably used as a mask to extract the compression type.
    public static final int COMPRESSION_TYPE_COMPARATOR = 7 << 8;
}

tags:消息 tag,用于消息过滤。
keys:消息索引键,用空格隔开,RocketMQ 可以根据这些 key(键)快速检索消息。


MQProducer 接口继承了 MQAdmin 接口,MQAdmin 的定义如下:
public interface MQAdmin {// 创建 topic void createTopic(String key, String newTopic, int queueNum, int topicSysFlag, Map<String, String> attributes) throws MQClientException; // 根据时间搜索下标 long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; // 最大下标 long maxOffset(final MessageQueue mq) throws MQClientException; // 最小下标 long minOffset(final MessageQueue mq) throws MQClientException; // 获取最早消息存储时间 long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; // 根据 msgId 查询消息 MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; // 查询消息 QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException; }


public interface MQProducer extends MQAdmin { 
// 获取发布的消息队列 
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException; 
// 同步发送 
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步发送 
void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向消息发送 
void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; 
// 同步方式发送消息,且发送到指定的消息队列。 
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步方式发送消息,且发送到指定的消息队列。 
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向方式发送消息,且发送到指定的消息队列。 
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException, RemotingException, InterruptedException; 
// 同步批量消息发送。 
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 
// 异步批量消息发送。 
void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; 
// 单向批量消息发送。 
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; 
// 发送事务消息 
TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; 
// 同步批量消息发送。 
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; 

DefaultMQProducer 的核心属性

public class DefaultMQProducer extends ClientConfig implements MQProducer {protected final transient DefaultMQProducerImpl defaultMQProducerImpl; // 实际的生存者调用实现 private String producerGroup; // 生产者组 private volatile int defaultTopicQueueNums = 4; // 默认 4 个 队列 private int sendMsgTimeout = 3000; // 发送超时时间 private int compressMsgBodyOverHowmuch = 1024 * 4; // 消息体最大大小 private int retryTimesWhenSendFailed = 2; // 同步重试次数 private int retryTimesWhenSendAsyncFailed = 2; // 异步重试次数 private boolean retryAnotherBrokerWhenNotStoreOK = false; // 发送失败是否会指定另外的 broker private int maxMessageSize = 1024 * 1024 * 4; // 最大消息为 4M 


org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start (boolean)

// 检查配置  this.checkConfig();  if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {  // 修改生产者 id  this.defaultMQProducer.changeInstanceNameToPID();  }  // 创建实例  this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);  // 注册实例  boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);  if (!registerOK) {  this.serviceState = ServiceState.CREATE_JUST;  throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()  + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),  null);  }  
// 成功就放入方便管理  this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());  // 启动  if (startFactory) {  mQClientFactory.start();  }

org.apache.rocketmq.client.impl.MQClientManager#getOrCreateMQClientInstance (org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)

public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {  // 实例 id  String clientId = clientConfig.buildMQClientId();  MQClientInstance instance = this.factoryTable.get(clientId);  // 不存在则创建一个实例  if (null == instance) {  instance =  new MQClientInstance(clientConfig.cloneClientConfig(),  this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);  MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);  if (prev != null) {  instance = prev;  log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);  } else {  log.info("Created new MQClientInstance for clientId:[{}]", clientId);  }  }  return instance;  


RocketMQ 发送-存储-消费全流程。
RocketMQ 发送主代码。

private SendResult sendDefaultImpl(  Message msg,  final CommunicationMode communicationMode,  final SendCallback sendCallback,  final long timeout  
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  // 消息验证  this.makeSureStateOK();  Validators.checkMessage(msg, this.defaultMQProducer);  // 拿 topic  TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());  if (topicPublishInfo != null && topicPublishInfo.ok()) {  ...  // 发送主逻辑  sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);  }  // 远程获取 nameSrc   
validateNameServerSetting();  throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),  null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);  


tryToFindTopicPublishInfo 是查找主题的路由信息的方法
获取路由信息,发哪个 Broker。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {  // 查找路由信息  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);  if (null == topicPublishInfo || !topicPublishInfo.ok()) {  this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());  this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);  topicPublishInfo = this.topicPublishInfoTable.get(topic);  }  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {  return topicPublishInfo;  } else {  // 从 NameSpace 中更新路由信息  this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);  topicPublishInfo = this.topicPublishInfoTable.get(topic);  return topicPublishInfo;  }  

TopicPublishInfo 的 Class 结构:

public class TopicPublishInfo {  private boolean orderTopic = false;  private boolean haveTopicRouterInfo = false;  private List<MessageQueue> messageQueueList = new ArrayList<>();  private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();  private TopicRouteData topicRouteData;  
}  public class TopicRouteData extends RemotingSerializable {  private String orderTopicConf;  private List<QueueData> queueDatas;  private List<BrokerData> brokerDatas;  private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;  //It could be null or empty  private Map<String/*brokerName*/, TopicQueueMappingInfo> topicQueueMappingByBroker;  

MQClientInstance#updateTopicRouteInfoFromNameServer 是从 NameServer 获取路由信息的方法。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,  DefaultMQProducer defaultMQProducer) {  // 去 NameSpace 拿路由信息  TopicRouteData topicRouteData;  if (isDefault && defaultMQProducer != null) {  topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(clientConfig.getMqClientApiTimeout());  if (topicRouteData != null) {  for (QueueData data : topicRouteData.getQueueDatas()) {  int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());  data.setReadQueueNums(queueNums);  data.setWriteQueueNums(queueNums);  }  }    } else {  topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());  }  if (topicRouteData != null) {  // 获取路由信息  TopicRouteData old = this.topicRouteTable.get(topic);  // 对比  boolean changed = topicRouteData.topicRouteDataChanged(old);  if (!changed) {  // 发生变化就改路由信息  changed = this.isNeedUpdateTopicRouteInfo(topic);  } else {  log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);  }  if (changed) {  // 更新 Broker 地址缓存表  for (BrokerData bd : topicRouteData.getBrokerDatas()) {  this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());  }  // Update endpoint map  {  ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);  if (!mqEndPoints.isEmpty()) {  topicEndPointsTable.put(topic, mqEndPoints);  }  }  // Update Pub info  {  // 将 topicRouteData 转换为 TopicPublishInfo  TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);  publishInfo.setHaveTopicRouterInfo(true);  for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {  MQProducerInner impl = entry.getValue();  if (impl != null) {  impl.updateTopicPublishInfo(topic, publishInfo);  }  }  }  // Update sub info  if (!consumerTable.isEmpty()) {  // 更新 consumer 信息  Set<MessageQueue> subscribeInfo = 
topicRouteData2TopicSubscribeInfo(topic, topicRouteData);  for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {  MQConsumerInner impl = entry.getValue();  if (impl != null) {  impl.updateTopicSubscribeInfo(topic, subscribeInfo);  }  }  }  TopicRouteData cloneTopicRouteData = new TopicRouteData(topicRouteData);  log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);  this.topicRouteTable.put(topic, cloneTopicRouteData);  return true;        }  }


首先会按 broker + queue 的方式进行顺序排列。

[{"brokerName": "broker-a", "queueId": 0}, …]

由 retryTimesWhenSendAsyncFailed 指定异步发送失败时的重试次数。


1)sendLatencyFaultEnable=false,默认不启用 Broker 故障延迟机制。
2)sendLatencyFaultEnable=true,启用 Broker 故障延迟机制。

如果 sendLatencyFaultEnable=false(即不启用 Broker 故障延迟机制),则调用 TopicPublishInfo#selectOneMessageQueue。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {  if (this.sendLatencyFaultEnable) {  try {  int index = tpInfo.getSendWhichQueue().incrementAndGet();  // 1)轮询获取一个消息队列。  for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {  int pos = index++ % tpInfo.getMessageQueueList().size();  MessageQueue mq = tpInfo.getMessageQueueList().get(pos);  // 2)验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName())是关键。  if (!StringUtils.equals(lastBrokerName, mq.getBrokerName()) && latencyFaultTolerance.isAvailable(mq.getBrokerName())) {  return mq;  }  }  // 如果返回的MessageQueue可用,则移除latencyFaultTolerance关于该topic的条目,表明该Broker故障已经修复。  final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();  int writeQueueNums = tpInfo.getWriteQueueIdByBroker(notBestBroker);  if (writeQueueNums > 0) {  // Broker故障延迟机制  final MessageQueue mq = tpInfo.selectOneMessageQueue();  if (notBestBroker != null) {  mq.setBrokerName(notBestBroker);  mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);  }  return mq;  } else {  latencyFaultTolerance.remove(notBestBroker);  }  return tpInfo.selectOneMessageQueue();  }  
// 不启用  return tpInfo.selectOneMessageQueue(lastBrokerName);  }  // 故障机制,平移到其他 Queue  public MessageQueue selectOneMessageQueue() {  int index = this.sendWhichQueue.incrementAndGet();  int pos = index % this.messageQueueList.size();  return this.messageQueueList.get(pos);  }

RocketMQ 故障延迟机制核心类
RocketMQ 消息发送源码解读

4、MQFaultStrategy:操作 LatencyFaultTolerance 接口的封装类




1)Message msg:待发送消息。
2)MessageQueue mq:消息将发送到该消息队列上。
3)CommunicationMode communicationMode:消息发送模式,包括 SYNC、ASYNC、ONEWAY。
4)SendCallback sendCallback:异步消息回调函数。
5)TopicPublishInfo topicPublishInfo:主题路由信息。
6)long timeout:消息发送超时时间。


private SendResult sendKernelImpl() {  // broker 地址 if (null == brokerAddr) {  tryToFindTopicPublishInfo(mq.getTopic());  brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);  brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);  }  SendMessageContext context = null;  if (brokerAddr != null) {  brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);  byte[] prevBody = msg.getBody();  try {  //for MessageBatch,ID has been set in the generating process  // 对于 MsgBatch,在生成过程中已设置 ID            if (!(msg instanceof MessageBatch)) {  MessageClientIDSetter.setUniqID(msg);  }  // 标记压缩 FLAG  int sysFlag = 0;  boolean msgBodyCompressed = false;  if (this.tryToCompressMessage(msg)) {  sysFlag |= MessageSysFlag.COMPRESSED_FLAG;  sysFlag |= compressType.getCompressionFlag();  msgBodyCompressed = true;  }  // 标记事务消息  final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  if (Boolean.parseBoolean(tranMsg)) {  sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;  }  // 注册消息发送钩子函数  if (hasCheckForbiddenHook()) {  CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();  ...this.executeCheckForbiddenHook(checkForbiddenContext);  }  if (this.hasSendMessageHook()) {  context = new SendMessageContext();  ... 
// 发送之前的增强逻辑  this.executeSendMessageHookBefore(context);  }  // 构建消息发送请求包  SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();  ...// 发送消息 同步、异步、单向  SendResult sendResult = null;  switch (communicationMode) {  case ASYNC:  break;                case ONEWAY:  break;case SYNC:  break;            }  // 发送之后的处理逻辑  if (this.hasSendMessageHook()) {  context.setSendResult(sendResult);  this.executeSendMessageHookAfter(context);  }  return sendResult;  } catch (RemotingException | InterruptedException | MQBrokerException e) {  // 如果注册了钩子函数,就执行 after 逻辑  if (this.hasSendMessageHook()) {  context.setException(e);  this.executeSendMessageHookAfter(context);  }  throw e;  } finally {  msg.setBody(prevBody);  msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));  }  }  throw new MQClientException("The broker[" + brokerName + "] not exist", null);  

三种发送方式 :


// Dispatch on the send mode. ONEWAY and ASYNC return null to the caller;
// SYNC returns the result of sendMessageSync.
switch (communicationMode) {
    case ONEWAY:
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
        return null;
    case ASYNC:
        final AtomicInteger times = new AtomicInteger();
        // Time already spent since beginStartTime counts against the timeout.
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
            retryTimesWhenSendFailed, times, context, producer);
        return null;
    case SYNC:
        // Same budget check as ASYNC before the blocking call.
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendMessage call timeout");
        }
        return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
    default:
        assert false;
        break;
}

其核心重试逻辑就是先记录到容错 Item 中,然后再通过回调方法进行发送。
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync 。

try {  this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {  @Override  public void operationComplete(ResponseFuture responseFuture) {  long cost = System.currentTimeMillis() - beginStartTime;  RemotingCommand response = responseFuture.getResponseCommand();  if (null == sendCallback && response != null) {  try {  SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);  if (context != null && sendResult != null) {  context.setSendResult(sendResult);  context.getProducer().executeSendMessageHookAfter(context);  }  } catch (Throwable e) {  }  producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);  return;            }  if (response != null) {  try {  SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);  assert sendResult != null;  if (context != null) {  context.setSendResult(sendResult);  context.getProducer().executeSendMessageHookAfter(context);  }  try {  sendCallback.onSuccess(sendResult);  } catch (Throwable e) {  }  producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);  } catch (Exception e) {  producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);  // 重试  onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  retryTimesWhenSendFailed, times, e, context, false, producer);  }  } else {  producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);  if (!responseFuture.isSendRequestOK()) {  MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());  // 重试  onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  retryTimesWhenSendFailed, times, ex, context, true, producer);  } else if 
(responseFuture.isTimeout()) {  MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",  responseFuture.getCause());  // 重试  onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  retryTimesWhenSendFailed, times, ex, context, true, producer);  } else {  MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());  // 重试  onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  retryTimesWhenSendFailed, times, ex, context, true, producer);  }  }  }  });  
} catch (Exception ex) {  long cost = System.currentTimeMillis() - beginStartTime;  producer.updateFaultItem(brokerName, cost, true);  // 重试  onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,  retryTimesWhenSendFailed, times, ex, context, true, producer);  


DefaultMQProducer #send (Collection msgs)




  • 消息队列如何进行负载?
  • 消息发送如何实现高可用?
  • 批量消息发送如何实现一致性?
  • 消息发送流程