RocketMQ 发送批量消息、过滤消息和事务消息
前面我们知道RocketMQ 发送延时消息与顺序消息,现在我们看下怎么发送批量消息、过滤消息和事务消息。
发送批量消息
限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。
消息的生产者
package com.demo.rocketmq.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.List;/* 批量发送消息 限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。 此外,这一批消息的总大小不应超过4MB。*/
public class BatchProducer {public static void main(String[] args) throws Exception {// 1:实例化消息生产者 Producer, 指定生产组名称DefaultMQProducer producer = new DefaultMQProducer("produceGroup");// 2:设置NameServer的地址producer.setNamesrvAddr("192.168.152.130:9876");// 3:启动Producer实例producer.start();// 4:创建消息String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {SendResult sendResult = producer.send(messages);// 5:通过 sendResult 返回消息是否成功送达System.out.printf("%s%n", sendResult);System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());} catch (Exception e) {e.printStackTrace();//处理error}// 6:如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
消息的消费者
package com.demo.rocketmq.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws Exception {// 1. 创建消费者,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");// 2. 指定 NameSever 地址consumer.setNamesrvAddr("192.168.152.130:9876");// 3. 订阅主题 Topic 和 tagconsumer.subscribe("BatchTest", "TagA");// 4. 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}
消息分割
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}
}
过滤消息
RocketMQ 的消息过滤功能通过生产者和消费者对消息的属性、标签进行定义,并在 RocketMQ 服务端根据过滤条件进行筛选匹配,将符合条件的消息投递给消费者进行消费。
消息过滤主要通过以下几个关键流程实现:
-
生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标。
-
消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口,向服务端上报需要订阅指定主题的哪些消息,即过滤条件。
-
服务端:消费者获取消息时会触发服务端的动态过滤计算,Apache RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。
详细的可以看下官网
这里使用 Tag 过滤,在大多数情况下,TAG是一个简单而有用的设计,其可以来选择想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
发送消息的时候,还是按照正常的方式发送,在消费消息的时候,修改下对应的 tag 表达式就好
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TagConsumer {public static void main(String[] args) throws Exception{// 1. 创建消费者,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");// 2. 指定 NameSever 地址consumer.setNamesrvAddr("192.168.152.130:9876");// 3. 订阅主题 Topic 和 tagconsumer.subscribe("TagFilterTopicTest", "TagA || TagB"); // 这块做了修改// 4. 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach( messageExt -> System.out.println(new String(messageExt.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}
事务消息
这里是消息支持事务操作,如果发送消息失败,就可以回滚当前的操作。
整个事务消息的详细交互流程如下图所示:
生产者
对应的生产者需要添加 事务监听器 ,如果返回的状态值为 LocalTransactionState.UNKNOW
; 就会进行回查消息
producer.setTransactionListener(new TransactionListener() {/* 在该方法中执行本地事务* @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(o);// 如果是 TagA 就执行提交if (StringUtils.equals("TagA", message.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TagB", message.getTags())) {// 如果是 TagB 就执行回滚return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("TagC", message.getTags())) {// 如果是 TagB 就返回 UNKNOW, 返回 UNKNOW 的时候,调用下面的回查方法return LocalTransactionState.UNKNOW;} else {return LocalTransactionState.UNKNOW;}}/* 该方法是 MQ 进行消息事务状态回查* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));return LocalTransactionState.COMMIT_MESSAGE;}
});
完整代码
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.Map;
import java.util.concurrent.TimeUnit;/* 发送事务消息* @author wuq* @Time 2023-1-5 17:46* @Description*/
public class TransactionProducer {public static void main(String[] args) throws Exception {// 1:实例化消息生产者 Producer, 指定生产组名称TransactionMQProducer producer = new TransactionMQProducer("produceGroup");// 2:设置NameServer的地址producer.setNamesrvAddr("192.168.220.129:9876");// 3: 添加监听器producer.setTransactionListener(new TransactionListener() {/* 在该方法中执行本地事务* @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(o);// 如果是 TagA 就执行提交if (StringUtils.equals("TagA", message.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TagB", message.getTags())) {// 如果是 TagB 就执行回滚return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("TagC", message.getTags())) {// 如果是 TagB 就返回 UNKNOW, 返回 UNKNOW 的时候,调用下面的回查方法return LocalTransactionState.UNKNOW;} else {return LocalTransactionState.UNKNOW;}}/* 该方法是 MQ 进行消息事务状态回查* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("回查的消息 Tag:" + messageExt.getTags() + ", 消息内容:" + new String(messageExt.getBody()));return LocalTransactionState.COMMIT_MESSAGE;}});// 启动生成者producer.start();String[] tags = {"TagA", "TagB", "TagC"};for (int i = 0; i < 3; i++) {// 4:创建消息,并指定 Topic,Tag 和 消息体Message msg = new Message("TransactionTopic", tags[i], ("RocketMQ Sync Msg " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 5:发送消息到一个Broker, 第二个参数:执行本地事务的回调参数SendResult sendResult = producer.sendMessageInTransaction(msg, Map.of("callback", "true"));// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID" + sendResult.getMsgId());TimeUnit.SECONDS.sleep(3);}// 6:如果不再发送消息,关闭Producer实例。
// producer.shutdown();}
}
对应的消费者
在测试的时候发现,会出现 tagC 两次被回查的情况,这里可能是需要做幂等控制
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws Exception {// 1. 创建消费者,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");// 2. 指定 NameSever 地址consumer.setNamesrvAddr("192.168.220.129:9876");// 3. 订阅主题 Topic 和 tagconsumer.subscribe("TransactionTopic", "*");// 4. 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {list.forEach(messageExt -> {String body = new String(messageExt.getBody());String tags = messageExt.getTags();System.out.println("tag:" + tags + ", body: "+ body);});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}