> 文章列表 > 你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

你像天外来物一样,求之不得(咳咳,指offer)🌹

文章目录

    • 什么是MQ?
    • MQ的优势与劣势
    • 使用MQ需要满足的条件
    • 常见的MQ产品
    • 关于RabbitMQ
    • 生产者
    • 消费者
    • 工作模式
    • 订阅模式
    • 路由模式
    • 通配符模式

什么是MQ?

Message Queue 消息队列,是在消息的传输过程中保存消息的容器,多用于分布式系统之间通信。
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

MQ的优势与劣势

优势:
应用解耦:比如一个订单系统需要和库存系统、支付系统等配合起来才能完成支付操作,如果库存系统挂了,那么订单系统也不能正常工作了;但是引入MQ之后,即使库存系统发生故障,只需要将订单系统要求库存系统执行的操作保存到MQ中就可以了,不会影响到订单系统。

异步提速:如果两个系统直接耦合,那么所执行的操作都是同步的,如果涉及到多个系统,那么同步就会耗费较多的时间,加入MQ之后,可以实现消息的异步发送,达到一个提速的作用,给客户一个较好的体验。

削峰填谷:在用户请求量很大的情况下,一个系统所容纳的请求量又是有限的,在这种情况下,可以引入MQ,MQ的作用就是承载用户请求,起到一个缓冲的作用。

劣势:
系统可用性降低:系统引入的外部依赖越多,系统的稳定性就越差。引入MQ之后需要保证MQ的高可用。

系统复杂度提高:需要保证消息传递的顺序性以及消息不会被重复消费。

一致性问题:通过MQ给多个系统发送消息,有的系统处理数据成功,而有的处理数据失败,如何保证数据的一致性问题。

使用MQ需要满足的条件

生产者不需要从消费者获得反馈; 容许短暂的不一致; 解耦、提速等方面的收益要超过加入、管理MQ的成本

常见的MQ产品

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

关于RabbitMQ

基础架构:
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

六种工作模式:简单模式、workqueues、发布订阅模式、路由模式、主题模式、远程调用模式

生产者

package com.thorn.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {
//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//        创建队列channel.queueDeclare("hello_world",true,false,false,null);String body = "hello rabbitmq";
//        发送消息channel.basicPublish("","hello_world",null,body.getBytes());
//          释放资源channel.close();connection.close();}
}

运行上述程序之后,就能看到消息队列了。
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

消费者

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//        创建队列channel.queueDeclare("hello_world",true,false,false,null);//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(consumerTag  );System.out.println(envelope.getRoutingKey());System.out.println(envelope.getExchange());System.out.println(properties);System.out.println(new String(body));}};channel.basicConsume("hello_world",true,consumer);}
}

执行结果,拿到消息队列中的消息了。
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
这是简单模式下的。

工作模式

工作队列模式:
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
多个消费者共同监听一个生产者,主要适用于任务较多的情况。
生产者(一口气生产十条消息):

package com.thorn.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_WorkQueues {public static void main(String[] args) throws IOException, TimeoutException {
//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//        创建队列channel.queueDeclare("work_queues",true,false,false,null);for (int i = 0; i < 10; i++) {String body = i + "  hello work_queues~~~";
//        发送消息channel.basicPublish("","work_queues",null,body.getBytes());}//          释放资源channel.close();connection.close();}
}

消费者(需要创建两个):

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues1 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//        创建队列channel.queueDeclare("work_queues",true,false,false,null);//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume("work_queues",true,consumer);}
}

第一个消费者:
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
第二个消费者:
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
可以看到,一共生产了十条消息,每个消费者消费五条消息。

订阅模式

引入了交换机,交换机去创建两个队列。
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
生产者:

package com.thorn.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Pubhub {public static void main(String[] args) throws IOException, TimeoutException {
//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//          创建交换机String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//        创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//        绑定队列和交换机channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");//        发送消息String body = "日志信息:张三调用方法findAll,日志级别为info";channel.basicPublish(exchangeName,"",null,body.getBytes());
//        释放资源channel.close();connection.close();}
}

消费者:

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";
//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台:");System.out.println("body:"+new String(body));}};channel.basicConsume(queue1Name,true,consumer);}
}

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";
//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息保存到数据库:");System.out.println("body:"+new String(body));}};channel.basicConsume(queue2Name,true,consumer);}
}

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

路由模式

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
生产者:

package com.thorn.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {
//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//          创建交换机String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//        创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//        绑定队列和交换机
//        队列1绑定channel.queueBind(queue1Name,exchangeName,"error");
//        队列2绑定channel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");channel.queueBind(queue2Name,exchangeName,"warning");//        发送消息String body = "日志信息:张三调用方法findAll,日志级别为info";channel.basicPublish(exchangeName,"info",null,body.getBytes());
//        释放资源channel.close();connection.close();}
}

将error级别的信息放到队列1,将error、info、warning级别的信息放到队列2.

消费者1:

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue2Name = "test_direct_queue2";//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台:");System.out.println("body:"+new String(body));}};channel.basicConsume(queue2Name,true,consumer);}
}

消费者2:

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue1Name = "test_direct_queue1";//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息保存到数据库:");System.out.println("body:"+new String(body));}};channel.basicConsume(queue1Name,true,consumer);}
}

可以看到消费者1可以收到消息,而消费者2收不到消息
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)

通配符模式

你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
生产者:

package com.thorn.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Topics {public static void main(String[] args) throws IOException, TimeoutException {
//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();
//          创建交换机String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//        创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//        绑定队列和交换机channel.queueBind(queue1Name,exchangeName,"#.error");channel.queueBind(queue1Name,exchangeName,"order.*");
//        队列2打印所有信息channel.queueBind(queue2Name,exchangeName,"*.*");//        发送消息String body = "日志信息:张三调用方法findAll,日志级别为info";channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
//        释放资源channel.close();connection.close();}
}

消费者1:

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topics1 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息存入到数据库");System.out.println("body:"+new String(body));}};channel.basicConsume(queue1Name,true,consumer);}
}

消费者2:

package com.thorn.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topics2 {public static void main(String[] args) throws IOException, TimeoutException {//        创建连接工厂ConnectionFactory factory = new ConnectionFactory();
//    设置参数factory.setVirtualHost("/thorns");factory.setUsername("lwj");factory.setPassword("lwj");
//        创建链接Connection connection = factory.newConnection();
//        创建channelChannel channel = connection.createChannel();String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";//        接收消息Consumer consumer = new DefaultConsumer(channel){
//            回调方法,当收到消息后,会自动执行该方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台");System.out.println("body:"+new String(body));}};channel.basicConsume(queue2Name,true,consumer);}
}

消费者1和消费者2都能收到消息:
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
你还还还没学会RabbitMQ?-----------RabbitMQ详解及快速入门(工作模式)
恭喜你又学会了一项技术!
在这里插入图片描述
如果觉得有帮助的小伙伴点个赞吧~感谢收看!