> 文章列表 > 一篇文章快速入门Spring AMQP

一篇文章快速入门Spring AMQP

一篇文章快速入门Spring AMQP

文章目录

  • 一、AMQP
  • 二、Spring AMQP
    • 2.1 介绍
    • 2.2 SpringAMQP发送消息
    • 2.3 SpringAMQP接收消息
    • 2.4 WorkQueue模型
      • 2.4.1 概念
      • 2.4.2 示例
    • 2.5 发布订阅模型
      • 2.5.1 介绍
      • 2.5.2 Fanout Exchange
      • 2.5.3 Direct Exchange
      • 2.5.4 Topic Exchange
    • 2.6 消息转换器
      • 2.6.1 介绍
      • 2.6.2 切换消息转换器

一、AMQP

  AMQP(高级消息队列协议)是一个标准的消息传递协议,用于在应用程序之间进行消息传递,特别是在分布式系统中。
在这里插入图片描述

  AMQP提供了一个中间件消息传递机制,使不同的应用程序能够可靠地、安全地和高效地进行通信。它允许应用程序通过交换消息来进行通信,而不必直接进行网络通信。这种机制使得应用程序之间的通信更加灵活,因为它们可以独立地进行通信,而不必考虑其他应用程序的状态和可用性。

  AMQP支持消息传递的各种场景,包括点对点通信、发布/订阅模式和请求/响应模式等。它具有诸如持久化消息、事务、消息确认和优先级等高级特性,使得它非常适合处理复杂的分布式应用程序。

二、Spring AMQP

2.1 介绍

  Spring AMQP是基于Spring Framework的AMQP(高级消息队列协议)客户端库,用于在Java应用程序中使用AMQP进行消息传递。
在这里插入图片描述

  Spring AMQP提供了一个高级的抽象层,使得开发人员可以很方便地使用AMQP进行消息传递,而不必直接处理AMQP的复杂性。它支持各种AMQP实现,例如RabbitMQ和Apache ActiveMQ等。

Spring AMQP提供了许多有用的特性,例如:

1.声明式配置:使用注释和Java配置声明式地配置消息交换和队列。

2.简化的消息发布和订阅:使用Spring AMQP,开发人员可以很方便地将消息发送到队列或从队列中接收消息。

3.异步处理:Spring AMQP提供了异步消息处理机制,可以轻松地处理大量消息。

4.异常处理:Spring AMQP提供了对异常处理的支持,以便在发生错误时进行适当的处理。

2.2 SpringAMQP发送消息

  1. 引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml文件配置RabbitMQ的连接信息:
spring:rabbitmq:#主机名host: 192.168.XX.XX#端口port: 5672#用户名username: caterpillar#密码password: 123456#虚拟主机virtual-host: /
  1. 调用RabbitTemplate的convertAndSend方法:
//自动注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void SendMessage(){//队列名称String queueName = "simple.queue";//所发送的消息String message = "hello,World!";//调用convertAndSend方法发送信息rabbitTemplate.convertAndSend(queueName,message);
}

2.3 SpringAMQP接收消息

  1. 在application.yml文件配置RabbitMQ的连接信息:
spring:rabbitmq:#主机名host: 192.168.XX.XX#端口port: 5672#用户名username: caterpillar#密码password: 123456#虚拟主机virtual-host: /
  1. 在消费者的服务中新建一个类接收消息并处理
//声明为Bean
@Component
public class SpringRabbitListener {//指定消息队列的名称@RabbitListener(queues = "simple.queue")public void listenQueue(String msg){//处理指定消息队列所收到的信息System.out.println("收到的消息:" + msg);}
}

2.4 WorkQueue模型

2.4.1 概念

  Work Queue(工作队列)模型是一种经典的消息队列模型,也被称为任务队列模型。它用于将耗时的任务分配给多个工作进程以便并行执行。
在这里插入图片描述

  在Work Queue模型中,一个生产者将消息发送到一个队列中,多个消费者从队列中接收消息并处理它们。一个消息只能被一个消费者处理,即消息的消费是互斥的。

