Spring Boot Integration with RocketMQ: More Extended Property Configuration for the Consumer | Spring Cloud 35
1. Preface
In the previous chapters we gained a basic understanding of RocketMQ:
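Setting up a RocketMQ 5.1.0 cluster with docker-compose (two masters, two slaves) | Spring Cloud 28
Setting up a RocketMQ 5.1.0 cluster with docker-compose and enabling ACL access control | Spring Cloud 29
Spring Boot integration with RocketMQ: sending and receiving normal, delayed, and transactional messages, PULL consumption mode, and enabling ACL | Spring Cloud 30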
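Now we turn to configuring further extended properties on the consumer side when integrating RocketMQ with Spring Boot. This chapter covers the following:
Setting the maximum number of retries for failed consumption on the consumer
Setting the consumption start position on the consumer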
2. The RocketMQ Consumer in Spring Boot
com/gm/rocketmq/component/rocketmq/MessageExtConsumer.java:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

@Slf4j
@Service
@RocketMQMessageListener(
        topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}",
        secretKey = "${rocketmq.consumer.secret-key}")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(MessageExt message) {
        // Decode the body with an explicit charset instead of the platform default
        log.info("MessageExtConsumer received message, msgId: {}, body: {} ",
                message.getMsgId(), new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Start consuming from the current time
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        // Set the maximum number of retries
        consumer.setMaxReconsumeTimes(5);
    }
}
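The string passed to setConsumeTimestamp above is, as far as we know, a second-precision timestamp in the form yyyyMMddHHmmss, which is what UtilAll.timeMillisToHumanString3 produces. The following is only a minimal stdlib sketch of that format; the class ConsumeTimestampSketch and the method toConsumeTimestamp are hypothetical names and not part of RocketMQ.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a RocketMQ class): formats epoch milliseconds as
// yyyyMMddHHmmss, the pattern assumed here for the consume timestamp.
final class ConsumeTimestampSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // The zone is passed explicitly so the result is reproducible.
    static String toConsumeTimestamp(long epochMillis, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Prints the current time in the system zone, e.g. a 14-digit string.
        System.out.println(toConsumeTimestamp(System.currentTimeMillis(), ZoneId.systemDefault()));
    }
}
```

Passing an earlier instant, for example the current time minus thirty minutes, would ask a new consumer group to start from messages stored around that time rather than from now.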
For more about consumption retry, see the official documentation: https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy
3. Source Code Analysis
Look at the source of the initRocketMQPushConsumer method in org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer:
private void initRocketMQPushConsumer() throws MQClientException {
    if (rocketMQListener == null && rocketMQReplyListener == null) {
        throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
    }
    Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
    Assert.notNull(nameServer, "Property 'nameServer' is required");
    Assert.notNull(topic, "Property 'topic' is required");

    RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
    boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
    if (Objects.nonNull(rpcHook)) {
        consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                enableMsgTrace, this.applicationContext.getEnvironment()
                        .resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        consumer.setVipChannelEnabled(false);
    } else {
        log.debug("Access-key or secret-key not configure in " + this + ".");
        consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                this.applicationContext.getEnvironment()
                        .resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
    }
    consumer.setNamespace(namespace);

    String customizedNameServer = this.applicationContext.getEnvironment()
            .resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
    if (customizedNameServer != null) {
        consumer.setNamesrvAddr(customizedNameServer);
    } else {
        consumer.setNamesrvAddr(nameServer);
    }
    if (accessChannel != null) {
        consumer.setAccessChannel(accessChannel);
    }
    //set the consumer core thread number and maximum thread number has the same value
    consumer.setConsumeThreadMax(consumeThreadNumber);
    consumer.setConsumeThreadMin(consumeThreadNumber);
    consumer.setConsumeTimeout(consumeTimeout);
    consumer.setMaxReconsumeTimes(maxReconsumeTimes);
    consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
    consumer.setInstanceName(instanceName);
    switch (messageModel) {
        case BROADCASTING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
            break;
        case CLUSTERING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
            break;
        default:
            throw new IllegalArgumentException("Property 'messageModel' was wrong.");
    }

    switch (selectorType) {
        case TAG:
            consumer.subscribe(topic, selectorExpression);
            break;
        case SQL92:
            consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
            break;
        default:
            throw new IllegalArgumentException("Property 'selectorType' was wrong.");
    }

    switch (consumeMode) {
        case ORDERLY:
            consumer.setMessageListener(new DefaultMessageListenerOrderly());
            break;
        case CONCURRENTLY:
            consumer.setMessageListener(new DefaultMessageListenerConcurrently());
            break;
        default:
            throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
    }

    //if String is not is equal "true" TLS mode will represent the as default value false
    consumer.setUseTLS(new Boolean(tlsEnable));

    if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
        ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
    } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
        ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
    }
}
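In the last few lines of this method, if rocketMQListener implements the RocketMQPushConsumerLifecycleListener interface, its prepareStart(consumer) method is called. Because this call comes after the container has applied its own settings (including setMaxReconsumeTimes), prepareStart is the natural place to configure extended properties of the consumer, and values set there replace the ones the container applied earlier.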
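To make the ordering concrete, here is a minimal stdlib sketch of the hook pattern described above. It does not use the real RocketMQ classes: FakeConsumer, Listener, LifecycleListener, and init are hypothetical stand-ins for DefaultMQPushConsumer, RocketMQListener, RocketMQPushConsumerLifecycleListener, and initRocketMQPushConsumer.

```java
// Hypothetical stand-ins that mimic the order of operations in the container:
// defaults are applied first, then the optional prepareStart hook runs last.
final class PrepareStartSketch {

    // Stand-in for DefaultMQPushConsumer; only the one property we care about.
    static final class FakeConsumer {
        int maxReconsumeTimes = -1;
    }

    // Stand-in for RocketMQListener.
    interface Listener {
        void onMessage(String body);
    }

    // Stand-in for RocketMQPushConsumerLifecycleListener.
    interface LifecycleListener {
        void prepareStart(FakeConsumer consumer);
    }

    static final class MyListener implements Listener, LifecycleListener {
        @Override
        public void onMessage(String body) {
            // Business logic would go here.
        }

        @Override
        public void prepareStart(FakeConsumer consumer) {
            consumer.maxReconsumeTimes = 5; // runs after the container defaults
        }
    }

    // Stand-in for the container's init method.
    static FakeConsumer init(Listener listener, int configuredMaxReconsumeTimes) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.maxReconsumeTimes = configuredMaxReconsumeTimes; // container setting
        if (listener instanceof LifecycleListener) {
            ((LifecycleListener) listener).prepareStart(consumer); // hook overrides it
        }
        return consumer;
    }

    public static void main(String[] args) {
        System.out.println(init(new MyListener(), -1).maxReconsumeTimes);
    }
}
```

A listener that does not implement the lifecycle interface simply keeps whatever the container configured.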
The source above also shows that a different MessageListener is set depending on consumeMode: either DefaultMessageListenerOrderly or DefaultMessageListenerConcurrently.
- DefaultMessageListenerOrderly implements the MessageListenerOrderly interface, which extends the MessageListener interface
- DefaultMessageListenerConcurrently implements the MessageListenerConcurrently interface, which extends the MessageListener interface
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                handleMessage(messageExt);
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}",
                        messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
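Take consumeMode set to CONCURRENTLY as an example:
When a message arrives at the consumer, before it reaches the custom onMessage method it first passes through the consumeMessage method of DefaultMessageListenerConcurrently, an inner class of DefaultRocketMQListenerContainer that implements the MessageListenerConcurrently interface. consumeMessage then calls handleMessage to process the message.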
private void handleMessage(MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        rocketMQListener.onMessage(doConvertMessage(messageExt));
    } else if (rocketMQReplyListener != null) {
        Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
        Message<?> message = MessageBuilder.withPayload(replyContent).build();

        org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
        DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
        producer.setSendMsgTimeout(replyTimeout);
        producer.send(replyMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                } else {
                    log.debug("Consumer replies message success.");
                }
            }

            @Override
            public void onException(Throwable e) {
                log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
            }
        });
    }
}
The handleMessage method first calls doConvertMessage to deserialize the message into an object, then calls the custom onMessage method and passes that object to it.
It follows that when the custom onMessage method throws an exception, the exception is caught in the consumeMessage method of DefaultMessageListenerConcurrently, which returns ConsumeConcurrentlyStatus.RECONSUME_LATER to indicate that consumption failed. When onMessage completes normally, consumeMessage returns ConsumeConcurrentlyStatus.CONSUME_SUCCESS to indicate that consumption succeeded.
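The mapping from "listener threw" to "retry later" can be sketched without any RocketMQ dependency. The class ConsumeStatusSketch and its Status enum below are hypothetical stand-ins for DefaultMessageListenerConcurrently and ConsumeConcurrentlyStatus; message bodies are reduced to plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the try/catch in consumeMessage: any exception from
// the user callback is turned into RECONSUME_LATER for the whole batch.
final class ConsumeStatusSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static Status consumeMessage(List<String> msgs, Consumer<String> onMessage) {
        for (String msg : msgs) {
            try {
                onMessage.accept(msg); // stands in for handleMessage -> onMessage
            } catch (Exception e) {
                return Status.RECONSUME_LATER; // first failure ends the loop
            }
        }
        return Status.CONSUME_SUCCESS; // every message was handled without error
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b");
        System.out.println(consumeMessage(batch, m -> { }));
        System.out.println(consumeMessage(batch, m -> {
            throw new IllegalStateException("business failure");
        }));
    }
}
```

One practical consequence: if onMessage catches and swallows its own exceptions, the container sees a normal return and reports success, so no retry takes place.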
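When ConsumeConcurrentlyStatus.RECONSUME_LATER is returned, the failed message first goes to the topic %RETRY%+consumergroup and is then redelivered to this ConsumerGroup. If consumption keeps failing until the maximum number of retries is exceeded, the message is delivered to the DLQ (dead-letter queue). By default, the dead-letter topic is named %DLQ%+consumergroup.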
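As an illustration of the naming and routing rule just described, the sketch below builds the two topic names and picks the destination of a failed message. It is a simplification under stated assumptions: RetryRoutingSketch and its methods are hypothetical, and the comparison reconsumeTimes >= maxReconsumeTimes is our reading of "retries exhausted", not code taken from the broker.

```java
// Hypothetical helper that mirrors the %RETRY% / %DLQ% naming convention.
final class RetryRoutingSketch {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    // reconsumeTimes: how many retries the failed message has already had.
    // Assumption: once that count reaches the maximum, the next stop is the DLQ.
    static String nextTopicAfterFailure(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes
                ? dlqTopic(consumerGroup)
                : retryTopic(consumerGroup);
    }

    public static void main(String[] args) {
        String group = "demo-group"; // illustrative group name
        for (int retries = 0; retries <= 5; retries++) {
            System.out.println(retries + " -> " + nextTopicAfterFailure(group, retries, 5));
        }
    }
}
```

With the maximum set to 5, as in the prepareStart example, this model sends the message to the retry topic after each of the first five failed retries' predecessors (counts 0 to 4) and to the dead-letter topic once the count reaches 5.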