记 ==> 首次使用rabbitMQ优化项目
昨天刚学习完了rabbitMQ,刚好我的项目有个模块挺符合使用rabbitMQ进行异步处理的。
这个模块大概功能是:用户发送的所有帖子都会添加到他的发件箱,当有个新用户关注了他,他发件箱内所有的博客都会被添加到关注他的用户的收件箱里
比如:A关注B,A的收件箱内添加B的所有帖子
后续B再发帖子,直接推送到A的收件箱
有点类似抖音,当我们关注了一个up主,我们可以在关注列表里刷到这个up主的视频,并且是按新发布时间降序排序的
接下来看看我的代码,(ps:本人小白勿喷)
当用户关注一个博主后,利用redis的Zset集合完成feed流TimeLine模式的读扩散(拉模式),从博主的收件箱内拉取所有帖子到用户收件箱。当博主后续再新发帖子,才用写扩散(推模式)推送到收件箱
当用户取关后,从用户收件箱内移除掉被取关博主的帖子
我的理解是,这个接口的功能就是单纯的实现用户关注和取关,并不关注帖子的拉取和删除,并不该把这两个功能冗杂在一个方法里。
于是我便打算用rabbitMQ的directExchange模式进行消息发送,关注或者取关后把消息发布出去,不关注对帖子的拉取的行为,让监听者去慢慢的拉。
修改代码~~
把当前用户和被关注取关的用户ID一起当做消息发送出去
引入AMQP依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml内添加rabbitmq配置
更改序列化规则,后续消息使用map发送
写一个config,配置一下交互机,队列,并完成绑定
然后写两个方法分别接听拉取帖子和取消帖子这两个queue
拉取队列
取关队列
我把源码贴出来,献丑啦
package com.brrbaii.mqListener;import com.brrbaii.dto.Result;
import com.brrbaii.dto.UserDTO;
import com.brrbaii.utils.UserHolder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Set;import static com.brrbaii.common.MQConstants.*;
import static com.brrbaii.common.RedisConstants.*;
import static com.brrbaii.utils.SystemConstants.ALREADY_FOLLOW;@Component
public class FollowListener {@Autowiredprivate StringRedisTemplate stringRedisTemplate;/* 用户关注后,把被关注用户发件箱内的所有BLOG添加到当前用户的收件箱* @param map:被关注的用户ID和当前用户ID*/@RabbitListener(queues = PULL_BLOG_QUEUE)public void pullBlog2UserBox(Map<String,String> map){if( map == null){return;}String followerId = map.get("followedUserId");String userId = map.get("userId");//被关注的用户keyString followBox = FEED_USER_KEY + followerId;//功能:关注后,把该用户的发件箱内数据拷贝到粉丝收件箱//获取被关注用户所有博客--从0到当前时间就是获取全部,并按照降序排序Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(followBox, 0, System.currentTimeMillis());//用户没有博客,不用获取if (typedTuples==null || typedTuples.isEmpty()){return;}//获取当前用户的keyString userBOx = FEED_PUSH_KEY + userId;//把被关注用户的blog添加到“我”的收件箱for(ZSetOperations.TypedTuple<String> tuple:typedTuples){stringRedisTemplate.opsForZSet().add(userBOx,tuple.getValue(),tuple.getScore());}}/* 用户取关后,从用户的收件箱内清除被取关用户的BLOG* @param map:被关注的用户ID和当前用户ID*/@RabbitListener(queues = PUSH_BLOG_QUEUE)public void pushBlogFromUserBox(Map<String,String> map){if( map == null){return;}String followerId = map.get("followedUserId");String userId = map.get("userId");String cancelFollower = FEED_USER_KEY + followerId;//获取被取关用户的所有blog,这里无需关注scoreSet<String> cancelFollowerBox = stringRedisTemplate.opsForZSet().range(cancelFollower, 0, -1);//没有BLOG,直接结束if(cancelFollowerBox == null || cancelFollowerBox.isEmpty()){return;}//获取当前用户收件箱String userBox = FEED_PUSH_KEY + userId;//从当前用户的收件箱里,把被取关用户的博客删除for(String blogId : cancelFollowerBox){stringRedisTemplate.opsForZSet().remove(userBox,blogId);}}}
package com.brrbaii.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.brrbaii.common.MQConstants.*;@Configuration
public class MQConfig {//拉取blog队列@Beanpublic Queue pullBlogQueue(){System.out.println("为什么没给我创建队列?");return new Queue(PULL_BLOG_QUEUE);}//移出blog队列@Beanpublic Queue pushBlogQueue(){return new Queue(PUSH_BLOG_QUEUE);}//声明交换机@Beanpublic DirectExchange blogDirectExchange(){return new DirectExchange(DIRECT_EXCHANGE_BLOG);}@Beanpublic Binding pullBlogBinding(Queue pullBlogQueue, DirectExchange blogDirectExchange){return BindingBuilder.bind(pullBlogQueue).to(blogDirectExchange).with(PULL_BLOG_ROUTING_KEY);}@Beanpublic Binding pushBlogBinding(Queue pushBlogQueue, DirectExchange blogDirectExchange){return BindingBuilder.bind(pushBlogQueue).to(blogDirectExchange).with(PUSH_BLOG_ROUTING_KEY);}}