> 文章列表 > rabbitmq深入实践

rabbitmq深入实践

rabbitmq深入实践

生产者,交换机,队列,消费者
交换机和队列通过 rounting key 绑定者,rounting key 可以是#.,*.这类topic模式,
生产者发送消息内容+ rountingkey, 到达交换机后交换机检查与之绑定的队列,
如果能匹配上则把消息丢到队列里,消费者监听某个队列,如果有消息则进行消费。
生产者:confirm 消息到达 交换机 进行确认;return 消息不能到达 队列时 进行 回退

优点:
1.应用解耦
2.流量削峰
3.异步提速
缺点:
1.系统增加了消息中间件,增加了系统的复杂度,
2.要保证消息中间件的高可用

安装:
使用 Erlang (二郎神) 语言开发.
安装参考:https://blog.csdn.net/m0_67392182/article/details/126040124

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpmyum localinstall erlang-22.3.4.12-1.el7.x86_64.rpmwget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpmrpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.ascyum install rabbitmq-server-3.8.13-1.el7.noarch.rpm启用rabbitmq server
systemctl start rabbitmq-server
启用管理界面
rabbitmq-plugins enable rabbitmq_management
重启
systemctl restart rabbitmq-server
访问 前端控制台
http://yourIp:15672/#/增加用户
rabbitmqctl add_user admin admin
为用户设置权限
rabbitmqctl set_user_tags admin administrator

可能遇到的问题:
rabbitmq深入实践
rabbitmq深入实践

rabbitmq深入实践
解决方案:
rabbitmq深入实践
rabbitmq 挂掉解决方案:https://blog.csdn.net/qq_41950229/article/details/105957872

rabbitmq深入实践

rabbitmq深入实践

启用rabbitmq server
systemctl start rabbitmq-server
启用管理界面
rabbitmq-plugins enable rabbitmq_management
重启
systemctl restart rabbitmq-server
访问 前端控制台
http://192.168.186.141:15672/#/
增加用户
rabbitmqctl add_user admin admin
给用户分配权限
rabbitmqctl set_user_tags admin administrator

rabbitmq 挂掉解决方案:https://blog.csdn.net/qq_41950229/article/details/105957872

代码链接

rabbitmq深入实践

rabbitmq深入实践

rabbitmq深入实践

交换机和队列通过路由key绑定,生产者首先发送消息(带着key的消息)到 交换机,交换机根据接收到的key 检查与它绑定的队列,如果符合 topic则发送到该队列上

rabbitmq深入实践
rabbitmq深入实践

