> 文章列表 > SpringAMQP的使用

SpringAMQP的使用

SpringAMQP的使用

目录

  • 一、什么是SpringAMQP
  • 二、基本消息队列
    • 消息发送
    • 消息接收
  • 三、WorkQueue队列

一、什么是SpringAMQP

它可以大大的简化我们的开发,不用我们再自己创建连接写一堆代码,具有便捷的发送,便捷的接收,便捷的绑定。它可以实现自动化的声明队列,交换和绑定。
SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

二、基本消息队列

消息发送

1.第一步:引入AMQP依赖
在父工程中引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.第二步:在publisher中编写测试方法
2.1在publisher的application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: lxworkpassword: 123456virtual-host: / #虚拟主机目录

2.2编写测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendSimpleQueue(){String queueName = "simple.queue";String message = "hello SimpleQueue SpringAMQP";rabbitTemplate.convertAndSend(queueName,message);}
}

消息接收

1.在consumer的application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: lxworkpassword: 123456virtual-host: / #虚拟主机目录

2.在consumer中编写消费逻辑:

@Component //声明成一个bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg){System.out.println("消费者接收到simple.queue的消息:"+msg);}
}

三、WorkQueue队列

工作队列可以提高消息处理速度,避免队列消息堆积
SpringAMQP的使用
我们让publisher发送每隔0.2秒发送50条消息
代码:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendWorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello workQueue SpringAMQP-->";for (int i = 0; i < 51; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}
}

然后在定义两个消费者都监听simple.queue队列
这里有一个消息预取机制,意思就是消费者会提前拿队列中的消息,可以配置preFetch控制消息的上限
配置consumer的yaml文件

spring:rabbitmq:host: 192.168.19.20 #rabbitMQ的ip地址port: 5672 #端口username: itcastpassword: 123321virtual-host: / #虚拟主机listener:simple:prefetch: 1 # 消息预取机制:每次只能获取一条消息,处理完成才能获取下一条消息
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到的消息:"+msg+ "---"+LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException{System.err.println("消费者2接收到的消息:"+msg+ "---"+ LocalDateTime.now());Thread.sleep(200);}
}