【如何优雅的实现延迟消息多次提醒】方案集合
大家好,说到这个话题,可能大家一定脱口而出就是rabbitmq,这个肯定没问题,但是如果你们的业务技术栈引入的MQ并不是这个呢,比如Kakfa呢,总不能为了这个功能更换技术栈吧
所以这里本博主收集了几种常见的方案,也简单列举了优缺点,大家按照自己的业务场景去选择
方案一:DelayedQueue
这种是利用简单的延迟队列结构来实现的 ,这里就不展开细说了,不常用,因为这个是单价版的,现在业务基本都是分布式的,但是虽然是简单的 但是也是最重要的,因为利用这个数据结构实现了接下来我说的所有的方案
方案二:redis+key的过期监听器
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;import java.util.UUID;public class RedisDelayedNotifyDemo {public static void main(String[] args) {Jedis jedis = new Jedis("localhost", 6379);// 消息IDString messageId = UUID.randomUUID().toString();// 消息内容String messageContent = "hello, world!";// 延迟时间(毫秒)long delayTime = 5000;// 将消息存入 Redis 中,并设置过期时间jedis.setex("delayed_messages:" + messageId, delayTime / 1000, messageContent);// 添加过期监听器jedis.psubscribe(new JedisPubSub() {@Overridepublic void onPSubscribe(String pattern, int subscribedChannels) {System.out.println("Subscribed to " + pattern);}
当然代码比较粗糙呀,不建议大家直接copy,这里只是伪代码,可以按照这个方案去网上搜寻,思路一致的,写法高级点
但是这个缺点也很明显,无法实现多次提醒
Redis 实现 key 过期的方案只能触发一次监听器,因为 Redis 是基于发布/订阅模式实现的,当 key 过期时,Redis 会发布一个过期事件,订阅了该事件的客户端可以接收到事件并执行相应的操作。一旦监听器被触发,就需要重新设置 key 的过期时间来实现下一次提醒,这样需要频繁地更新 key,增加了 Redis 的负担和延迟
如果需要实现多次提醒怎么办?
方案三:Redisson+DelayedQueue
import java.util.concurrent.TimeUnit;import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.config.Config;public class RedisDelayedReminderExample {public static void main(String[] args) throws InterruptedException {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("password");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("reminder-queue", new StringCodec());// 创建延迟队列RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);// 充值会员提前7天提醒任务String userId = "user123";delayedQueue.offer(userId, 7, TimeUnit.DAYS);// 充值会员提前1天提醒任务delayedQueue.offer(userId, 1, TimeUnit.DAYS);// 监听器处理任务RedisDelayedReminderListener listener = new RedisDelayedReminderListener();blockingQueue.takeAsync().thenAccept(listener::onDelayedTask);}
}class RedisDelayedReminderListener {public void onDelayedTask(String userId) {// 进行提醒操作System.out.println("Send reminder to user " + userId);}
}
上面的代码是加入了监听器逻辑,但是也可以不需要,看自己业务,重点不在这个
比如下面的
// 创建 Redisson 实例
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redissonClient = Redisson.create(config);// 获取延迟队列和普通队列
RQueue<String> queue = redissonClient.getQueue("myQueue");
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);// 将消息放入延迟队列,并设置多次提醒
String message = "Hello, World!";
long delayTime = 10000; // 10 秒后第一次提醒
int repeatTimes = 3; // 一共提醒 3 次
long interval = 5000; // 每次间隔 5 秒
delayedQueue.offer(message, delayTime, repeatTimes, interval);// 启动一个线程,从普通队列中获取消息,并处理
new Thread(() -> {while (true) {try {String msg = queue.take();System.out.println(msg);} catch (InterruptedException e) {e.printStackTrace();}}
}).start();
方案四:redisson+mapCache
在 Redisson 中,使用 MapCache 可以实现对 Map 类型的缓存操作。MapCache 是一个具有过期时间和最大缓存条目数限制的 Map,它可以自动将过期的缓存条目从缓存中移除,从而避免内存泄漏和数据膨胀问题。同时,MapCache 还提供了延迟失效功能,即在缓存条目失效前,可以通过修改缓存条目的 TTL (Time To Live) 值来延长缓存的生命周期。
为了实现延迟提醒功能,您可以在 MapCache 中存储需要提醒的事件信息,同时设置一个相应的 TTL 值。当 TTL 时间到期时,MapCache 会自动将该条目从缓存中删除。同时,您可以使用 Redisson 提供的监听器功能,在缓存条目失效时触发相应的事件通知操作,从而实现延迟提醒的功能。
下面是一个使用 MapCache 实现延迟提醒功能的简单示例代码:
// 初始化 Redisson 客户端
RedissonClient client = Redisson.create();// 创建 MapCache 对象,并设置过期时间和最大缓存条目数
RMapCache<String, String> mapCache = client.getMapCache("myMapCache");
mapCache.setMaxSize(100);
mapCache.setMaxIdleTime(30, TimeUnit.MINUTES);// 将需要提醒的事件信息存储到 MapCache 中,并设置相应的 TTL 值
mapCache.put("eventId", "eventInfo", 1, TimeUnit.HOURS);// 添加 MapCache 监听器,用于在缓存条目失效时触发相应的事件通知操作
mapCache.addListener(new MapCacheListener<String, String>() {@Overridepublic void onExpire(String key, String value) {// 在缓存条目失效时触发相应的事件通知操作System.out.println("Event expired: " + key);}
});
在上面的示例代码中,我们首先初始化了 Redisson 客户端,然后创建了一个名为 “myMapCache” 的 MapCache 对象,并设置了最大缓存条目数为 100,最大闲置时间为 30 分钟。接下来,我们将需要提醒的事件信息存储到 MapCache 中,并设置了一个 TTL 值为 1 小时。最后,我们添加了一个 MapCache 监听器,在缓存条目失效时触发相应的事件通知操作。
但是这个缺点明显,因为是单机的,因为这里的mapcache是本地缓存