RabbitMQ快速入门
文章目录
RabbitMQ快速入门
1、Java原生
这个就放这里做一个参考,这个一般是不常用,但是还是知其然而知所以然,了解一下
创建Maven工程
坐标
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>
Producer
public class Producer {public static void main(String[] args) {// 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 配置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");// 连接Connection connection = null;// 通道Channel channel = null;try {// 从连接工厂中获得连接connection = connectionFactory.newConnection("生产者");// 从连接中获得通道channel = connection.createChannel();// 要发送的消息String message = "direct-exchange信息";// 交换机的名字String exchangeName = "direct-exchange_ooo";// 交换机的类型String exchangeType = "direct"; // direct、fanout、topic、headers/* 创建路由* @params1: 交换机exchange* @params2: 交换机类型* @params3: 是否持久化*/channel.exchangeDeclare(exchangeName, exchangeType, true);/* 声明队列* @params1: 交换机exchange* @params2: 交换机类型* @params3: 是否持久化*/channel.queueDeclare("queue-5",true, false, false, null);channel.queueDeclare("queue-6",true, false, false, null);channel.queueDeclare("queue-7",true, false, false, null);/* 绑定路由与队列* @params1: 交换机exchange* @params2: 队列名称/routing* @params3: 属性配置* @params4: 发送消息的内容*/channel.queueBind("queue-5", exchangeName, "order");channel.queueBind("queue-6", exchangeName, "order");channel.queueBind("queue-7", exchangeName, "course");/* 发送消息* @params1: 交换机exchange* @params2: 队列名称/routing* @params3: 属性配置* @params4: 发送消息的内容*/channel.basicPublish(exchangeName, "order", null, message.getBytes());System.out.println("消息发送成功!");} catch (Exception e) {e.printStackTrace();System.out.println("发送消息发生异常");} finally {if(channel != null && channel.isOpen()){try{channel.close();}catch (Exception e){e.printStackTrace();}}if(connection != null){try {connection.close();}catch (Exception e){e.printStackTrace();}}}}
}
Consumer
public class Consumer {private static Runnable runnable = () ->{// 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 配置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");// 队列的名称final String queueName = Thread.currentThread().getName();// 连接Connection connection = null;// 通道Channel channel = null;try {// 从连接工厂中获得连接connection = connectionFactory.newConnection("生产者");// 从连接中获得通道channel = connection.createChannel();Channel finalChannel = channel;finalChannel.basicConsume(queueName, true,new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println(queueName + ":收到的消息是:" + new String(delivery.getBody(), "UTF-8"));}},new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("");}});System.out.println(queueName + "开始接受信息!");System.in.read();} catch (Exception e) {e.printStackTrace();System.out.println("接受消息发生异常");} finally {if(channel != null && channel.isOpen()){try{channel.close();}catch (Exception e){e.printStackTrace();}}if(connection != null){try {connection.close();}catch (Exception e){e.printStackTrace();}}}};public static void main(String[] args) {new Thread(runnable, "queue-5").start();new Thread(runnable, "queue-6").start();new Thread(runnable, "queue-7").start();}
}
Work工作模式
轮询
Producer
public class Produce {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for (int i = 1; i <= 20; i++) {//消息的内容String msg = "学相伴:" + i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("", "queue1", null, msg.getBytes());}System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
Work1
public class Work1 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/ 如果队列不存在,则会创建* Rabbitmq不允许创建两个相同的队列名称,否则会报错。 @params1: queue 队列的名称* @params2: durable 队列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义// channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;// finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(2000);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work1-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
Work2
public class Work2 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/ 如果队列不存在,则会创建* Rabbitmq不允许创建两个相同的队列名称,否则会报错。 @params1: queue 队列的名称* @params2: durable 队列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;// finalChannel.basicQos(1);finalChannel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(200);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work2-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
这样的话,发送了20条信息,虽然两个消费者sleep的时间不一样,但是每个消费者固定消费10个任务,这就是轮询
公平分发
Produce
public class Produce {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("生产者");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 6: 准备发送消息的内容//===============================end topic模式==================================for (int i = 1; i <= 20; i++) {//消息的内容String msg = "学相伴:" + i;// 7: 发送消息给中间件rabbitmq-server// @params1: 交换机exchange// @params2: 队列名称/routingkey// @params3: 属性配置// @params4: 发送消息的内容channel.basicPublish("", "queue1", null, msg.getBytes());}System.out.println("消息发送成功!");} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
Work1
public class Work1 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work1");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/ 如果队列不存在,则会创建* Rabbitmq不允许创建两个相同的队列名称,否则会报错。 @params1: queue 队列的名称* @params2: durable 队列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义// channel.queueDeclare("queue1", false, false, false, null);// 同一时刻,服务器只会推送一条消息给消费者// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(2000);finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work1-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
Work2
public class Work2 {public static void main(String[] args) {// 1: 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2: 设置连接属性connectionFactory.setHost("192.168.84.131");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("lzy");connectionFactory.setPassword("lzy");Connection connection = null;Channel channel = null;try {// 3: 从连接工厂中获取连接connection = connectionFactory.newConnection("消费者-Work2");// 4: 从连接中获取通道channelchannel = connection.createChannel();// 5: 申明队列queue存储消息/ 如果队列不存在,则会创建* Rabbitmq不允许创建两个相同的队列名称,否则会报错。 @params1: queue 队列的名称* @params2: durable 队列是否持久化* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。* */// 这里如果queue已经被创建过一次了,可以不需要定义//channel.queueDeclare("queue1", false, true, false, null);// 同一时刻,服务器只会推送一条消息给消费者//channel.basicQos(1);// 6: 定义接受消息的回调Channel finalChannel = channel;finalChannel.basicQos(1);finalChannel.basicConsume("queue1", false, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {try{System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));Thread.sleep(200);finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}catch(Exception ex){ex.printStackTrace();}}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {}});System.out.println("Work2-开始接受消息");System.in.read();} catch (Exception ex) {ex.printStackTrace();System.out.println("发送消息出现异常...");} finally {// 7: 释放连接关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (Exception ex) {ex.printStackTrace();}}if (connection != null && connection.isOpen()) {try {connection.close();} catch (Exception ex) {ex.printStackTrace();}}}}
}
这样的话,是Work2消费的任务远远大于Work1,根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配。
小结
- 消费者一次接收一条消息,代码channel.BasicQos(0, 1, false);
- 公平分发需要消费者开启手动应答,关闭自动应答
- 关闭自动应答代码channel.BasicConsume(“queue_test”, false, consumer);
- 消费者开启手动应答代码:channel.BasicAck(ea.DeliveryTag, false);
2、SpringBoot快速整合
2.1、Fanout
2.1.1、Producer
创建SpingBoot项目
坐标
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yaml
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: lzypassword: lzyvirtual-host: /host: 192.168.84.131port: 5672
OrderService
@Component
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 交换机的名称private String exchangeName = "fanout_order_exchange";//路由keyprivate String routeKey = "";public void makeOrder(Long userId, Long productId, int num) {String orderNumer = UUID.randomUUID().toString();System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}
FanoutRabbitConfig
这里就是相当于把之前java原生的创建交换机、创建队列、绑定交换机和队列,放到了springboot的配置中去了
@Configuration
public class FanoutRabbitConfig{//队列 起名:TestDirectQueue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.fanout.queue", true);}@Beanpublic Queue smsQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("sms.fanout.queue", true);}@Beanpublic Queue weixinQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("weixin.fanout.queue", true);}//Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange fanoutOrderExchange() {// return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("fanout_order_exchange", true, false);}//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@Beanpublic Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");}@Beanpublic Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");}
}
2.1.2、Consumer
创建SpingBoot项目
坐标
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yaml
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: lzypassword: lzyvirtual-host: /host: 192.168.84.131port: 5672
EmailService
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class EmailService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->" + message);}
}
SMSService
@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class SMSService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("sms-------------->" + message);}
}
WeixinService
@RabbitListener(queues = {"weixin.fanout.queue"})
@Component
public class WeixinService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("weixin-------------->" + message);}
}
2.1.3、测试
- 在Producer中搞一个测试类,顺便看看web页面中出现了交换机和队列了没有
@SpringBootTest
class SspringbootRabbitmqFanoutProducerApplicationTests {@AutowiredOrderService orderService;@Testpublic void contextLoads() throws Exception {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Long userId = 100L + i;Long productId = 10001L + i;int num = 10;orderService.makeOrder(userId, productId, num);}}
}
- 将Consumer启动起来,等待接收消息
- 然后在Consumer中看消息发送的情况
2.2、Direct
大致是一样的,然后因为Direct有路由key所以,要设置一下,这里有两种方法
-
使用配置文件绑定
-
使用注解创建交换机、队列、绑定
@RabbitListener(bindings =@QueueBinding(value = @Queue(value = "email.direct.queue",autoDelete = "false"),exchange = @Exchange(value = "direct_order_exchange",type = ExchangeTypes.DIRECT), key = "email"
))
@Component
public class EmailService {// @RabbitHandler 代表此方法是一个消息接收的方法。该不要有返回值@RabbitHandlerpublic void messagerevice(String message){// 此处省略发邮件的逻辑System.out.println("email-------------->" + message);}
}
自我感觉用配置文件比用注解在视觉上更加简洁
最后我们在生产者这里规定好路由key就可以给满足条件的队列分发消息了
2.3、Topic
和上面那个差不多,也有两种方式去搞这个模糊匹配
3、设置过期时间TTL
3.1、对队列进行设置
@Configuration
public class TTLConfig {@Beanpublic DirectExchange ttl(){return new DirectExchange("ttl_exchange");}@Beanpublic Queue ttlQueue(){HashMap<String, Object> map = new HashMap<>();map.put("x-message-ttl", 5000);return new Queue("ttl_queue", true, false, false, map);}@Beanpublic Binding direct(){return BindingBuilder.bind(ttlQueue()).to(ttl()).with("ttl");}
}
创建队列的时候做点手脚就可以了,加一个过期参数,还有标志
3.2、对消息进行设置
对消息进行设置的话
public void makeOrder1(Long userId, Long productId, int num) {String orderNumer = UUID.randomUUID().toString();System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("5000");message.getMessageProperties().setContentEncoding("UTF-8");return message;}};rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer, messagePostProcessor);
}
我看网上有好多的写法,了解几种就行了
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
4、死信队列
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因
- 消息被拒绝
- 消息过期
- 队列达到最大长度
Config
@Configuration
public class RabbitMQConfig {// 死信交换机@Beanpublic DirectExchange deadDirect(){return new DirectExchange("dead_direct_exchange");}@Beanpublic Queue deadQueue(){return new Queue("dead_queue", true);}@Beanpublic Binding deadBind(){return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");}@Beanpublic DirectExchange ttlDirect(){return new DirectExchange("ttl_direct_exchange");}@Beanpublic Queue ttlQueue(){Map<String, Object> args = new HashMap<>();// 过期时间args.put("x-message-ttl", 5000);// 最大长度args.put("x-max-length", 5);// 绑定死信交换机args.put("x-dead-letter-exchange", "dead_direct_exchange");// 条件args.put("x-dead-letter-routing-key", "dead");return new Queue("ttl_queue", true, false, false, args);}@Beanpublic Binding direct(){return BindingBuilder.bind(ttlQueue()).to(ttlDirect()).with("ttl");}
}
OrderService
@Component
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 1: 定义交换机private String exchangeName = "ttl_direct_exchange";// 2: 路由keyprivate String routeKey = "ttl";public void makeOrder(Long userId, Long productId, int num) {String orderNumer = UUID.randomUUID().toString();System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}
测试
@SpringBootTest
class DeadQueueApplicationTests {@AutowiredOrderService orderService;@Testpublic void contextLoads() throws Exception {for (int i = 0; i < 10; i++) {orderService.makeOrder(1L, 2L, 12);}}
}
5、内存监控
当这里超过的话,那么所有的队列都会阻塞掉
通过命令行去修改内存
rabbitmqctl set_vm_memory_high_watermark <fraction> // 相对大小(默认0.4)
rabbitmqctl set_vm_memory_high_watermark absolute 50MB // 绝对大小
但是这种方式有弊端:通过此命令修改阈值在Broker重启以后将会失效,通过修改配置文件方式设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启broker才会生效。
通过配置文件修改
我使用的是docker的rabbitmq,配置文件在这个位置
#默认
#vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction,建议取值在04~0.7之间,不建议超过0.7.
vm_memory_high_watermark.relative = 0.6
# 使用absolute的绝对值的方式,但是是KB,MB,GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB
6、内存换页
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
默认情况下,内存到达的阈值是50%时就会换页处理。
也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。
命令行
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(设置小于1的值)
7、磁盘监控
当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
默认情况下:磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。
这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB ,第二检查可能就是1MB,就会出现警告。
命令行
rabbitmqctl set_disk_free_limit <disk_limit>
rabbitmqctl set_disk_free_limit memory_limit <fraction>
disk_limit:固定单位 KB MB GB
fraction :是相对阈值,建议范围在:1.0~2.0之间。(相对于内存)
配置文件
disk_free_limit.relative = 3.0
disk_free_limit.absolute = 50mb