> 文章列表 > 【Redis】发布/订阅

【Redis】发布/订阅

【Redis】发布/订阅

说明

Redis发布/订阅(Pub/Sub)是一种消息传递模式,其中发送者(发布者)将消息发送到主题(频道),而接收者(订阅者)则从主题中接收这些消息。以下是Redis发布/订阅的实践和原理:

实践

发布者使用PUBLISH命令向指定主题发送消息。
订阅者使用SUBSCRIBE命令订阅一个或多个主题。
当发布者向某个主题发送消息时,所有订阅该主题的订阅者都会接收到该消息。
除了基本的PUBLISH/SUBSCRIBE模式外,Redis还支持模式匹配(Pattern-Matching)模式,即订阅者可以使用通配符订阅多个主题。例如,使用SUBSCRIBE news.*可以订阅news.sports、news.weather等所有以news.为前缀的主题。

原理

Redis使用发布/订阅频道来实现消息传递,每个频道都有一个名称。
订阅者通过向Redis服务器发送SUBSCRIBE命令来订阅一个或多个频道。
发布者使用PUBLISH命令将消息发送到指定频道。
当Redis服务器接收到一条新消息时,它将该消息发送给所有已订阅该频道的客户端。
消息传递过程是异步的,即当发布者发送消息后,不需要等待所有订阅者接收到该消息才能继续执行其他任务。
在Redis中,发布/订阅是通过使用内部通信机制实现的。当一个客户端订阅一个频道时,Redis会将该客户端添加到频道的订阅列表中。当其他客户端向该频道发送消息时,Redis会遍历订阅列表,并将消息发送给所有已订阅该频道的客户端。

代码实现(java)

以下是Java Redis客户端Jedis实现发布/订阅的具体代码:

jedis

发布者
import redis.clients.jedis.Jedis;public class Publisher {public static void main(String[] args) {// 创建Jedis对象连接Redis服务器Jedis jedis = new Jedis("localhost", 6379);// 向指定频道发布消息jedis.publish("news.sports", "Kobe Bryant has passed away.");// 关闭Jedis连接jedis.close();}
}

订阅者

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;public class Subscriber extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {System.out.println("Received message from channel " + channel + ": " + message);}public static void main(String[] args) {// 创建Jedis对象连接Redis服务器Jedis jedis = new Jedis("localhost", 6379);// 创建Subscriber对象Subscriber subscriber = new Subscriber();// 订阅一个或多个频道jedis.subscribe(subscriber, "news.sports", "news.weather");// 进入订阅状态,等待接收消息jedis.close();}
}

在上面的代码中,发布者使用jedis.publish()向指定频道发布消息,而订阅者则创建一个继承自JedisPubSub类的子类,并重写其onMessage()方法用于处理接收到的消息。接着,订阅者使用jedis.subscribe()方法订阅一个或多个频道,并在订阅状态下等待接收消息。

需要注意的是,由于Redis的发布/订阅功能是异步的,因此jedis.subscribe()和jedis.publish()方法不会阻塞当前线程。因此,在生产环境中,应该将发布/订阅功能作为一个独立的进程进行部署,以充分利用其异步性能优势。

以下是使用Java Redisson客户端实现发布/订阅的具体代码:

发布者
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.RTopic;public class Publisher {public static void main(String[] args) {// 创建RedissonClient连接Redis服务器RedissonClient redisson = Redisson.create();// 获取RTopic对象,指定频道名称RTopic<String> topic = redisson.getTopic("news.sports");// 向指定频道发布消息topic.publish("Kobe Bryant has passed away.");// 关闭Redisson连接redisson.shutdown();}
}
订阅者
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.PatternMessageListener;
import org.redisson.api.listener.TopicPatternMessageListener;
import org.redisson.api.listener.TopicMessageListener;public class Subscriber {public static void main(String[] args) {// 创建RedissonClient连接Redis服务器RedissonClient redisson = Redisson.create();// 获取RTopic对象,指定频道名称RTopic<String> topic = redisson.getTopic("news.sports");// 添加消息监听器topic.addListener(new MessageListener<String>() {@Overridepublic void onMessage(CharSequence channel, String msg) {System.out.println("Received message from channel " + channel + ": " + msg);}});// 关闭Redisson连接redisson.shutdown();}
}

在上面的代码中,发布者使用RTopic.publish()向指定频道发布消息,而订阅者则使用RTopic.addListener()方法添加一个消息监听器。Redisson支持多种类型的监听器接口,包括MessageListener、TopicMessageListener、PatternMessageListener和TopicPatternMessageListener,分别用于处理不同类型的消息。

需要注意的是,由于Redisson客户端使用了Netty框架,因此在应用程序退出时必须显式调用Redisson.shutdown()方法关闭连接以确保资源正确释放。

以下是使用Go Redis客户端库redigo实现发布/订阅的具体代码:

发布者

package mainimport ("github.com/gomodule/redigo/redis"
)func main() {// 创建Redis连接conn, err := redis.Dial("tcp", "localhost:6379")if err != nil {panic(err)}defer conn.Close()// 向指定频道发布消息_, err = conn.Do("PUBLISH", "news.sports", "Kobe Bryant has passed away.")if err != nil {panic(err)}
}

订阅者

package mainimport ("fmt""github.com/gomodule/redigo/redis"
)func main() {// 创建Redis连接conn, err := redis.Dial("tcp", "localhost:6379")if err != nil {panic(err)}defer conn.Close()// 订阅一个或多个频道psc := redis.PubSubConn{Conn: conn}err = psc.Subscribe("news.sports")if err != nil {panic(err)}// 接收消息并处理for {switch v := psc.Receive().(type) {case redis.Message:fmt.Printf("Received message from channel %s: %s\\n", v.Channel, v.Data)case redis.Subscription:fmt.Printf("Subscription message: %s: %d\\n", v.Channel, v.Count)case error:panic(v)}}
}

在上面的代码中,发布者使用conn.Do()执行PUBLISH命令向指定频道发布消息,而订阅者则使用redis.PubSubConn类型创建一个PubSubConn对象,并通过Subscribe()方法订阅一个或多个频道。接着,订阅者使用Receive()方法从Redis服务器接收消息,并根据消息类型进行分类处理。

需要注意的是,由于Go Redis客户端库redigo不支持异步操作,因此订阅者在循环中调用Receive()方法时会被阻塞,直到有新消息到达。因此,在生产环境中,应该将订阅者作为一个独立的进程运行以充分利用其异步性能优势。