采用RibbaitMq延迟队列官方插件实现延迟队列
RibbaitMq插件实现延迟队列主要是延迟消息到交换机的时间。
TTL+死信队列实现延时队列:正常消息过期没有被消费掉,进入死信队列后立即消息。
本章主要采用第一种方式。
一、前期准备工作
1.安装RabbirMQ自行百度或者参考推荐资源:
RabbitMQ部署-Windows篇 - 知乎
RabbitMQ windows下的安装与配置 - 腾讯云开发者社区-腾讯云
安装成功图
2. 下载rabbitmq_delayed_message_exchange-3.11.1插件并上传到指定文件夹中,
插件版本根据ribbitmq版本选择:Community Plugins — RabbitMQ
3. 进入sbin目录,打开管理员控制台,输入如下命令,显示类似信息即可:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4.重新双击sbin目录下的rabbitmq-server.bat文件,启动rabbitmq服务。
5.启动服务之后打开rabbitmq管理官新增交换机即可看到新的交换模式。
二、编码
1.在pom.xml
文件中添加AMQP
相关依赖
<!--rabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.在application.yml
添加RabbitMQ的相关配置
rabbitmq:port: 5672host: localhostusername: adminpassword: 123456listener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 3000virtual-host: newCorepublisher-confirm-type: correlatedpublisher-returns: true
3.采用RibbaitMq延迟队列官方插件,新的方法实现延迟队列
package com.hxnwm.ny.diancan.common.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* RabbitMQ延时消息配置* @time 2022/3/11 11:40*/
@Configuration
public class RabbitMQLazyConfig {// ----- 订单处于已完成状态后的24小时后,对应操作 此部分采用插件 延迟发送到交换机的时间实现延迟队列 wdj-----//队列public static final String ACT_SETTLE_DELAY_QUEUE = "act.settle.delay.queue";//交换机public static final String ACT_SETTLE_DELAY_EXCHANGE = "act.settle.delay.exchange";public static final String ACT_SETTLE_DELAY_ROUTING_KEY ="act.settle.delay.routing.key";@Beanpublic Queue delaySettleQueue() {return new Queue(ACT_SETTLE_DELAY_QUEUE, true, false, false);}@Beanpublic CustomExchange delaySettleExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(ACT_SETTLE_DELAY_EXCHANGE, "x-delayed-message", true, false, args);}@Beanpublic Binding bindingSettleDelay() {return BindingBuilder.bind(delaySettleQueue()).to(delaySettleExchange()).with(ACT_SETTLE_DELAY_ROUTING_KEY).noargs();}
}
延时队列工具类
@Component
@Slf4j
//订单工具类
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class OrderUtils {@Resourceprivate AmqpTemplate amqpTemplate;/*订单完成后24小时,给B端发消息 @param order* @return void* @Author wdj* @date 2023/3/22 16:23*/public void delayedSettleOrder(Order order) {this.amqpTemplate.convertAndSend(RabbitMQLazyConfig.ACT_SETTLE_DELAY_EXCHANGE, RabbitMQLazyConfig.ACT_SETTLE_DELAY_ROUTING_KEY, order, message -> {//给消息设置延迟毫秒值message.getMessageProperties().setDelay(1000);//24小时后发送消息到B端 改成1秒message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;});log.info("把订单完成24小时后{},发送到rabbitmq队列中", order.getOrderSn());}
}
service层或者controller层根据业务需求调用this.orderUtils.delayedSettleOrder(order);
//线下扫码支付:门店+余额支付成功,发送收益到B端this.orderUtils.delayedSettleOrder(order);
延时消息监听类:主要对消息进行消费逻辑处理。
/* 延时任务监听 @author knight* @time 2022/3/11 10:40*/
@Slf4j
@Component
public class LazyOrderListener {@Resourceprivate OrderService orderService;@Resourceprivate BApiUtils bApiUtils;/* @Description:订单处于已完成状态后的24小时后,将订单推送给B端* @param order * @Author: wdj* @Date: 2023/3/22 */@RabbitListener(queues = RabbitMQLazyConfig.ACT_SETTLE_DELAY_QUEUE)@Transactional(rollbackFor = Exception.class)public void listenDelayedSettleOrder(Order order){log.info("监听到订单完成 " + order.getOrderSn());try {/* 这部分是业务逻辑,请忽略Order order1 = this.orderService.info(order.getId(), order.getOrderSn());if (Objects.isNull(order1)) {log.info("订单未找到,退出");throw new Exception("订单未找到");}// 判断订单状态是否处于已完成if (!Objects.equals(order1.getStatus(), Order.STATUS.STATUS_0020.status)) {log.info("订单状态处于未完成状态,退出");throw new Exception("订单状态处于未完成状态,退出");}//开发阶段不用一直发送消息,记得测试要放行bApiUtils.sendDelayedSettleOrder(order.getId());*/log.info("监听订单处理成功");}catch (Exception e){e.printStackTrace();TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();log.info("监听订单处理失败回滚");}}
}