> 文章列表 > RocketMQ 发送延时消息与顺序消息

RocketMQ 发送延时消息与顺序消息

RocketMQ 发送延时消息与顺序消息

前面我们已经知道怎么发送消息与消费消息,现在就看下其他的消息类型怎么处理。

发送延时消息

延迟消息发送是指消息发送到 RocketMQ 后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer 进行消费。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

发送延时消息,需要设置延时的等级 setDelayTimeLevel(), 现在只支持固定的几个时间,详看 delayTimeLevel

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

消息生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class DelayProducer {public static void main(String[] args) throws Exception{// 1:实例化一个生产者来产生延时消息  指定生产组名称DefaultMQProducer producer = new DefaultMQProducer("produceGroup");// 2:设置 NameServer 的地址producer.setNamesrvAddr("192.168.152.130:9876");// 3:启动 Producer 实例producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("TestTopic", ("Hello Delay message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(2);// 发送消息SendResult sendResult = producer.send(message);System.out.println("发送状态:"+ sendResult.getSendStatus() + ", 消息ID:" + sendResult.getMsgId());}// 关闭生产者producer.shutdown();}
}

消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class DelayConsumer {public static void main(String[] args) throws Exception{// 1. 创建消费者,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");// 2. 指定 NameSever 地址consumer.setNamesrvAddr("192.168.152.130:9876");// 3. 订阅主题 Topic 和 tagconsumer.subscribe("TestTopic", "*");// 4. 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();}
}

发送顺序消息

需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。

  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

消息生产者

首先我们创建一个 OrderStep 对象,用于作为消息体。

public class OrderStep {private long orderId;private String desc;public OrderStep(){}public OrderStep(long orderId, String desc){this.orderId = orderId;this.desc = desc;}public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\\'' +'}';}public static List<OrderStep> buildOrders(){List<OrderStep> orderList = new ArrayList<>();orderList.add(new OrderStep(15103111039L,"创建"));orderList.add(new OrderStep(15103111065L,"创建"));OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

接下来,我们增加对应的消息生产者,

package com.demo.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;public class Producer {public static void main(String[] args) throws Exception {// 1:实例化消息生产者 Producer,  指定生产组名称DefaultMQProducer producer = new DefaultMQProducer("producerGroup");// 2. 指定 NameSever 地址producer.setNamesrvAddr("192.168.152.130:9876");// 3. 启动 producerproducer.start();// 订单列表List<OrderStep> orderList = OrderStep.buildOrders();for (int i = 0; i < orderList.size(); i++) {// 加个时间前缀String body = LocalDateTime.now() + " RocketMQ Queue Msg " + orderList.get(i);Message msg = new Message("OrderTopic", "order", "KEY" + i, body.getBytes());/*参数一: 消息对象参数二:消息选择器*/SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg;  //根据订单id选择发送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//订单idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}
}

消息消费者

package com.demo.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception{// 1. 创建消费者,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("produceGroup");// 2. 指定 NameSever 地址consumer.setNamesrvAddr("192.168.152.130:9876");// 3. 订阅主题 Topic 和 tagconsumer.subscribe("OrderTopic", "order");// 4. 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {list.forEach( messageExt -> System.out.println(  Thread.currentThread().getName()  + ", " +  new String(messageExt.getBody())));return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者consumer.start();}
}