> 文章列表 > 使用RabbitMq实现延迟队列

使用RabbitMq实现延迟队列

使用RabbitMq实现延迟队列

下载RabbitMq:本地安装rabbitmq_王胖胖1112的博客-CSDN博客

1、pom文件引入

 <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>

 2、设置配置文件

spring:

        rabbitmq:

                host: IP地址 // 若在本地:127.0.0.1

                username: guest //默认的账号密码

                password: guest

                port: 5672

                virtual-host: / 

 3、使用延迟队列需要安装插件

一、进行地址下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

image-20210919180221637

二、将下载的文件放入rabbitMq目录下的plugins文件夹中

三、进入工作台,进入rabbitMq目录下的plugins目录下,执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange,开启插件支持

image-20210919180632012

四、重启rabbitMq

  1. net start RabbitMQ 启动

  2. net stop RabbitMQ 停止

五、到http://localhost:15672,查看交换机类型是否有x-delayed-message,有则成功了

image-20210919180805320

 4、配置文件类代码

@Configuration
public class MqConf {/***死信交换机* */
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
/***死信队列* */
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
/***死信路由* */
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

/**

*定义死信队列

* */

@Bean("delayedQueue")
public Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);
}

/**

*定义死信交换机

* */

@Bean("delayedExchange")
public CustomExchange delayedExchange(){Map<String, Object> args = new HashMap<>(1);// 自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}

/**

*绑定

* */

@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}

 5、消息生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class TestController {@GetMapping("/sendDelayMsg/{message}/{delayTime}")public String sendMsg(@PathVariable String message, @PathVariable Integer delayTime){rabbitTemplate.convertAndSend(TtlQueueConfig.DELAYED_EXCHANGE_NAME, TtlQueueConfig.DELAYED_ROUTING_KEYA, message,                 messagePostProcessor ->{messagePostProcessor.getMessageProperties().setDelay(delayTime);return messagePostProcessor;});log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列delay.queue:{}", new Date(), delayTime, message);return "发送成功";}
}

6、消息消费者代码 

@Slf4j
@Component
public class DeadLetterConsumer {@RabbitListener(queues = TtlQueueConfig.DELAYED_QUEUE_NAMEA)public void receiveD(String msg){log.info("当前时间{},队列收到死信队列的消息:{}", new Date(), msg);}
}