package org.example.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;//生产者
/*
创建 direct 交换机
* */
public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String,String>();area.put("china.hunan.changsha.20221120","中国湖南长沙20221120天气数据");area.put("china.hubei.wuhan.20221120","中国湖北武汉20221120天气数据");area.put("china.hunan.changsha.20221128","中国湖南长沙20221128天气数据");area.put("us.cal.lsj.20221120","美国加州洛杉矶20221120天气数据");area.put("china.hebei.shijiazhuang.20221120","中国河北石家庄20221120天气数据");area.put("china.henan.zhengzhou.20221120","中国河南郑州20221120天气数据");area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();while(itr.hasNext()){Map.Entry<String,String> me = itr.next();//arg1: 交换机的名字,arg2: 作为消息的 keychannel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), null,me.getValue().getBytes());}channel.close();connection.close();}
}
package org.example.topic;import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;import java.io.IOException;public class BaiDu {public static void main(String[] args) throws IOException {//获取长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_BAIDU,false,false,false,null);//指定队列与交换机的关系以及rounting key 之间的关系channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"#.20221120");channel.queueBind(RabbitConstant.QUEUE_BAIDU,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"china.hubei.#");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度天气收到气象信息: "+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
package org.example.topic;import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;import java.io.IOException;//消费者
public class Sina {public static void main(String[] args) throws IOException {//获取长连接Connection connection = RabbitUtils.getConnection();//获取虚拟连接final Channel channel = connection.createChannel();//声明队列信息channel.queueDeclare(RabbitConstant.QUEUE_SINA,false,false,false,null);//指定队列与交换机的关系以及rounting key 之间的关系channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"*.hunan.changsha.20221128");channel.queueBind(RabbitConstant.QUEUE_SINA,RabbitConstant.EXCHANGE_WEATHER_TOPIC,"us.*.*.*");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA,false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪天气收到气象信息: "+new String(body));channel.basicAck(envelope.getDeliveryTag(),false);}});}
}

rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践
confirm 确认到达 broker里,
return 表示被broker正常接收后,没有没有投递到对应的队列里面去

package org.example.confirm;import com.rabbitmq.client.*;
import org.example.utils.RabbitConstant;
import org.example.utils.RabbitUtils;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;//生产者
/*
创建 direct 交换机
https://www.cnblogs.com/dwlovelife/p/10991371.html#%E5%A6%82%E4%BD%95%E7%90%86%E8%A7%A3
* */
public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String,String>();area.put("china.hunan.changsha.20221120","中国湖南长沙20221120天气数据");area.put("china.hubei.wuhan.20221120","中国湖北武汉20221120天气数据");area.put("china.hunan.changsha.20221128","中国湖南长沙20221128天气数据");area.put("us.cal.lsj.20221120","美国加州洛杉矶20221120天气数据");area.put("china.hebei.shijiazhuang.20221120","中国河北石家庄20221120天气数据");area.put("china.henan.zhengzhou.20221120","中国河南郑州20221120天气数据");area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");area.put("china.hunan.changsha.20221129","中国湖南长沙20221129天气数据");area.put("cn","中国湖南长沙20221129天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();//开启confirm 监听模式channel.confirmSelect();//到达到broker中被确认channel.addConfirmListener(new ConfirmListener() {public void handleAck(long l, boolean b) throws IOException {// 第二个参数代表接收的数据是否为批量接收,一般我们用不到System.out.println("消息已被Broker接收,Tag: "+l);}public void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker接收,Tag: "+l);}});//到达broker被确认后,但是找不到对应的队列投递channel.addReturnListener(new ReturnCallback() {public void handle(Return aReturn) {System.err.println("============================");System.err.println("Return 编码: "+ aReturn.getReplyCode() + "-Return 描述"+ aReturn.getReplyText());System.err.println("交换机: "+ aReturn.getExchange()+"-路由key: "+ aReturn.getRoutingKey());System.err.println("Return 主题: "+new String(aReturn.getBody()));System.err.println("==============================");}});Iterator<Map.Entry<String,String>> itr = area.entrySet().iterator();while(itr.hasNext()){Map.Entry<String,String> me = itr.next();//arg1: 交换机的名字,arg2: 作为消息的routing key,arg3:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), true,null,me.getValue().getBytes());}//这里不能关掉
//        channel.close();//这里不能关掉
//        connection.close();}
}

rabbitmq深入实践

https://github.com/ranhaoliu/rabbitmq-demo

rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践

rabbitmq深入实践

rabbitmq深入实践

rabbitmq深入实践
生产者:

<!--===============================TTL 开始=======================    --><rabbit:queue name="test_queue_ttl" id="test_queue_ttl"><rabbit:queue-arguments><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="test_exchange_ttl"><rabbit:bindings><rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>
@Test
public void testTtl(){for(int i=0;i<10;i++){rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.baiqi","message ttl ...");}
}

10秒后会消失
rabbitmq深入实践

死信队列
rabbitmq深入实践

生产者:

<!--死信队列:1.声明正常得队列(test_queue_dlx)和交换机(test_exchange_dlx)2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)3.正常队列绑定死信交换机设置两个参数:* x-dead-letter-exchange: 死信交换机名称* x-dead-letter-routing-key: 发送给死信交换机的 routingkey--><!--  1.1 声明正常得队列(test_queue_dlx)和交换机(test_exchange_dlx) --><rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><rabbit:queue-arguments><!--3.1: x-dead-letter-exchange 死信交换机名称; value 的值是 2.2 中的声明的死信交换机名称 --><entry key="x-dead-letter-exchange" value="exchange_dlx"></entry><!--3.2: x-dead-letter-routing-key: 发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry><!--4.1 设置队列的过期时间 ttl--><entry key="x-message-ttl" value="1000" value-type="java.lang.Integer"></entry><!--4.2 设置队列的长度限制 max-length--><entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry></rabbit:queue-arguments></rabbit:queue><!--1.2 声明正常的交换机   --><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2.1 声明死信队列(queue_dlx) --><rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue><!--2.2 死信交换机(exchange_dlx)--><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>
<!--=====================死信队列结束=======================
/*
* 发送测试死信消息:
* 1.过期时间
* 2.长度限制
* 3.消息拒收
* */@Testpublic void testDlx(){rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hehe","死信消息...");}

rabbitmq深入实践
一些细节:
rabbitmq深入实践
rabbitmq深入实践
生产者:

<!--=======================延迟队列=============================--><!--延迟队列:1.声明正常得队列(order_queue)和交换机(order_exchange)2.声明死信队列(order_queue_dlx)和死信交换机(order_exchange_dlx)3.绑定,设置正常队列过期时间为30分钟正常队列绑定死信交换机设置两个参数:* x-dead-letter-exchange: 死信交换机名称* x-dead-letter-routing-key: 发送给死信交换机的 routingkey--><!--  1.1 声明正常得队列(order_queue)和交换机(order_exchange) --><rabbit:queue name="order_queue" id="order_queue"><rabbit:queue-arguments><!--3.绑定,设置正常队列过期时间为30分钟,此处设置10秒钟演示来用           --><!--3.1: x-dead-letter-exchange 死信交换机名称; value 的值是 2.2 中的声明的死信交换机名称 --><entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry><!--3.2: x-dead-letter-routing-key: 发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.order.cannel"></entry><!--4.1 设置队列的过期时间 ttl--><entry key="x-message-ttl" value="1000" value-type="java.lang.Integer"></entry></rabbit:queue-arguments></rabbit:queue><!--1.2 声明正常的交换机   --><rabbit:topic-exchange name="order_exchange"><rabbit:bindings><rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2.1 声明死信队列(order_queue_dlx) --><rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue><!--2.2 死信交换机(exchange_dlx)--><rabbit:topic-exchange name="order_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>

消费者:

package org.example.rabbitmq.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;@Component
public class OrderListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//1.接收转化消息System.out.println("message: "+new String(message.getBody()));//2.进行业务处理System.out.println("进行业务逻辑处理...");System.out.println("根据订单id查询其状态...");System.out.println("判断状态是否为支付成功...");System.out.println("取消订单,回滚库存..");//3.手动签收channel.basicAck(deliveryTag,true);}catch (Exception e){//拒绝签收/*第三个参数:requeue:重回队列。如果设置为true,则消息从新回到queue,broker会重新发送该消息给消费端,如果为false则拒绝签收* */channel.basicNack(deliveryTag,true,true);
//           channel.basicNack(deliveryTag,true,false);}}@Overridepublic void onMessage(Message message) {}@Overridepublic void containerAckMode(AcknowledgeMode mode) {}}
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2"><rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

先运行消费者,再运行生产者测试代码发现 10秒钟后,消费者开始消费
rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践

== rabbitmq 集群:==
https://cloud.tencent.com/developer/article/1631148
https://www.leeks.info/zh_CN/latest/Linux_Notes/rabbitmq/RabbitMQ.html#id2
rabbitmq高可用集群搭建踩坑

systemctl start rabbitmq-server.service
1.环境准备(需重启客户端)
hostnamectl set-hostname m1
hostnamectl set-hostname m2
2.统一 erlang.cookie 文件中 cookie值 将m1 中的 .erlang.cookie 同步到 m2 中(在m1机器操作)
scp /var/lib/rabbitmq/.erlang.cookie m2:/var/lib/rabbitmq/.erlang.cookie
或者 scp /var/lib/rabbitmq/.erlang.cookie m2的ip:/var/lib/rabbitmq/.erlang.cookie
3.Rabbitmq 集群添加节点(在m2机器上操作)
#重启 m2机器中 rabbitmq 的服务
rabbitmqctl stop_apprabbitmqctl join_cluster --ram rabbit@m1 (m1上操作)
rabbitmqctl start_app 
#启用管理界面
rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server.service
#4, 查看集群信息
rabbitmqctl cluster_status

可能出现的问题:
unable to connect to epmd (port 4369) on m1: nxdomain (non-existing domain)
rabbitmq深入实践
最总解决: rabbitmq高可用集群搭建踩坑

rabbitmq深入实践
rabbitmq深入实践
rabbitmq深入实践
1.5.1

#1.安装
yum install haproxy
#2.配置haproxy.cfg 文件 具体参考 如下 1.5.2 配置HAProxy
vim /etc/haproxy/haproxy.cfg
#3.启动 haproxy
systemctl start haproxy
#4.查看haproxy 进程状态
systemctl status haproxy.service
#状态如下说明 已经启动成功 Active: active(running)
#访问如下地址对mq 结点进行监控
http://服务器Ip:1080/haproxy_stats
#代码中访问mq的地址则变为haproxy的地址:5672

1.5.2

#对mq集群进行监听
listen rabbitmq_clusterbind 0.0.0.0:5672option tcplogmode tcpoption clitcpkatimeout connect 1stimeout client 10stimeout server 10sbalance roundrobinserver node1 192.168.1.9:5672 check inter 5s rise 2 fall 3server node2 192.168.1.10:5672 check inter 5s rise 2 fall 3
#开启haproxy 监控服务
listen http_frontbind 0.0.0.0:1090stats refresh 30sstats uri /haproxy_statsstats auth admin:admin

效果
rabbitmq深入实践