> 文章列表 > SpringBoot整合RabbitMQ(六种工作模式介绍)

SpringBoot整合RabbitMQ(六种工作模式介绍)

SpringBoot整合RabbitMQ(六种工作模式介绍)

介绍

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。
RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

使用场景

1.服务解耦:生产者与消费者之间不是直接调用的,中间存在消息中间件,生产者不需要关心消费者的调用情况
2.流量削峰:在高并发的情况下,系统服务没法及时处理大量的并发请求,此时可以把并发请求发送消费队列中,消费者从队列获取请求并处理,从而减少系统的压力
3.异步调用:任务之间可以异步执行,从而减少整体执行时长

基本组成

PRODUCER生产者、COMSUMER消费者、Exchange交换机、Message Queue消息队列、Binding Key普通键、Routing Key路由键、Broker中间件(包含队列与交换机)
其中:PRODUCER生产者负责发送消息
COMSUMER消费者负责响应消息

yml文件配置

生产者:

spring:application:name: rabbitmq-server#RabbitMQrabbitmq:#iphost: 192.168.17.128#用户名username: rabbitmq#密码password: rabbitmq#端口port: 5673#虚拟主机名,默认/virtual-host: rabbitmq

消费者:

spring:application:name: rabbitmq-consumer#RabbitMQrabbitmq:#iphost: 192.168.17.128#用户名username: rabbitmq#密码password: rabbitmq#端口port: 5673#虚拟主机名virtual-host: rabbitmqlistener:simple:#同一时间抓取的数量,待处理完再抓取prefetch: 1#设置手动签收,默认自动签收acknowledge-mode: manual

简单模式

一个生产者,一个消费者
SpringBoot整合RabbitMQ(六种工作模式介绍)

生产者代码

