SpringAMQP的使用
一、什么是SpringAMQP
它可以大大的简化我们的开发,不用我们再自己创建连接写一堆代码,具有便捷的发送,便捷的接收,便捷的绑定。它可以实现自动化的声明队列,交换和绑定。
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
二、基本消息队列
消息发送
1.第一步:引入AMQP依赖
在父工程中引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.第二步:在publisher中编写测试方法
2.1在publisher的application.yml,添加mq连接信息:
spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: lxworkpassword: 123456virtual-host: / #虚拟主机目录
2.2编写测试方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendSimpleQueue(){String queueName = "simple.queue";String message = "hello SimpleQueue SpringAMQP";rabbitTemplate.convertAndSend(queueName,message);}
}
消息接收
1.在consumer的application.yml,添加mq连接信息:
spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: lxworkpassword: 123456virtual-host: / #虚拟主机目录
2.在consumer中编写消费逻辑:
@Component //声明成一个bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到simple.queue的消息:"+msg);}
}
三、WorkQueue队列
工作队列可以提高消息处理速度,避免队列消息堆积
我们让publisher发送每隔0.2秒发送50条消息
代码:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello workQueue SpringAMQP-->";for (int i = 0; i < 51; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
}
然后在定义两个消费者都监听simple.queue队列
这里有一个消息预取机制,意思就是消费者会提前拿队列中的消息,可以配置preFetch控制消息的上限
配置consumer的yaml文件
spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: itcastpassword: 123321virtual-host: / #虚拟主机listener:simple:prefetch: 1 # 消息预取机制:每次只能获取一条消息,处理完成才能获取下一条消息
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到的消息:"+msg+ "---"+LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException{System.err.println("消费者2接收到的消息:"+msg+ "---"+ LocalDateTime.now());Thread.sleep(200);}
}