Spring Boot集成RocketMQ实现普通、延时、事务消息发送接收、PULL消费模式及开启ACL | Spring Cloud 30
一、前言
在前面我们通过以下章节对RocketMQ
有了基础的了解:
docker-compose 搭建RocketMQ 5.1.0 集群(双主双从模式) | Spring Cloud 28
docker-compose 搭建RocketMQ 5.1.0 集群开启ACL权限控制 | Spring Cloud 29
现在开始我们正式学习Spring Boot
中集成RocketMQ
使用,,在本章节主要进行对以下部分讲解说明:
二、项目集成RocketMQ
2.1 项目总体结构
2.2 引入依赖
rocketmq/pom.xml
:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
2.3 配置文件
rocketmq/src/main/resources/application.yml
:
server:port: 8888spring:application:name: @artifactId@rocketmq:name-server: 192.168.0.30:9876producer:group: @artifactId@-groupsend-message-timeout: 60000 # 发送消息超时时间,单位:毫秒。默认为 3000retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 falseaccess-key: RocketMQAdmin # Access Keysecret-key: 1qaz@WSX # Secret Keyenable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPICconsumer:access-key: RocketMQAdmin # Access Keysecret-key: 1qaz@WSX # Secret Keylogging:level:org:springframework:boot:autoconfigure:logging: info
2.4主题及消费组常量
com/gm/rocketmq/component/rocketmq/TopicConstants.java
:
package com.gm.rocketmq.component.rocketmq;/* 主题常量*/
public interface TopicConstants {String NORMAL_ROCKETMQ_TOPIC_TEST= "NORMAL_ROCKETMQ_TOPIC_TEST";String ORDERLY_ROCKETMQ_TOPIC_TEST= "ORDERLY_ROCKETMQ_TOPIC_TEST";String SCHEDULE_ROCKETMQ_TOPIC_TEST= "SCHEDULE_ROCKETMQ_TOPIC_TEST";String TRANSACTION_ROCKETMQ_TOPIC_TEST= "TRANSACTION_ROCKETMQ_TOPIC_TEST";String PULL_ROCKETMQ_TOPIC_TEST= "PULL_ROCKETMQ_TOPIC_TEST";String EXT_ROCKETMQ_TOPIC_TEST= "EXT_ROCKETMQ_TOPIC_TEST";String CONSUMER_GROUP = "_CONSUMER_GROUP";
}
三、各类型消息收发
3.1 普通消息
3.1.1 普通消息发送
@Autowiredprivate RocketMQTemplate rocketMQTemplate;/* 向rocketmq发送同步和异步消息*/@RequestMapping(value = "sendNormal", method = RequestMethod.GET)public String sendNormal() {rocketMQTemplate.send(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":sync", MessageBuilder.withPayload("同步发送消息").build());rocketMQTemplate.asyncSend(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":async", MessageBuilder.withPayload("异步发送消息").build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步发送成功:{}", sendResult.getSendStatus().name());}@Overridepublic void onException(Throwable throwable) {log.info("异步发送失败:{}", throwable.getMessage());}});return "OK";}
3.1.2 普通消息接收
com/gm/rocketmq/component/rocketmq/NormalRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class NormalRocketMqListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("普通订阅-接收到的信息:{}", s);}
}
@RocketMQMessageListener
注解参数说明:
consumerGroup
:消费者订阅组,它是必需的,并且必须是唯一的。topic
:主题名字,生产发送的主题名。consumeMode
:消费模式,可选择并发或有序接收消息;默认CONCURRENTLY
同时接收异步传递的消息。messageModel
:消息模式,默认CLUSTERING
集群消费;如果希望所有订阅者都接收消息,可以设置广播BROADCASTING
。consumeThreadMax
:消费者最大线程数,默认64
。consumeTimeout
:消息阻塞最长时间,默认15
分钟。nameServer
:服务器地址,默认读取配置文件地址,可以单独为消费者设置指定位置。selectorExpression
:消费指定的Tag标签的业务消息。Consumer
端ACL
功能需要在@RocketMQMessageListener
中进行配置Producer
端ACL
功能需要在配置文件中进行配置- 更多查看官方解释
3.2 顺序消息
3.2.1 顺序消息发送
/* 向rockertmq发送顺序消息,同步方式 @return*/@RequestMapping(value = "sendOrderlySync", method = RequestMethod.GET)public String sendOrderlySync() {// 订单列表List<OrderStep> orderList = buildOrders();for (int i = 0; i < 10; i++) {Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();String orderId = String.valueOf(orderList.get(i).getOrderId());rocketMQTemplate.sendOneWayOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":sync", msg, orderId);}return "OK";}/* rockertmq发送顺序消息,异步方式 @return*/@RequestMapping(value = "sendOrderlyAsync", method = RequestMethod.GET)public String sendOrderlyAsync() {// 订单列表List<OrderStep> orderList = buildOrders();for (int i = 0; i < 10; i++) {Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();String orderId = String.valueOf(orderList.get(i).getOrderId());rocketMQTemplate.syncSendOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":async", msg, orderId);}return "OK";}/* 订单的步骤*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\\'' + '}';}}/* 生成模拟订单数据*/private List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
3.2.1 顺序消息接收
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerA.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerA implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("顺序订阅-接收到的信息:{}", s);}
}
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerB.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerB implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {log.info("顺序订阅-接收到的信息:{}", s);}
}
3.3 延时消息
3.3.1 延时消息发送
/* rockertmq发送延时消息 @return*/@RequestMapping(value = "sendSchedule", method = RequestMethod.GET)public String sendSchedule() {Message msg = MessageBuilder.withPayload("延时消息").build();rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + ":", msg, 20);log.info("延时消息-发布时间:{}", System.currentTimeMillis());return "OK";}
3.3.2 延时消息接收
com/gm/rocketmq/component/rocketmq/ScheduleRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST,consumerGroup = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class ScheduleRocketMqListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msg = "内容:" + new String(message.getBody()) + ",时间:" + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later";log.info("延时订阅-接收到的信息:{}", msg);log.info("延时消息-接受时间:{}", System.currentTimeMillis());}
}
3.4 发送端事务消息
3.4.1 事务消息发送
/* rockertmq发送生产端事务消息 @return*/@RequestMapping(value = "sendTransaction", method = RequestMethod.GET)public String sendTransaction() {Message msg = MessageBuilder.withPayload("事务消息").build();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TopicConstants.TRANSACTION_ROCKETMQ_TOPIC_TEST + ":", msg, "自定义参数");log.info("事务消息-发布结果:{} = {}", result.getTransactionId(), result.getSendStatus());return "OK";}
com/gm/rocketmq/component/rocketmq/TransactionListenerImpl.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();// 事务消息共有三种状态,提交状态、回滚状态、中间状态:// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。// executeLocalTransaction 方法来执行本地事务@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {String transactionId = msg.getHeaders().get("__transactionId__").toString();log.info("执行本地事务,transactionId:{}", transactionId);int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(transactionId, status);log.info("获取自定义参数:{}", arg);return RocketMQLocalTransactionState.UNKNOWN;}// checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {String transactionId = msg.getHeaders().get("__transactionId__").toString();log.info("检查本地事务状态,transactionId:{}", transactionId);Integer status = localTrans.get(transactionId);if (null != status) {switch (status) {case 0:return RocketMQLocalTransactionState.UNKNOWN;case 1:return RocketMQLocalTransactionState.COMMIT;case 2:return RocketMQLocalTransactionState.ROLLBACK;}}return RocketMQLocalTransactionState.COMMIT;}
}
3.5 Pull模式消费
3.5.1 源码分析
在rocketmq-spring-boot-starter
中关于Pull
模式消费的自动配置,
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
:
其中rocketmq.name-server
、rocketmq.pull-consumer.group
、rocketmq.pull-consumer.topic
三项配置为必填项。
剩余其他配置,org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.PullConsumer
:
由上可知,利用
rocketmq-spring-boot-starter
实现PULL
模式,只支持单Topic
的PULL
消费,要想对多个Topic
进行PULL
模式消费需要用到@ExtRocketMQConsumerConfiguration
。
3.5.2 PULL消费所需配置文件
配置文件新增pull-consumer
相关配置,完整rocketmq/src/main/resources/application.yml
:
rocketmq:name-server: 192.168.0.30:9876producer:group: @artifactId@-groupsend-message-timeout: 60000 # 发送消息超时时间,单位:毫秒。默认为 3000retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 falseaccess-key: RocketMQAdmin # Access Keysecret-key: 1qaz@WSX # Secret Keyenable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPICconsumer:access-key: RocketMQAdmin # Access Keysecret-key: 1qaz@WSX # Secret Keypull-consumer:access-key: RocketMQAdmin # Access Keysecret-key: 1qaz@WSX # Secret Keytopic: PULL_ROCKETMQ_TOPIC_TESTgroup: PULL_ROCKETMQ_TOPIC_TEST_CONSUMER_GROUP
3.5.3 消息发送
/* 向ockertmq 消费端pull模式发生消息 @return*/@RequestMapping(value = "sendPull", method = RequestMethod.GET)public String pull() {for (int i = 0; i < 10; i++) {Message msg = MessageBuilder.withPayload("pull 消息" + i).build();rocketMQTemplate.syncSend(TopicConstants.PULL_ROCKETMQ_TOPIC_TEST + ":", msg);}for (int i = 0; i < 10; i++) {Message msg = MessageBuilder.withPayload("pull ext 消息" + i).build();rocketMQTemplate.syncSend(TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + ":", msg);}return "OK";}
3.5.4 @ExtRocketMQConsumerConfiguration使用
此示例利用
@ExtRocketMQConsumerConfiguration
定义消费,声明消费的Topic
和消费组,或声明不同name-server
。
利用
@ExtRocketMQTemplateConfiguration
定义生产者,声明不同name-server
或者其他特定的属性来定义非标的RocketMQTemplate
。
com/gm/rocketmq/component/rocketmq/ExtRocketMQTemplate.java
:
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;/* 可用于不同name-server或者其他特定的属性来定义非标的RocketMQTemplate,此示例定义消息Topic和消费者*/
@ExtRocketMQConsumerConfiguration(group = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,topic = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST)
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
3.5.5 PULL模式消息接收
com/gm/rocketmq/component/rocketmq/PullConsumer.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;@Slf4j
@Component
public class PullConsumer implements CommandLineRunner {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate ExtRocketMQTemplate extRocketMQTemplate;@Overridepublic void run(String... args) {while (true) {List<String> messages = rocketMQTemplate.receive(String.class, 5000);log.info("receive from rocketMQTemplate, messages={}", messages);messages = extRocketMQTemplate.receive(String.class, 5000);log.info("receive from extRocketMQTemplate, messages={}", messages);}}
}
3.6 源码
源码地址:https://gitee.com/gm19900510/springboot-cloud-example.git 中模块rocketmq
。