Work Queue模型的主要特点包括:

  1. 生产者将消息发送到队列中,而不是发送到特定的消费者。
  2. 消费者从队列中获取消息,并将其处理。
  3. 消息的处理是互斥的,即一个消息只能被一个消费者处理。
  4. 消费者可以平均分配任务,以便并行处理。

2.4.2 示例

  1. 生产者测试代码
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "message -- ";//发送50次消息for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);//每次循环延时20毫秒Thread.sleep(20);}
}
  1. 消费者代码
@Component
public class SpringRabbitListener {//消息队列1@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消息队列1收到的消息:" + msg + " -- " + LocalTime.now());Thread.sleep(20);}//消息队列2@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {//使用err是为了输出时颜色不同便于区分System.err.println("消息队列2收到的消息:" + msg + " -- " + LocalTime.now());Thread.sleep(200);}
}
  1. 消费者配置添加
spring:rabbitmq:listener:simple:prefetch: 1 #每次只预取一条消息,使消费能力的队列更强的做更多的接收

2.5 发布订阅模型

2.5.1 介绍

  发布-订阅模型(Publish-Subscribe)是一种消息队列模型,主要用于将消息同时发给多个消费者。在发布-订阅模型中,消息的生产者将消息发送到一个交换机中,交换机会将消息同时发给多个与之绑定的队列,多个消费者可以分别从这些队列中获取消息并进行处理。
在这里插入图片描述

  与点对点模型(P2P)不同,发布-订阅模型中的消息生产者并不需要知道消息的接收者,而是只需要将消息发送到交换机中即可。交换机会将消息广播给与之绑定的所有队列,多个消费者可以从不同的队列中获取并处理消息。消息的处理是并行的,多个消费者可以同时处理不同的消息,提高了系统的处理效率和吞吐量。

常见的exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

2.5.2 Fanout Exchange

  Fanout Exchange是一种简单的Exchange类型,它会将所有收到的消息广播给与之绑定的所有队列,所有的队列都会收到相同的消息。在Fanout Exchange中,不需要指定Routing Key,消息会直接广播给所有的队列。Fanout Exchange通常用于需要将消息广播给多个消费者的场景,例如在线聊天室、实时数据分析等。
在这里插入图片描述

示例:

  1. 生产者测试代码
@Test
public void testSendFanoutExchange(){//交换机名称String exchangName = "caterpillar.fanout";//消息String message = "hello";//发送消息rabbitTemplate.convertAndSend(exchangName,"",message);
}	
  1. 添加一个类作为消费者的配置类
//声明为一个配置
@Configuration
public class FanoutConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("caterpillar.fanout");}//声明队列1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//将队列1绑定到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//声明队列2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//将队列2绑定到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
  1. 消费者代码
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("fanout1收到的消息:" + msg);}@RabbitListener(queues = "fanout.queue2")public void listenfanoutQueue(String msg){System.out.println("fanout2收到的消息:" + msg);}
}

2.5.3 Direct Exchange

  Direct Exchange是一种常见的Exchange类型,它会将消息发送给与之匹配的队列。在Direct Exchange中,生产者将消息发送到指定的交换机中,并指定一个Routing Key,Exchange会将消息发送到与之绑定的Routing Key相同的队列中。Direct Exchange通常用于一些点对点的场景,例如订单处理、日志记录等。
在这里插入图片描述

示例:

  1. 生产者测试代码
@Test
public void testSendDirectExchange(){//交换机名称String exchangName = "caterpillar.direct";//消息String message = "hello,blue";//发送消息,只会发送给相同key的消费者,第二个参数是Routing KeyrabbitTemplate.convertAndSend(exchangName,"red",message);
}
  1. 消费者代码
