> 文章列表 > RabbitMQ消息丢失的情况,以及如何通过代码解决

RabbitMQ消息丢失的情况,以及如何通过代码解决

RabbitMQ消息丢失的情况,以及如何通过代码解决

目录

RabbitMQ消息丢失问题:

代码部分:

完整代码:

RabitMQConfig:

CourseMQListener:

生产者跟交换机通信的消息丢失解决 :

交换机跟消息队列的消息丢失:

消息队列跟消费者的消息丢失:

消费者服务器宕机:

重复消息的问题:


RabbitMQ消息丢失问题:

 

代码部分:

完整代码:

RabitMQConfig:

package com.dmdd.educourseservice.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.pool.ConnFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ的配置*/
@Slf4j
@Configuration
public class RabbitMQConfig {public static final String QUEUE_COURSE_SAVE = "queue.course.save";public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";public static final String KEY_COURSE_SAVE = "key.course.save";public static final String KEY_COURSE_REMOVE = "key.course.remove";public static final String COURSE_EXCHANGE = "edu.course.exchange";@Beanpublic Queue queueCourseSave() {return new Queue(QUEUE_COURSE_SAVE);}@Beanpublic Queue queueCourseRemove() {return new Queue(QUEUE_COURSE_REMOVE);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(COURSE_EXCHANGE);}@Beanpublic Binding bindCourseSave() {return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);}@Beanpublic Binding bindCourseRemove() {return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);//设置RabbitTemplate支持事务
//        rabbitTemplate.setChannelTransacted(true);
//        return rabbitTemplate;//设置发布确认回调接口 参数1 消息相关信息(id) 参数2 是否成功 参数3 失败原因    生产者与交换机之间rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack){log.info("消息投递成功{}",correlationData);}else {//消息补偿的逻辑,如:保存失败的信息,通过定时任务重新投递log.info("消息投递失败{}",cause);}});//设置消息发送失败返回回调,参数:消息内容,响应代码,响应文本,交换机,路由键  交换机与队列之间rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {//消息补偿的逻辑,如:保存失败消息,通过定时任务重新投递log.error("消息发送失败,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);}));return  rabbitTemplate;}}

CourseMQListener:

package com.dmdd.edusearchservice.listener;import com.alibaba.fastjson.JSON;
import com.dmdd.common.entity.Course;
import com.dmdd.edusearchservice.service.CourseIndexService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class CourseMQListener {//对课程进行添加或更新的队列名public static final String QUEUE_COURSE_SAVE = "queue.course.save";//对课程进行删除的队列名public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";//对课程进行添加或更新的路由键public static final String KEY_COURSE_SAVE = "key.course.save";//对课程进行删除的路由键public static final String KEY_COURSE_REMOVE = "key.course.remove";//课程交换机名public static final String COURSE_EXCHANGE = "edu.course.exchange";@Autowiredprivate CourseIndexService courseIndexService;@Autowiredprivate RedisTemplate redisTemplate;/*** 监听课程添加操作*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"), key = KEY_COURSE_SAVE)})public void receiveCourseSaveMessage(String data, Channel channel, Message message) {try {log.info("课程保存:{}", data);//获得消息的idString id = message.getMessageProperties().getHeader("spring_returned_message_correlation");//将id保存到redis中,如果id存在就不执行业务逻辑ValueOperations<String, String> ops = redisTemplate.opsForValue();//如果id不存在就,设置id为键,执行业务逻辑if (ops.setIfAbsent(id, "0", 1000, TimeUnit.SECONDS)) {//将json转换为课程对象Course course = JSON.parseObject(data, Course.class);courseIndexService.saveCourse(course);log.info("上传到elasticsearch成功");//逻辑执行完将消息状态设置为1ops.set(id, "1", 1000, TimeUnit.SECONDS);//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} else {//id存在,重复消息log.info("消息已经存在,重复消息 {}", id);if ("1".equals(ops.get(id))) {log.info("该消息已经执行完毕 {}", id);//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception ex) {log.error("接收消息出现异常", ex);}}/*** 监听课程删除操作*/@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"),key = KEY_COURSE_REMOVE)})public void receiveCourseDeleteMessage(Long id) {try {log.info("课程删除完成:{}", id);courseIndexService.removeCourse(String.valueOf(id));} catch (Exception ex) {log.error("接收消息出现异常", ex);}}
}

生产者跟交换机通信的消息丢失解决 :

//设置发布确认回调接口 参数1 消息相关信息(id) 参数2 是否成功 参数3 失败原因    生产者与交换机之间
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack){log.info("消息投递成功{}",correlationData);}else {//消息补偿的逻辑,如:保存失败的信息,通过定时任务重新投递log.info("消息投递失败{}",cause);}

通过ack响应确认交换机是否收到消息。可以方便排查

交换机跟消息队列的消息丢失:

//设置消息发送失败返回回调,参数:消息内容,响应代码,响应文本,交换机,路由键  交换机与队列之间
rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {//消息补偿的逻辑,如:保存失败消息,通过定时任务重新投递log.error("消息发送失败,message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",message, replyCode, replyText, exchange, routingKey);
}));

查询交换机与消息队列可能丢失消息的问题

消息队列跟消费者的消息丢失:

消费者服务器宕机:

//手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 消息队列发送消息给消费者时,消费者服务器宕机了,可以通过该代码解决

重复消息的问题:

@Test
void contextLoadas(){Course course = new Course();course.setId(222);course.setCourseName("测试mq+redis完全版");String json = JSON.toJSONString(course);rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,json,new CorrelationData(UUID.randomUUID().toString()));
}
if (ops.setIfAbsent(id, "0", 1000, TimeUnit.SECONDS)) {//将json转换为课程对象Course course = JSON.parseObject(data, Course.class);courseIndexService.saveCourse(course);log.info("上传到elasticsearch成功");//逻辑执行完将消息状态设置为1ops.set(id, "1", 1000, TimeUnit.SECONDS);//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {//id存在,重复消息log.info("消息已经存在,重复消息 {}", id);if ("1".equals(ops.get(id))) {log.info("该消息已经执行完毕 {}", id);//手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}

 消息发出去之后,可能出现网络抖动或延迟问题,导致消息发送超时,重复发送相同消息,因此我们可以发送消息时,传一个加密过的uuid,跟随消息内容一起过去,将uuid作为redis的键来储存信息,并设定状态码为0,当消费者服务将该消息执行完后,状态码为1,如果在执行服务期间有重复的消息发送过来,就会直接打回。