> 文章列表 > SpringBoot整合RabbitMQ实现死信队列

SpringBoot整合RabbitMQ实现死信队列

SpringBoot整合RabbitMQ实现死信队列

死信介绍

顾名思义就是无法被消费的消息。一般来说,Producer 将消息投递到 Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景

为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中。还有比如说:用户在商城下单成功并点击支付后再指定时间未支付时自动失效。

死信的三种情况

过期未被消费的消息

情况1

	队列设置统一消息设置过期时间导致消息过期
情况1代码测试
生产者定义队列与交换机
	/*** 正常队列---10s过期* @return*/@Beanpublic Queue normalQueue(){//ttl(10000):队列里面的消息统一10s过期--x-message-ttlreturn QueueBuilder.durable("normal-queue").ttl(10000).//绑定死信交换机deadLetterExchange("dlx-exchange").//设置路由键deadLetterRoutingKey("error").build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable("dlx-queue").build();}/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange("normal-exchange").build();}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange("dlx-exchange").build();}/*** 正常队列---10s过期与正常交换机绑定* @return*/@Beanpublic Binding dlxBind(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("info");}/*** 死信队列与死信交换机绑定* @return*/@Beanpublic Binding dlxBind1(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error");}/*** 项目重启时创建交换机与队列* @return*/@Beanpublic RabbitAdmin dlxRabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(dlxExchange());rabbitAdmin.declareExchange(normalExchange());rabbitAdmin.declareQueue(normalQueue());rabbitAdmin.declareQueue(dlxQueue());return rabbitAdmin;}
生产者发送消息
 	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("dlx1")@ApiOperation("死信:队列设置统一过期时间导致消息过期测试")public String dlx1(@RequestParam String msg,@RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("normal-exchange",routingKey,map);return "ok";}
消费者监听死信队列
	/*** 死信队列* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue"))public void dlxQueue(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("死信队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}
接口测试SpringBoot整合RabbitMQ实现死信队列
结果验证

由于队列设置统一过期时间10s,此消息若10s内没有被消费,消息会变成死信进入死信队列,结果如图
SpringBoot整合RabbitMQ实现死信队列

情况2

	消息本身设置过期时间导致消息过期
情况2代码测试
生产者定义队列与交换机
	/*** 正常队列* @return*/@Beanpublic Queue normalQueue2(){return QueueBuilder.durable("normal-queue2").//绑定死信交换机deadLetterExchange("dlx-exchange").//设置路由键deadLetterRoutingKey("error").build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable("dlx-queue").build();}/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange("normal-exchange").build();}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange("dlx-exchange").build();}/*** 死信队列与死信交换机绑定* @return*/@Beanpublic Binding dlxBind1(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error");}/***  正常交换机与正常队列绑定* @return*/@Beanpublic Binding dlxBind3(){return BindingBuilder.bind(normalQueue2()).to(normalExchange()).with("trace");}/*** 项目重启时创建交换机与队列* @return*/@Beanpublic RabbitAdmin dlxRabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(dlxExchange());rabbitAdmin.declareExchange(normalExchange());rabbitAdmin.declareQueue(normalQueue2());rabbitAdmin.declareQueue(dlxQueue());return rabbitAdmin;}
生产者发送消息
 	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("dlx3")@ApiOperation("死信:消息本身设置过期导致消息过期测试")public String dlx3(@RequestParam String msg,@RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("normal-exchange", routingKey, map, message -> {MessageProperties properties = message.getMessageProperties();//设置消息10s过期,注:1.只有在消息被消费才会判断是否过期//2.若队列设置ttl同时消息设置过期时间,则以时间短得为主properties.setExpiration("10000");return message;});return "ok";}
消费者监听死信队列
	/*** 死信队列* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue"))public void dlxQueue(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("死信队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}
接口测试

SpringBoot整合RabbitMQ实现死信队列

结果验证

由于消息本身设置过期时间10s,此消息若10s内没有被消费,消息会变成死信进入死信队列,结果如图
SpringBoot整合RabbitMQ实现死信队列

1.消息只有在被消费时才会判断是否过期,所以ttl与死信队列实现不了真正的延迟队列,存在误差
2.若消息本身和队列都设置过期时间,以时间短的为主

达到消息队列长度限制无法进入队列的消息

生产者定义队列与交换机
	 /*** 正常队列---最大长度为10* @return*/@Beanpublic Queue normalQueue1(){return QueueBuilder.durable("normal-queue1").maxLength(10).//绑定死信交换机deadLetterExchange("dlx-exchange").//设置路由键deadLetterRoutingKey("error").build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable("dlx-queue").build();}/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange("normal-exchange").build();}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange("dlx-exchange").build();}/*** 死信队列与死信交换机绑定* @return*/@Beanpublic Binding dlxBind1(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error");}/*** 正常队列---最大长度为10与正常交换机绑定* @return*/@Beanpublic Binding dlxBind2(){return BindingBuilder.bind(normalQueue1()).to(normalExchange()).with("debug");}@Beanpublic RabbitAdmin dlxRabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(dlxExchange());rabbitAdmin.declareExchange(normalExchange());rabbitAdmin.declareQueue(normalQueue1());rabbitAdmin.declareQueue(dlxQueue());return rabbitAdmin;}
生产者发送消息
 	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("dlx2")@ApiOperation("死信:队列长度测试")public String dlx2(@RequestParam String msg,@RequestParam String routingKey){for (int i = 0; i < 20; i++) {Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("normal-exchange",routingKey,map);}return "ok";}
消费者监听死信队列
	/*** 死信队列* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue"))public void dlxQueue(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("死信队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}
接口测试

SpringBoot整合RabbitMQ实现死信队列

结果验证

由于队列长度限制为10,当队列里面消息数量达到10之后,后续的消息进不了队列,消息会变成死信进入死信队列,结果如图
SpringBoot整合RabbitMQ实现死信队列

消费者拒收且不重回队列的消息

生产者定义队列与交换机
	/*** 正常队列* @return*/@Beanpublic Queue normalQueue2(){return QueueBuilder.durable("normal-queue2").//绑定死信交换机deadLetterExchange("dlx-exchange").//设置路由键deadLetterRoutingKey("error").build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable("dlx-queue").build();}/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange("normal-exchange").build();}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange("dlx-exchange").build();}/*** 死信队列与死信交换机绑定* @return*/@Beanpublic Binding dlxBind1(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error");}/***  正常交换机与正常队列绑定* @return*/@Beanpublic Binding dlxBind3(){return BindingBuilder.bind(normalQueue2()).to(normalExchange()).with("trace");}@Beanpublic RabbitAdmin dlxRabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(dlxExchange());rabbitAdmin.declareExchange(normalExchange());rabbitAdmin.declareQueue(normalQueue2());rabbitAdmin.declareQueue(dlxQueue());return rabbitAdmin;}
生产者发送消息
 	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("dlx5")@ApiOperation("死信:消息被拒收且不重回队列测试")public String dlx5(@RequestParam String msg,@RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("normal-exchange", routingKey, map);return "ok";}
消费者监听死信队列与拒收消息
	/*** 死信队列* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue"))public void dlxQueue(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("死信队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}/*** 拒收消息* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "normal-queue2",declare = "false"))public void reject(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();//设置消息拒收且不重回队列c.basicReject(tag,false);log.info("消息拒收");}
接口测试

SpringBoot整合RabbitMQ实现死信队列

结果验证

由于消息者拒收消息且设置消息不重回队列,死信队列则会收到此消息,结果如图

SpringBoot整合RabbitMQ实现死信队列
注:若队列设置删除自动删除的时间,到时队列会被删除,此时队列要如有消息则也会被删除,不会变成死信队列

队列删除,队列里面的消息不会变成死信

代码验证

生产者定义交换机与队列

	/*** 正常队列-30s后删除* @return*/@Beanpublic Queue normalQueue3(){//expires(30000):队列30s内被删除--x-expiresreturn QueueBuilder.durable("normal-queue3").expires(30000).//绑定死信交换机deadLetterExchange("dlx-exchange").//设置路由键deadLetterRoutingKey("error").build();}/*** 死信队列* @return*/@Beanpublic Queue dlxQueue(){return QueueBuilder.durable("dlx-queue").build();}/*** 正常交换机* @return*/@Beanpublic DirectExchange normalExchange(){return ExchangeBuilder.directExchange("normal-exchange").build();}/*** 死信交换机* @return*/@Beanpublic DirectExchange dlxExchange(){return ExchangeBuilder.directExchange("dlx-exchange").build();}/*** 死信队列与死信交换机绑定* @return*/@Beanpublic Binding dlxBind1(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error");}/*** 正常队列--30s后删除与正常交换机绑定* @return*/@Beanpublic Binding dlxBind4(){return BindingBuilder.bind(normalQueue3()).to(normalExchange()).with("warn");}@Beanpublic RabbitAdmin dlxRabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(dlxExchange());rabbitAdmin.declareExchange(normalExchange());rabbitAdmin.declareQueue(normalQueue3());rabbitAdmin.declareQueue(dlxQueue());return rabbitAdmin;}
生产者发送消息
 	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}/*** 不会进入死信队列* @param msg* @param routingKey* @return*/@GetMapping("dlx4")@ApiOperation("死信:队列删除,消息测试")public String dlx4(@RequestParam String msg,@RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("normal-exchange", routingKey, map);return "ok";}
消费者监听死信队列与拒收消息
	/*** 死信队列* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue"))public void dlxQueue(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("死信队列收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}
接口测试

SpringBoot整合RabbitMQ实现死信队列
向队列里面发送一条消息
SpringBoot整合RabbitMQ实现死信队列

结果验证

结果队列30s被删除,死信队列没有此消息
SpringBoot整合RabbitMQ实现死信队列

SpringBoot整合RabbitMQ实现死信队列