Spring Boot integration with RocketMQ: more extended consumer configuration | Spring Cloud 35

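1. Preface

The earlier chapters gave us a basic understanding of RocketMQ:

Building a RocketMQ 5.1.0 cluster with docker-compose (two masters, two slaves) | Spring Cloud 28

Enabling ACL access control on a RocketMQ 5.1.0 cluster built with docker-compose | Spring Cloud 29

Spring Boot integration with RocketMQ: sending and receiving normal, delayed and transactional messages, PULL consumption, and enabling ACL | Spring Cloud 30

This chapter covers more extended configuration for the RocketMQ consumer in Spring Boot, focusing on the following:

  • setting the maximum number of retries after a consumption failure on the consumer
  • setting the position from which the consumer starts consuming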

2. Integrating the RocketMQ consumer in Spring Boot

com/gm/rocketmq/component/rocketmq/MessageExtConsumer.java

import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(
        topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST,
        consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
        accessKey = "${rocketmq.consumer.access-key}",
        secretKey = "${rocketmq.consumer.secret-key}")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {

    @Override
    public void onMessage(MessageExt message) {
        // Decode the body with an explicit charset instead of the platform default
        log.info("MessageExtConsumer received message, msgId: {}, body: {}",
                message.getMsgId(), new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // Start consuming from the current time
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        // Set the maximum number of retries
        consumer.setMaxReconsumeTimes(5);
    }
}
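
The value passed to setConsumeTimestamp is a string in yyyyMMddHHmmss form, which is what UtilAll.timeMillisToHumanString3 produces. To make that format visible, here is a minimal sketch using only java.time. The class ConsumeTimestamps and its format method are hypothetical helpers written for this illustration, not part of RocketMQ, and the explicit ZoneId parameter is an assumption added so that the result is reproducible.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of RocketMQ: builds a yyyyMMddHHmmss string
// of the kind passed to consumer.setConsumeTimestamp(...).
final class ConsumeTimestamps {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private ConsumeTimestamps() {
    }

    // Formats epoch milliseconds as yyyyMMddHHmmss in the given time zone.
    static String format(long epochMillis, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
    }

    public static void main(String[] args) {
        // Example: a start position thirty minutes in the past, in the local zone.
        long thirtyMinutesAgo = System.currentTimeMillis() - 30 * 60 * 1000L;
        System.out.println(format(thirtyMinutesAgo, ZoneId.systemDefault()));
    }
}
```

As far as I understand the client behaviour, the ConsumeFromWhere setting only matters when the consumer group has no stored offset yet; a group that has already consumed resumes from its committed offset.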

For more on consumer retries, see the official documentation: https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy

3. Source code analysis

Look at the source of org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer:

private void initRocketMQPushConsumer() throws MQClientException {
    if (rocketMQListener == null && rocketMQReplyListener == null) {
        throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
    }
    Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
    Assert.notNull(nameServer, "Property 'nameServer' is required");
    Assert.notNull(topic, "Property 'topic' is required");

    RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
        this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
    boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
    if (Objects.nonNull(rpcHook)) {
        consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
            enableMsgTrace, this.applicationContext.getEnvironment().
            resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
        consumer.setVipChannelEnabled(false);
    } else {
        log.debug("Access-key or secret-key not configure in " + this + ".");
        consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
            this.applicationContext.getEnvironment().
            resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
    }
    consumer.setNamespace(namespace);

    String customizedNameServer = this.applicationContext.getEnvironment().
        resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
    if (customizedNameServer != null) {
        consumer.setNamesrvAddr(customizedNameServer);
    } else {
        consumer.setNamesrvAddr(nameServer);
    }
    if (accessChannel != null) {
        consumer.setAccessChannel(accessChannel);
    }
    //set the consumer core thread number and maximum thread number has the same value
    consumer.setConsumeThreadMax(consumeThreadNumber);
    consumer.setConsumeThreadMin(consumeThreadNumber);
    consumer.setConsumeTimeout(consumeTimeout);
    consumer.setMaxReconsumeTimes(maxReconsumeTimes);
    consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
    consumer.setInstanceName(instanceName);
    switch (messageModel) {
        case BROADCASTING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
            break;
        case CLUSTERING:
            consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
            break;
        default:
            throw new IllegalArgumentException("Property 'messageModel' was wrong.");
    }

    switch (selectorType) {
        case TAG:
            consumer.subscribe(topic, selectorExpression);
            break;
        case SQL92:
            consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
            break;
        default:
            throw new IllegalArgumentException("Property 'selectorType' was wrong.");
    }

    switch (consumeMode) {
        case ORDERLY:
            consumer.setMessageListener(new DefaultMessageListenerOrderly());
            break;
        case CONCURRENTLY:
            consumer.setMessageListener(new DefaultMessageListenerConcurrently());
            break;
        default:
            throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
    }

    //if String is not is equal "true" TLS mode will represent the as default value false
    consumer.setUseTLS(new Boolean(tlsEnable));

    if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
        ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
    } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
        ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
    }
}

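In the last few lines of this method, if rocketMQListener implements the RocketMQPushConsumerLifecycleListener interface, the container calls RocketMQPushConsumerLifecycleListener#prepareStart(consumer). The prepareStart method is therefore the place to configure the consumer's extended properties. Because it runs after the container has applied its own settings, anything set there takes precedence over the values set earlier in the method.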
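
The ordering described above can be shown with a toy model. This is only a sketch under stated assumptions: ToyConsumer, ToyListener, ToyLifecycleListener and HookedListener are hypothetical stand-ins for the RocketMQ types, and the value -1 is an arbitrary placeholder for whatever the container would otherwise apply.

```java
// Toy model of the hook order in initRocketMQPushConsumer. All types here are
// hypothetical stand-ins, not the RocketMQ classes.
final class PrepareStartSketch {

    /** Stand-in for DefaultMQPushConsumer, holding a single setting. */
    static final class ToyConsumer {
        private int maxReconsumeTimes;

        void setMaxReconsumeTimes(int value) {
            this.maxReconsumeTimes = value;
        }

        int getMaxReconsumeTimes() {
            return maxReconsumeTimes;
        }
    }

    /** Stand-in for RocketMQListener. */
    interface ToyListener {
        void onMessage(String body);
    }

    /** Stand-in for RocketMQPushConsumerLifecycleListener. */
    interface ToyLifecycleListener {
        void prepareStart(ToyConsumer consumer);
    }

    /** Mirrors the container: apply the configured value first, call the hook last. */
    static ToyConsumer init(ToyListener listener, int configuredMaxReconsumeTimes) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.setMaxReconsumeTimes(configuredMaxReconsumeTimes);
        if (listener instanceof ToyLifecycleListener) {
            ((ToyLifecycleListener) listener).prepareStart(consumer);
        }
        return consumer;
    }

    /** A listener that opts in to the hook and overrides the retry limit. */
    static final class HookedListener implements ToyListener, ToyLifecycleListener {
        @Override
        public void onMessage(String body) {
            // Message handling is irrelevant for this sketch.
        }

        @Override
        public void prepareStart(ToyConsumer consumer) {
            consumer.setMaxReconsumeTimes(5);
        }
    }

    public static void main(String[] args) {
        ToyListener plain = body -> { };
        System.out.println(init(plain, -1).getMaxReconsumeTimes());                // prints -1
        System.out.println(init(new HookedListener(), -1).getMaxReconsumeTimes()); // prints 5
    }
}
```

A listener that does not implement the lifecycle interface keeps the configured value, while one that does gets the last word on the consumer's settings.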

The source above also shows that a different MessageListener is installed depending on consumeMode: either DefaultMessageListenerOrderly or DefaultMessageListenerConcurrently.

  • DefaultMessageListenerOrderly implements the MessageListenerOrderly interface, which extends MessageListener

  • DefaultMessageListenerConcurrently implements the MessageListenerConcurrently interface, which extends MessageListener

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                handleMessage(messageExt);
                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

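Take consumeMode set to CONCURRENTLY as an example:

When a message arrives at the consumer, before it reaches the custom onMessage method it first passes through consumeMessage of DefaultMessageListenerConcurrently (an inner class of DefaultRocketMQListenerContainer that implements MessageListenerConcurrently), which calls the handleMessage method to process the message.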

private void handleMessage(MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
    if (rocketMQListener != null) {
        rocketMQListener.onMessage(doConvertMessage(messageExt));
    } else if (rocketMQReplyListener != null) {
        Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
        Message<?> message = MessageBuilder.withPayload(replyContent).build();

        org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
        DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
        producer.setSendMsgTimeout(replyTimeout);
        producer.send(replyMessage, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                } else {
                    log.debug("Consumer replies message success.");
                }
            }

            @Override
            public void onException(Throwable e) {
                log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
            }
        });
    }
}

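The handleMessage method first calls doConvertMessage to deserialize the message, then calls the custom onMessage method with the resulting object.

It follows that when the custom onMessage method throws an exception, the exception is caught in DefaultMessageListenerConcurrently#consumeMessage, which returns ConsumeConcurrentlyStatus.RECONSUME_LATER to signal that consumption failed. When onMessage completes normally, DefaultMessageListenerConcurrently#consumeMessage returns ConsumeConcurrentlyStatus.CONSUME_SUCCESS to signal that consumption succeeded.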
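
The mapping from "listener threw" to a consumption status can be reduced to a few lines. The following is a simplified sketch, not the library code: ConsumeStatusSketch and its Status enum are hypothetical names, messages are plain strings, and a java.util.function.Consumer stands in for the custom onMessage method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for DefaultMessageListenerConcurrently#consumeMessage.
final class ConsumeStatusSketch {

    /** Hypothetical mirror of the two ConsumeConcurrentlyStatus values. */
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Runs the listener on each message; the first exception fails the whole batch.
    static Status consumeMessage(List<String> msgs, Consumer<String> onMessage) {
        for (String msg : msgs) {
            try {
                onMessage.accept(msg);
            } catch (Exception e) {
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(consumeMessage(Arrays.asList("a", "b"), m -> { }));
        System.out.println(consumeMessage(Arrays.asList("a"), m -> {
            throw new IllegalStateException("simulated business failure");
        }));
    }
}
```

One consequence worth noticing is that swallowing an exception inside onMessage makes the message count as consumed, so a listener that wants a retry has to let the exception propagate.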

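After a failure reported with ConsumeConcurrentlyStatus.RECONSUME_LATER, the message first goes to the retry topic %RETRY% + consumerGroup and is then redelivered to this consumer group. If consumption keeps failing until the maximum number of retries is exceeded, the message is delivered to the dead-letter queue (DLQ), whose default topic name is %DLQ% + consumerGroup.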
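
To make the naming and the hand-off to the dead-letter queue concrete, here is a small model of the decision. It is a sketch under assumptions: RetryFlowSketch and its methods are hypothetical, the group name demo_group is invented, and the rule "move to the DLQ once reconsumeTimes has reached maxReconsumeTimes" is my simplified reading of the behaviour described above, not a copy of the broker logic.

```java
// Simplified model of where a failed message goes next. Hypothetical helper,
// not RocketMQ code; the threshold rule is an assumption for illustration.
final class RetryFlowSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    private RetryFlowSketch() {
    }

    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }

    // reconsumeTimes is the number of retries the failed message has already had.
    static String nextTopicAfterFailure(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes >= maxReconsumeTimes ? dlqTopic(consumerGroup) : retryTopic(consumerGroup);
    }

    public static void main(String[] args) {
        String group = "demo_group"; // hypothetical consumer group name
        int max = 5;                 // same limit as setMaxReconsumeTimes(5)
        for (int reconsumeTimes = 0; reconsumeTimes <= max; reconsumeTimes++) {
            System.out.println("failed with reconsumeTimes=" + reconsumeTimes
                    + " -> " + nextTopicAfterFailure(group, reconsumeTimes, max));
        }
    }
}
```

Under this model a message with a limit of 5 is delivered once, retried up to five times through the retry topic, and only then lands in the DLQ topic for its group.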