@RabbitListener(bindings = @QueueBinding(//声明队列value = @Queue(name = "direct.queue1"),//声明交换机exchange = @Exchange(name = "caterpillar.direct",type = ExchangeTypes.DIRECT),//声明绑定keykey = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("direct1收到的消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(//声明队列value = @Queue(name = "direct.queue2"),//声明交换机exchange = @Exchange(name = "caterpillar.direct",type = ExchangeTypes.DIRECT),//声明绑定keykey = {"red","yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("direct2收到的消息:" + msg);
}

  在此示例中两个消费者都会收到消息,如果生产者指定的key为blue则只有direct1收到消息,如果生产者指定的key为yellow则只有direct2收到消息。

2.5.4 Topic Exchange

  Topic Exchange是一种强大的Exchange类型,它会将消息发送到与之匹配的队列中。在Topic Exchange中,生产者可以指定一个带有通配符的Routing Key(*),Exchange会将消息发送到所有与之匹配的队列中。Topic Exchange支持通配符的Routing Key匹配,可以根据消息的主题进行路由,非常适合一些消息订阅和过滤的场景,例如新闻订阅、事件通知等。
在这里插入图片描述

Topic Exchange支持两种通配符:* 和 #。

  1. *:表示匹配一个单词,可以用于匹配某个特定单词,例如"*.apple"可以匹配"green.apple"、“red.apple"等,但不能匹配"green.big.apple”。
  2. #:表示匹配零个或多个单词,可以用于匹配某个前缀或者所有单词,例如"fruit.#“可以匹配"fruit.apple”、“fruit.orange”、"fruit.apple.red"等。

示例:

  1. 生产者测试代码
@Test
public void testSendTopicExchange(){//交换机名称String exchangName = "caterpillar.topic";//消息String message = "毛毛虫被网易招聘了!!!";//发送消息rabbitTemplate.convertAndSend(exchangName,"china.news",message);
}
  1. 消费者代码
@RabbitListener(bindings = @QueueBinding(//声明队列value = @Queue(name = "topic.queue1"),//声明交换机exchange = @Exchange(name = "caterpillar.topic",type = ExchangeTypes.TOPIC),//声明绑定keykey = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("topic1收到的消息:" + msg);
}
@RabbitListener(bindings = @QueueBinding(//声明队列value = @Queue(name = "topic.queue2"),//声明交换机exchange = @Exchange(name = "caterpillar.topic",type = ExchangeTypes.TOPIC),//声明绑定keykey = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("topic2收到的消息:" + msg);
}

  在此示例中两个消费者都会收到消息,如果生产者指定的key为china.weather则只有topic1收到消息,如果生产者指定的key为Canada则只有topic2收到消息。

2.6 消息转换器

2.6.1 介绍

  Spring AMQP提供了消息转换器(MessageConverter)的支持,用于将消息对象转换为字节数组或者将字节数组转换为消息对象,使得消息生产者和消费者之间可以方便地传输POJO对象。

Spring AMQP提供了以下几种消息转换器:

  1. SimpleMessageConverter:默认的消息转换器,可以将Java对象转换为字节数组并发送到队列中,也可以从队列中接收字节数组并将其转换为Java对象。支持文本消息和字节消息的转换。
  2. Jackson2JsonMessageConverter:基于Jackson库的消息转换器,可以将Java对象转换为JSON字符串并发送到队列中,也可以从队列中接收JSON字符串并将其转换为Java对象。支持文本消息和字节消息的转换。
  3. Jackson2XmlMessageConverter:基于Jackson库的消息转换器,可以将Java对象转换为XML字符串并发送到队列中,也可以从队列中接收XML字符串并将其转换为Java对象。支持文本消息和字节消息的转换。
  4. MarshallingMessageConverter:使用Spring的Marshaller和Unmarshaller进行消息转换的消息转换器。支持XML和Java Serialization格式。
  5. ByteArrayMessageConverter:将Java对象转换为字节数组并发送到队列中,也可以从队列中接收字节数组并将其转换为Java对象。只支持字节消息的转换。

2.6.2 切换消息转换器

以切换为Jackson2JsonMessageConverter为例:

  1. 在主类pom文件引入依赖
<!--数据绑定模块-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
  1. 在生产者和消费者的配置类中声明消息转换器的Bean
//消息转换器
@Bean
public MessageConverter messageConverter(){//Jackson消息转换工具return new Jackson2JsonMessageConverter();
}
  1. 消费者监听队列消息时接收的消息要和生产者中的类型一致
@RabbitListener(/*queues = 队列名称*/)
public void listenObjectQueue(/*消息类型 消息实例*/){//消息操作
}