> 文章列表 > RabbitMQ

RabbitMQ

RabbitMQ

1、概念

1.1、什么是MQ

MQ(message queue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

1.2、为什么要用MQ

1.2.1、流量消峰

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

1.2.2、应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
RabbitMQ

1.2.3、异步处理

有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息。
RabbitMQ

1.3、MQ的分类

1.3.1、ActiveMQ

优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用。

1.3.2、Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被LinkedIn,Uber,Twitter,Netflix等大公司所采纳。

优点:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点: Kafka单机超过64个队列/分区,Load 会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

1.3.3、RocketMQ

RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog.分发等场景。

优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到О丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java我们可以自己阅读源码,定制自己公司的MQ

缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

1.3.4、RabbitMQ

2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点:由于erlang.语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言如: Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高

缺点:商业版需要收费,学习成本较高

1.4、MQ的选择

1.4.1、Kafka

Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。

1.4.2、RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ.

1.4.3、RabbitMQ

结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ.

2、RabbitMQ

2.1、RabbitMQ概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递快件。RabbitMQ.与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

2.2、四大核心概念

  1. 生产者:产生数据发送消息的程序是生产者
  2. 交换机:交换机是RabbitMQ.非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  3. 队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ.和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  4. 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

2.3、RabbitMQ核心部分

RabbitMQ
1.简单模式
2.工作模式
3.发布/订阅模式
4.路由模式
5.主题模式
6.发布确认模式

2.4、各个名词介绍

Producer生产者,Exchange交换机,Consumer消费者,Channel信道Queue队列

RabbitMQ
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMO server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange / queue 等。
Connection: publisher / consumer和broker之间的TCP连接。
Channel:如果每一次访问 RabbitMQ 都建立一个Connection,在消息量大的时候建立TCP,Connection的开销将是巨大的,效率也较低。Channel是在connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的。
Connection极大减少了操作系统建立TCP connection的开销
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout
( multicast)。
Queue:消息最终被送到这里等待consumer取走。
Binding: exchange和queue之间的虚拟连接,binding 中可以包含routing key,Binding 信息被保存到exchange 中的查询表中,用于message的分发依据。

3、Hellow World

在下图中,“P”是我们的生产者, “C”是我们的消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区
RabbitMQ
创建一个空目录,添加moduls选择maven项目
添加依赖

 <dependencies>
<!--        rabbitmq客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>
<!--        操作文件流--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies><!--    指定jdk编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
package com.zmc.mq.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消息生产者:发消息*/
public class Producer {/*队列名称*/public static final String QUEUE_NAME = "HELLOW";/*发消息*/public static void main(String[] args) throws IOException, TimeoutException {/*创建连接工厂*/ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*创建连接*/Connection connection = connectionFactory.newConnection();/*获取信道*/Channel channel = connection.createChannel();/*** 生成一个队列* @params1 队列名称* @params2 队列里的消息是否持久化(磁盘),默认消息存储在内存中* @params3 该队列是否只供一个消费者进行消费是否进行消息共享,true:可以多个消费者消费;false:只能一个消费者消费* @params4 是否自动删除 最后一个消费者端开连接后该队列是否自动删除,true自动删除,false反之* @params5 其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);/*发消息*/String message="Hellow World !This is a message!";/*** @params1 发送到哪个交换机* @params2 路由key值是哪个(本次是队列名称)* @params3 其他参数信息* @params4 发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送success!");}
}
package com.zmc.mq.one;import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/*** 消费者 接收消息*/
public class Consumer {/*队列名称*/public static final String QUEUE_NAME = "HELLOW";/*接收消息*/public static void main(String[] args) throws IOException, TimeoutException {/*创建连接工厂*/ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");/*创建连接*/Connection connection = connectionFactory.newConnection();/*获取信道*/Channel channel = connection.createChannel();/*声明 ;接收消息*/DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println(message);System.out.println(new String(message.getBody()));};/*取消消息时的回调*/CancelCallback cancelCallback=consumerTag->{System.out.println("消息消费被中断");};/*** 消费者消费消息* @params1 消费哪个队列* @params2 消费成功后是否自动应答;true代表自动应答 false代表手动应答* @params3 消费者未成功消费的回调* @params4 消费者取录消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

4、Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

4.1、轮询发消息

package com.zmc.mq.two;
import com.rabbitmq.client.Channel;
import com.zmc.mq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/*** 消息生产者*/
public class TaskProducer01 {/*队列名称*/public static final String QUEUE_NAME = "HELLOW";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/*** 声明一个队列* @params1 队列名称* @params2 队列里的消息是否持久化(磁盘),默认消息存储在内存中* @params3 该队列是否只供一个消费者进行消费是否进行消息共享,true:可以多个消费者消费;false:只能一个消费者消费* @params4 是否自动删除 最后一个消费者端开连接后该队列是否自动删除,true自动删除,false反之* @params5 其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);/*发消息*/String message = "";Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){message=scanner.next();/*** @params1 发送到哪个交换机* @params2 路由key值是哪个(本次是队列名称)* @params3 其他参数信息* @params4 发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("发达!");}}
}
package com.zmc.mq.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zmc.mq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工资线程 (相当于之前的消费者)*/
public class WorkThread01 {/*队列名称*/public static final String QUEUE_NAME = "HELLOW";public static void main(String[] args) throws IOException, TimeoutException {DeliverCallback deliverCallback=(consumerTag,message)->{System.out.println("接收到消息:"+new String(message.getBody()));};CancelCallback cancelCallback = var1->{System.out.println(var1+"消费者取消消费接口回调逻辑");};Channel channel = RabbitMqUtils.getChannel();/*** 消费者消费消息** @params1 消费哪个队列* @params2 是否自动应答(应答给消息生产者)* @params3 消费回调* @params4 取消消息回调*/System.out.println("Thread3等待接收消息。。。。。");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

4.2、消息应答

4.2.1、概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。`

消费者处理一个任务可能需要很长时间,如果消费者在处理一个消息时消息突然挂掉了,而消息生产者将消息发送出去后就会将消息标记为删除此时就会出现消息丢失的情况

4.2.2、自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

boolean autoAck = true;  true自动应答,false手动应答
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

4.2.3、消息应答的方法(手动应答)

  1. Channel.basicAck(用于肯定确认)RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了`
  2. Channel. basicNack(用于否定确认)
  3. Channel. basicReject(用于否定确认)与Channel.basicNack_相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了

4.2.4、Multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

channel.basicAck(deliveryTag,true);//第二个参数表示是否批量应答

multiple 的 true和 false代表不同意思
true代表批量应答channel上未应答的消息
比如说channel上有传送tag的消息.5,6,7,8 当前tag是8那么此时5-8的这些还未应答的消息都会被确认收到消息应答
false同上面相比
只会应答tag=8的消息、5,6,7这三个消息依然不会被确认收到消息应答

false批量应答只应答已经成功处理完的消息,true只要消息发送出去就会应答
RabbitMQ
RabbitMQ

4.2.5、消息自动重新入列

保证消息不丢失使用消息自动重新入列

如果将消息发送给两个消息消费者其中一个消费者挂掉了失去连接,为保证消息不丢失就会将消息重新入列给另个一消费者消费

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
RabbitMQ

4.2.6、消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。

package com.zmc.mq.three;import com.rabbitmq.client.Channel;
import com.zmc.mq.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** 消息在手动应答时不丢失、放回队列重新消费*/
public class TaskProducer02 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();//声明一个队列/*** 队列名,是否持久化磁盘,是否共享,是否自动删除,其他参数*/channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner=new Scanner(System.in);while (scanner.hasNext()){String str = scanner.next();/*** 消息发布* 交换机,队列名,其他参数,消息体*/channel.basicPublish("",TASK_QUEUE_NAME,null,str.getBytes("UTF-8"));System.out.println("消息发送successful!");}}
}
package com.zmc.mq.three;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zmc.mq.utils.RabbitMqUtils;
import com.zmc.mq.utils.ThreadUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消息手动应答时不丢失,放回队列中重新消费*/
public class Consumer01 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息处理时间较短");DeliverCallback deliverCallback = (var1, var2) -> {//睡眠一秒ThreadUtils.sleep(1);System.out.println("接收到的消息:" + new String(var2.getBody(), "UTF-8"));//手动应答/*** @params1 消息的标记 tag* params2 是否批量应答false,批量应答可能会造成消息的丢失*/channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var1 -> {System.out.println("消费者取消消费"+var1);};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}
}
package com.zmc.mq.three;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zmc.mq.utils.RabbitMqUtils;
import com.zmc.mq.utils.ThreadUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消息手动应答时不丢失,放回队列中重新消费*/
public class Consumer02 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息处理时间较短");DeliverCallback deliverCallback = (var1, var2) -> {//睡眠20秒 关闭消费者便于展示重新入列ThreadUtils.sleep(20);System.out.println("接收到的消息:" + new String(var2.getBody(), "UTF-8"));//手动应答/*** @params1 消息的标记 tag* params2 是否批量应答false,批量应答可能会造成消息的丢失*/channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var1 -> {System.out.println("消费者取消消费"+var1);};boolean autoAck = false;channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);}
}

4.2.7、手动应答效果展示

RabbitMQ

4.3、RabbitMQ持久化

4.3.1、概念

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。

4.3.2、队列如何实现持久化,

队列持久化即使MQ宕机了也不至于队列消失可以避免消息的丢失

之前我们创建的队列都是非持久化的,rabbitmq如果重启的化,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable参数设置为持久化

声明队列第二个参数为true
RabbitMQ
但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
RabbitMQ
以下是控制台中持久化与非持久化队列的区别
RabbitMQ

4.3.3、消息实现持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN添加这个属性。
RabbitMQ
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边课件发布确认章节。

4.3.4、不公平分发

在最开始的时候我们学习到RabbitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数channel.basicQos(1);

参数为0(默认就是0)轮询发消息,类似轮询负载均衡算法;参数为1时是不公平分发,谁的效率高谁处理的消息也就多;参数比1时就是预取值类似基于权重的负载均衡算法
RabbitMQ
RabbitMQ
效果展示
RabbitMQ
RabbitMQ

4.3.5、预取值

类似基于权重的负载均衡算法(加权)

本身消息的发送就是异步发送的,所以在任何时候,channel上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用basic.gos.方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ.将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息5、6、7,8,并且通道的预取计数设置为4,此时RabbitMQ.将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被ack。比方说 tag=6这个消息刚刚被确认ACK,RabbitMQ将会感知这个情况到并再发送一条消息。消息应答和QoS.预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同100到300范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为1是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中,对于大多数应用来说,稍微高一点的值将是最佳的。
RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ

5、发布确认

5.1、发布确认原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID (从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外 broker也可以设置basic. ack的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack,消息,生产者应用程序同样可以在回调方法中处理该nack消息。

发布确认的目的是避免消息的丢失。前面虽然设置了队列和消息的持久化但是也可能出现消息丢失的情况,因为将队列保存在磁盘上才能实现持久化的目的,如果消息只是传送到队列还没保存到磁盘上就宕机了也是会造成消息的丢失。所以我们可以使用发布确认当消息保存到磁盘里的时候MQ告诉生产者消息已经保存到磁盘上了,此时我们可以非常肯定的确保消息不会丢失。

5.2、发布确认的策略

5.2.1、开启发布确认的方法

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在channel上调用该方法
RabbitMQ

5.2.2、单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布, waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

发布一条就确认一条,缺点发布速度特别的慢,优点出问题立马知道哪个消息出问题
RabbitMQ
RabbitMQ

5.2.3、批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

一次性确认全部的批量,优点发布速度快,缺点消息出现问题的时候不知道具体哪个除了问题
RabbitMQ
RabbitMQ

5.2.4、异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

它会为消息打上序号集合了单个确认发布具体到哪个消息出问题的优点和批量发布确认速度快的优点。采用异步批量的方式给确认收到消息给生产者一个确认收到的回调,未确认收到的消息给生产者一个未确认收到的回调可以进行重新发送

异步发布确认就是先将消息发送出去,后续对这些消息进行监听同时还会为这些消息打上标记便于回调时知道具体哪些消息接收成功,哪些消息接收失败
RabbitMQ
RabbitMQ
打印这些消息序号(标记)的时候消息已经发送完成了,只是在监听后续的监听操作
RabbitMQ

5.2.5、如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。
RabbitMQ
RabbitMQ

6、交换机(Publish/Subscribe)

RabbitMQ
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为发布/订阅".

为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,

另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者者

之前使用默认交换机发送的一个消息只能被一个消费者消费,现在使用交换机,交换机可以将消息给多队列供一条消息被多个消费者消费

6.1、Exchanges

6.1.1、Exchanges概念

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
RabbitMQ

6.1.2、Exchanges的类型

  1. 直接(direct)也叫路由类型
  2. 主题(topic)
  3. 标题(headers)
  4. 扇出(fanout)

6.1.3、无名Exchanges

无名类型就是默认类型

在本教程的前面部分我们对exchange一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(" ")进行标识。

写法

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的,如果它存在的话

6.2、临时队列

之前的章节我们使用的是具有特定名称的队列(还记得hello和ack _queue吗? )。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除

没有持久化的队列即临时队列,主要看Features有没有D字母标记

创建方式

String queueName=channle.queueDeclare().getQueue();

创建出来的样子
RabbitMQ

6.3、绑定(binding)

什么是bingding.呢, binding其实是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是×与Q1和Q2进行了绑定
RabbitMQ

6.4、Fanout

6.4.1、Fanout介绍

Fanout这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些exchange类型

扇出模式,一人发多人接收。类似群聊一个发消息多个人可以接收到
RabbitMQ

6.4.2、Fanout实战

扇出的routingkey(路由key是一致的)

在第五个package

生产者发送消息到交换机,交换机将消息绑定到两个队列被两个消费者消费
RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ

6.5、Direct exchange

直接也叫路由模式

如果routingkey(路由key)不一致就是直接模式

6.5.1、回顾

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数: routingKey.来表示也可称该参数为binding key,创建绑定我们用代码:channel.queueBind(queueName,EXCHANGE_NAME,“routingKey”);绑定之后的意义由其交换类型决定。

6.5.2、Direct exchange介绍

RabbitMQ
在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green.

在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为blackgreen.和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。

6.5.3、多重绑定

RabbitMQ
当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是它表现的就和fanout,有点类似了,就跟广播差不多,如上图所示。

总结根据路由key交换机将消息发送到指定的队列,如果路由key相同也就是多重绑定 就和扇出模式类似

6.5.4、实战

RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ

6.6、Topics

主题类型

6.6.1、之前类型的问题

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout,交换机,而是使用了direct交换机,从而有能实现有选择性地接收日志。

尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个时候direct就办不到了。这个时候就只能使用topic类型

6.6.2、Topic的要求

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说: “stock.usd.nyse” , “nyse.vmw”,
“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过255个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的

  • *(星号)可以代替一个单词
  • #(并号)可以替代零个或多个单词

6.6.3、Topic匹配案例

下图绑定关系如下

Q1–>绑定的是
中间带orange带3个单词的字符串(*.orange.*)

Q2–>绑定的是
最后—个单词是rabbit的3个单词(*.*.rabbit)
第—个单词是lazy的多个单词(lazy.#)
RabbitMQ
RabbitMQ
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit 被队列Q1Q2接收到
lazy.orange.elephant 被队列Q1Q2接收到 quick.orange.fox 被队列Q1接收到
lazy.brown.fox 被队列Q2接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接收一次 quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃 quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃 lazy.orange.male.rabbit 是四个单词但匹配Q2

当队列绑定关系是下列这种情况时需要引起注意 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了 可以说主题交换机包含扇出交换机和直接交换机

6.6.4、实战

RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ

7、死信队列

7.1、死信队列的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer 从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

应用场景当消息消费发生异常时将消息投入到死信队列防止消息丢失,比如说用户正在支付时取消了支付,会将支付时发送的消息放入死信队列;目的是等后续环境好了之后再把死信队列中的消息进行消费;

7.2、死信队列的来源

  • 消息TTL过期 (TTL指存活时间;比如一个消息存活10秒钟,在10秒内没有被消费视认为该消息过期了)
  • 队列达到最大长度(队列满了,无法再添加数据到mq.中)
  • 消息被拒绝(basic.reject或 basic.nack)并且requeue=false.(requeue=false指不放回队列中)

这些场景都会产生死信队列;将这些场景的消息转移到死信队列中以方便后续处理

7.3、死信队列实战

7.3.1、代码结构图

RabbitMQ

7.3.2、消息TTL过期

RabbitMQ
RabbitMQ
RabbitMQ
生产者发送消息给消费者01并为消息设置过期时间10秒,01在10秒内未接收消息这些消息会成为死信消息,然后01将死信消息转发给消费者02.
RabbitMQ

7.3.3、队列达到最大长度

设置队列最大长度是5,其他多余的消息会成为死信队列里的消息
RabbitMQ
RabbitMQ
​ 由此可见生产者发送10条消息,由于消费者队列最大长度是5,所以多余的消息转发给死信队列进行消费

7.3.4、消息被拒

RabbitMQ
RabbitMQ
RabbitMQ

8、延迟队列

延时队列也是基于死信队列的一种(TTL)

8.1、延迟队列的概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

8.2、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
⒉新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,
如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

8.3、整合springboot

<dependency> <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--        rabbitmq客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>
spring:rabbitmq:host: 127.0.0.1port: 5672     #不可用15672username: guestpassword: guest

8.4、队列TTL

8.4.1、代码架构图

创建两个队列QA和QB,两者队列TTL分别设置为10S和40S,然后在创建一个交换机X和死信交换机Y,它们的类型都是direct,创建一个死信队列QD,它们的绑定关系如下:
RabbitMQ

8.5.2、配置文件类代码

RabbitMQ
RabbitMQ

8.4.3、消息生产者代码

RabbitMQ

8.4.4、消息消费者代码

RabbitMQ
RabbitMQ
第一条消息在10S后变成了死信消息,然后被消费者消费掉,第二条消息在40S之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有10S和40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

8.5、延时队列优化

8.5.1、代码架构图

在这里新增了一个队列QC,绑定关系如下,该队列不设置TTL时间
RabbitMQ

8.5.2、配置文件类代码

RabbitMQ
RabbitMQ

8.5.3、生产者代码

RabbitMQ
RabbitMQ

8.6、RabbitMQ插件实现延迟队列

Installing on Windows — RabbitMQ

9、发布确认高级

在生产环境中由于一些不明原因,导致 rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:

9.1、发布确认SpringBoot版本

9.1.1、确认机制方案

RabbitMQ

9.1.2、代码架构图

RabbitMQ

9.1.3、配置文件

RabbitMQ
SIMPLE会发布一条确认一条消息
RabbitMQ

9.1.4、添加配置类

RabbitMQ

9.1.5、消息生产者

RabbitMQ

9.1.6、回调接口

消息发送之后,无论成功与否都会回调
RabbitMQ
RabbitMQ
接收到消息,效果
RabbitMQ

接收不到的效果
RabbitMQ
如果把路由key改错,交换机能接收到消息,队列就收不到消息
RabbitMQ

9.1.7、消息消费者

RabbitMQ

9.2、回退消息

解决消息只到达交换机,而没有到达队列导致消息丢失

Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
RabbitMQ
RabbitMQ
RabbitMQ
RabbitMQ

9.3、备份交换机

有了mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。在RabbitMQ中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份交换机可以理解为RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

9.3.1、代码架构图

RabbitMQ
给交换机一个备机。如果生产者无法把消息投递给交换机,则让交换机把消息投递给备份交换机,也能达到消息不丢失的目的.并且还有一个好处可以对消息进行监测和报警

9.3.2、修改配置类

RabbitMQ
RabbitMQ
RabbitMQ
mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高

10、RabbitMQ其他知识点

10.1、幂等性

10.1.1、概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

简单的说就是被重复消费了

10.1.2、消息重复消费

消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack.时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

10.1.3、解决思路

MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。

10.1.4、消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID+指纹码机制,利用数据库主键去重, b.利用redis.的原子性去实现

10.1.5、唯一ID+指纹码机制(解决方案)

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

10.1.6、Redis原子性(解决方案 推荐)

利用redis.执行setnx.命令,天然具有幂等性。从而实现不重复消费

10.2、优先级队列

10.2.1、使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用redis,来存放的定时轮询,大家都知道redis.只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

由于消息是被排队消费的先来后到的原则,使用优先级打破先来后到的原则 使优先级高的先被消费

10.2.2、如何添加

1.在可视化页面上添加
RabbitMQ
2.队列中代码添加优先级

 @Bean("queue")public Queue queue(){Map<String, Object> arguments=new HashMap<>();/*官方允许是0-255,此处设置10,过大优先级会占用CPU与内存*/arguments.put("x-max-priority",10);return QueueBuilder.durable("priority_queue").withArguments(arguments).build();}

3.消息中代码添加优先级

    @GetMapping("/sendMsgPriority")public void sendMsgPriority(){String msg="消息:";for (int i = 1; i <= 6; i++) {if (i==5){Message message= MessageBuilder.withBody((msg+i).getBytes()).setPriority(5).build();rabbitTemplate.convertAndSend("priority_exchange","vip.aa",message);}rabbitTemplate.convertAndSend("priority_exchange","vip.aa",msg+i);}}

4.注意

要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序

10.3、惰性队列

10.3.1、使用场景

RabbitMQ 从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关i闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到RabbitMQ.的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别人的时候。

惰性队列使用场景是在消费者宕机的情况下将队列保存在磁盘中,但是惰性队列执行性能较差

10.3.2、两种模式

队列具备两种模式: default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。

在队列声明的时候可以通过"x-queue-mode"参数来设置队列的模式,取值为"default"和"lazy"。下面示例中演示了一个惰性队列的声明细节:

Map<String, Object> args = new HashMap<String,Object>();

args.put(“x-queue-mode” , “lazy”);

channel.queueDeclare(“myqueue”, false, false, false, args);

10.3.3、内存开销对比

RabbitMQ

11、RabbitMQ集群

11.1、clustering

11.1.1、使用集群的原因

最开始我们介绍了如何安装及运行RabbitMQ服务,不过这些是单机版的,无法满足目前真实应用的要求。如果RabbitMQ服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台RabbitMO服务器可以满足每秒1000条消息的吞吐量,那么如果应用需要RabbitMQ服务满足每秒10万条消息的吞

吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个RabbitMQ集群才是解决实际问题的关键.

11.2、镜像队列

使用镜像队列的原因

如果RabbitMQ.集群中只有一个Broker节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

11.4、Federation Exchange

使用原因

(broker北京),(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA.中,就算在开启了publisherconfirm.机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client深圳)(broker北京)之间有很大的网络延迟,(Client深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm.机制或者事务机制的情况下,(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。

将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?这里使用Federation插件就可以很好地解决这个问题.

11.5、Federation queue

联邦队列可以在多个Broker节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。
RabbitMQ

11.6、Shovel

使用它的原因

Federation具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即
source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队
为目的端的交换器可以同时位于同一个Broker,也可以位于个问F broRer rLl行为就像优秀的客户端应用是一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel行为就像优秀的客户端应用
程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
RabbitMQ

毕业设计范文