声明队列

	/* 简单模式队列* @return*/@Beanpublic Queue simpleQueue(){//持久化 非独占 非自动删除return QueueBuilder.durable("simpleQueue").build();}

发送消息

	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("simple")@ApiOperation("简单模式")public String simple(@RequestParam String msg){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("simpleQueue",map);return "ok";}

消费者代码

	/* 简单模式的消费者* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "simpleQueue"))public void simple(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("简单模式的消费者收到:{}",msg);//由于在yml设置手动回执,此处需要手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}

工作模式

一个生产者向队列发送消息,多个消费者从同一个队列取消息
是否手动回执ACk,合理分发QOS,消息持久化
SpringBoot整合RabbitMQ(六种工作模式介绍)
SpringBoot整合RabbitMQ(六种工作模式介绍)

生产者代码

声明队列

 	/* 工作模式队列* @return*/@Beanpublic Queue workQueue(){return QueueBuilder.durable("workQueue").build();}

发送消息

	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("work")@ApiOperation("工作模式")public String work(){for (int i = 0; i <100; i++) {rabbitTemplate.convertAndSend("workQueue",createMsg(i),message -> {MessageProperties messageProperties = message.getMessageProperties();//默认消息持久化,设置消息不持久化messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);return message;});}return "ok";}

消费者代码

	/* 工作模式的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))public void work1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("工作模式的消费者1收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}/* 工作模式的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "workQueue"))public void work2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("工作模式的消费者2收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}

发布订阅模式

生产者通过fanout扇出交换机群发消息给消费者,同一条消息每一个消费者都可以收到
SpringBoot整合RabbitMQ(六种工作模式介绍)
SpringBoot整合RabbitMQ(六种工作模式介绍)

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	 /* fanout交换机* @return*/@Beanpublic Exchange fautExchange1(){//持久化 非自动删除return ExchangeBuilder.fanoutExchange("fanout").build();}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin fanoutRabbitAdmin1(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(fanoutExchange1());return rabbitAdmin;}
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("fanout1")@ApiOperation("发布订阅模式1")public String fanout1(@RequestParam String msg){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("fanout",null,map);return "ok";}
消费者代码
	/* 发布订阅模式方法1的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings =@QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机))public void fanout1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("发布订阅模式方法1的消费者1收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}/* 发布订阅模式方法1的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings = @QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="fanout",declare = "false")//declare = "false":生产者已定义交换机,此处不再声明交换机))public void fanout2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("发布订阅模式方法1的消费者2收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* 扇出交换机*/
@Configuration
public class FanoutExchangeConfiguration{/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable("fanout-queue1").build();}/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue fanoutQueue2(){return QueueBuilder.durable("fanout-queue2").build();}/* 定义扇出交换机 持久  非自动删除* @return*/@Beanpublic FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange("logs").build();}/* 将队列1与交换机绑定* @return*/@Beanpublic Binding fanoutBinding1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/* 将队列2与交换机绑定* @return*/@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin fanoutRabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(fanoutExchange());rabbitAdmin.declareQueue(fanoutQueue1());rabbitAdmin.declareQueue(fanoutQueue2());return rabbitAdmin;}
}
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("fanout2")@ApiOperation("发布订阅模式2")public String fanout(@RequestParam String msg){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("logs",null,map);return "ok";}

消费者代码

	/* 发布订阅模式方法2的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue1"))public void logs1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("发布订阅模式方法2的消费者1收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}/* 发布订阅模式方法2的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "fanout-queue2"))public void logs2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();log.info("发布订阅模式方法2的消费者2收到:{}",msg);//手动回执,不批量签收,回执后才能处理下一批消息c.basicAck(tag,false);}

路由模式

生产者提供通过direct直流交换机给消费者发送消息,消费者通过关键字匹配规则从绑定的队列获取消息
SpringBoot整合RabbitMQ(六种工作模式介绍)

SpringBoot整合RabbitMQ(六种工作模式介绍)

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	 /* 直流交换机* @return*/@Beanpublic Exchange routeExchange(){//持久化 非自动删除return ExchangeBuilder.directExchange("route").build();}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin RabbitAdminroute(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.declareExchange(routeExchange());return rabbitAdmin;}
发送消息
	private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("route")@ApiOperation("路由模式1")public String route(@RequestParam String msg, @RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("route",routingKey,map);return "ok";}
消费者代码
	 /* 路由式方法1的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings =@QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机key = {"error"}//路由键))public void route1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("路由模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}/* 路由式方法1的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings =@QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="route",declare = "false"),//declare = "false":生产者已定义交换机,此处不再声明交换机key = {"info","debug"}//路由键))public void route2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("路由模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* 路由交换机*/
@Configuration
public class RouteExchangeConfiguration {/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue directQueue1(){return  QueueBuilder.durable("direct-queue1").build();}/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue directQueue2(){return  QueueBuilder.durable("direct-queue2").build();}/* 定义路由交换机 持久  非自动删除* @return*/@Beanpublic DirectExchange directExchange(){return  ExchangeBuilder.directExchange("direct").build();}/* 将队列1与交换机绑定,路由键:error* @return*/@Beanpublic Binding directBinding1(){return BindingBuilder.bind(directQueue1()).to(directExchange()).with("error");}/* 将队列2与交换机绑定,路由键:info* @return*/@Beanpublic Binding directBinding2(){return BindingBuilder.bind(directQueue2()).to(directExchange()).with("info");}/* 将队列2与交换机绑定,路由键:debug* @return*/@Beanpublic Binding directBinding3(){return BindingBuilder.bind(directQueue2()).to(directExchange()).with("debug");}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin directRabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(directExchange());rabbitAdmin.declareQueue(directQueue1());rabbitAdmin.declareQueue(directQueue2());return rabbitAdmin;}
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("direct")@ApiOperation("路由模式2")public String direct(@RequestParam String msg, @RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("direct",routingKey,map);return "ok";}

消费者代码

	/* 路由模式方法2的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "direct-queue1"))public void direct1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("路由模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}/* 路由模式方法2的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "direct-queue2"))public void direct2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("路由模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}

主题模式

生产者通过topic主题交换机给消费者发送消息,消费者通过特殊的关键字匹配规则从绑定的队列取消息
SpringBoot整合RabbitMQ(六种工作模式介绍)
SpringBoot整合RabbitMQ(六种工作模式介绍)

方法1:生产者创建交换机,消费者创建队列与监听队列

生产者代码

创建交换机
	/* 主题交换机* @return*/@Beanpublic Exchange themeExchange(){//持久化 非自动删除return ExchangeBuilder.topicExchange("theme").build();}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmintheme(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(themeExchange());return rabbitAdmin;}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("theme")@ApiOperation("主题模式1")public String theme(@RequestParam String msg, @RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("theme",routingKey,map);return "ok";}

消费者代码

	/* 主题模式方法1的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings =@QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机key = {"*.error.*"}))public void theme1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("主题模式方法1的消费者1收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}/* 主题模式方法1的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*/@RabbitListener(bindings =@QueueBinding(value = @Queue,//这里定义随机队列,默认属性: 随机命名,非持久,排他,自动删除exchange =@Exchange(name="theme",type = ExchangeTypes.TOPIC),//declare = "false":生产者已定义交换机,此处不再声明交换机key = {"#.info","debug.#"}))public void theme2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("主题模式方法1的消费者2收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}

方法2:生产者创建队列与交换机,消费者监听队列

生产者代码

声明队列与交换机
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* 主题交换机*/
@Configuration
public class TopicExchangeConfiguration {/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue topicQueue1(){return  QueueBuilder.durable("topic-queue1").build();}/* 定义队列 持久 非排他 非自动删除* @return*/@Beanpublic Queue topicQueue2(){return  QueueBuilder.durable("topic-queue2").build();}/* 定义路由交换机 持久  非自动删除* @return*/@Beanpublic TopicExchange topicExchange(){return  ExchangeBuilder.topicExchange("topic").build();}/* 将队列1与交换机绑定,路由键:A. @return*/@Beanpublic Binding topicBinding1(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("A.*");}/* 将队列2与交换机绑定,路由键:#.B* @return*/@Beanpublic Binding topicBinding2(){return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("#.*");}/* 将队列2与交换机绑定,路由键:A.B* @return*/@Beanpublic Binding topicBinding3(){return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("A.B");}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin topicRabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareExchange(topicExchange());rabbitAdmin.declareQueue(topicQueue1());rabbitAdmin.declareQueue(topicQueue2());return rabbitAdmin;}
}
发送消息
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Autowiredprivate RabbitTemplate rabbitTemplate;/* 组装消息* @param msg* @return*/private static Map<String, Object> createMsg(Object msg) {String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);Map<String,Object> message= Maps.newHashMap();message.put("sendTime",sdf.format(new Date()));message.put("msg", msg);message.put("msgId",msgId);return message;}@GetMapping("topic")@ApiOperation("主题模式2")public String topic2(@RequestParam String msg, @RequestParam String routingKey){Map<String, Object> map = createMsg(msg);rabbitTemplate.convertAndSend("topic",routingKey,map);return "ok";}
消费者代码
	/* 主题模式方法2的消费者1* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "topic-queue1"))public void topic1(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("主题模式方法2的消费者1收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}/* 主题模式方法2的消费者2* @param message 消息* @param c 通道* @param msg 消息内容* @throws IOException*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "topic-queue2"))public void topic2(Message message,Channel c,Map msg) throws IOException {MessageProperties properties = message.getMessageProperties();String routingKey = properties.getReceivedRoutingKey();log.info("主题模式方法2的消费者2收到:{},路由键:{}",msg,routingKey);//手动回执,不批量签收,回执后才能处理下一批消息long tag = properties.getDeliveryTag();c.basicAck(tag,false);}

异步调用模式

生产者绑定调用队列向消费者发送消息,通过绑定返回队列异步接收消息处理结果

生产者代码

创建发送队列与接收队列

	 /* 定义异步发送队列* @return*/@Beanpublic Queue rpcQueue(){return QueueBuilder.nonDurable("rpc-queue").build();}/* 定义异步接收队列* @return*/@Beanpublic Queue receivedQueue(){return QueueBuilder.nonDurable(UUID.randomUUID().toString().replaceAll("-","")).build();}//创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin111(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);rabbitAdmin.declareQueue(rpcQueue());rabbitAdmin.declareQueue(receivedQueue());return rabbitAdmin;}

发送与接收消息

	@Value("#{receivedQueue.name}")private String rndQueue;@GetMapping("rpc")@ApiOperation("异步调用模式")public String rpc(@RequestParam Integer n){rabbitTemplate.convertAndSend("rpc-queue", n, message -> {MessageProperties properties = message.getMessageProperties();properties.setReplyTo(rndQueue);properties.setCorrelationId(UUID.randomUUID().toString());return message;});return "ok";}//从随机队列接收计算结果@RabbitListener(queues = "#{receivedQueue.name}")public void receive(long r, @Header(name= AmqpHeaders.CORRELATION_ID) String correlationId) {log.info("\\n\\n"+correlationId+" - 收到: "+r);}

消费者代码

	/* 异步调用* @param n* @return*///使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致@RabbitListener(queuesToDeclare = @Queue(value = "rpc-queue",durable = "false"))public long getFbnq(int n,Message message,Channel c) throws IOException {MessageProperties properties = message.getMessageProperties();long tag = properties.getDeliveryTag();long result = f(n);c.basicAck(tag,false);return result;}private long f(int n) {if (n==1 || n==2) {return 1;}return f(n-1) + f(n-2);}

隔音材料网