
spring redis Sentinel: how it works


Choosing a client

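There are three commonly used Redis clients for Java:

  1. Jedis: the long-established Java client for Redis, with fairly complete coverage of Redis commands.
  2. Redisson: implements distributed and scalable Java data structures.
  3. Lettuce: an advanced Redis client for thread-safe synchronous, asynchronous and reactive usage, with support for Cluster, Sentinel, pipelining and codecs.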

spring data redis

  1. If no Redis client is specified, spring-boot-autoconfigure.jar falls back to the LettuceConnectionFactory defined in LettuceConnectionConfiguration:

```java
// org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
        throws UnknownHostException {
    RedisTemplate<Object, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

// org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
public LettuceConnectionFactory redisConnectionFactory(ClientResources clientResources)
        throws UnknownHostException {
    LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(clientResources,
            this.properties.getLettuce().getPool());
    return createLettuceConnectionFactory(clientConfig);
}
```
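The "default unless you define your own" behavior comes from `@ConditionalOnMissingBean(RedisConnectionFactory.class)`: the auto-configured factory is only registered when no bean of that type exists yet. The sketch below models that rule with a plain list; `BeanRegistry`, `registerIfMissing`, `ConnectionFactory`, `LettuceFactory` and `JedisFactory` are hypothetical stand-ins, not Spring types, and the real condition evaluation in Spring Boot is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified model of type-based @ConditionalOnMissingBean; not Spring's code.
class BeanRegistry {
    private final List<Object> beans = new ArrayList<>();

    // Beans the application defines itself are registered first.
    void register(Object bean) {
        beans.add(bean);
    }

    // Auto-configuration contributes its default only if no bean of the type exists yet.
    <T> T registerIfMissing(Class<T> type, Supplier<? extends T> defaultFactory) {
        for (Object bean : beans) {
            if (type.isInstance(bean)) {
                return type.cast(bean); // the user's bean wins, the default is skipped
            }
        }
        T created = defaultFactory.get();
        beans.add(created);
        return created;
    }
}

// Hypothetical stand-ins for the real connection factory types.
interface ConnectionFactory {
    String client();
}

class LettuceFactory implements ConnectionFactory {
    public String client() { return "lettuce"; }
}

class JedisFactory implements ConnectionFactory {
    public String client() { return "jedis"; }
}
```

Registering a `JedisFactory` before the auto-configuration step is the model's equivalent of declaring your own `RedisConnectionFactory` bean.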

spring redis Sentinel Lettuce implementation

Start with io.lettuce.core.RedisClient:

```java
private <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI,
        Duration timeout) {

    assertNotNull(codec);
    checkValidRedisURI(redisURI);

    ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder();
    connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions()));
    connectionBuilder.clientResources(clientResources);

    DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions);

    StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(endpoint, codec, timeout);

    logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels());

    connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint))
            .connection(connection);
    connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);

    if (clientOptions.isPingBeforeActivateConnection()) {
        connectionBuilder.enablePingBeforeConnect();
    }

    if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) {
        channelType(connectionBuilder, redisURI);
        try {
            getConnection(initializeChannelAsync(connectionBuilder));
        } catch (RuntimeException e) {
            connection.close();
            throw e;
        }
    } else {

        boolean connected = false;
        boolean first = true;
        Exception causingException = null;
        validateUrisAreOfSameConnectionType(redisURI.getSentinels());
        for (RedisURI uri : redisURI.getSentinels()) {
            if (first) {
                channelType(connectionBuilder, uri);
                first = false;
            }
            connectionBuilder.socketAddressSupplier(getSocketAddressSupplier(uri));

            if (logger.isDebugEnabled()) {
                SocketAddress socketAddress = SocketAddressResolver.resolve(uri, clientResources.dnsResolver());
                logger.debug("Connecting to Redis Sentinel, address: " + socketAddress);
            }
            try {
                getConnection(initializeChannelAsync(connectionBuilder));
                connected = true;
                break;
            } catch (Exception e) {
                logger.warn("Cannot connect Redis Sentinel at " + uri + ": " + e.toString());
                causingException = e;
            }
        }

        if (!connected) {
            connection.close();
            throw new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(),
                    causingException);
        }
    }

    if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
        connection.setClientName(redisURI.getClientName());
    }

    return connection;
}
```
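Stripped of Netty and logging, the `else` branch of `connectSentinel` is a "first reachable node wins" loop that remembers the last failure as the cause. A minimal sketch of only that control flow, assuming string addresses; `Connector` and `SentinelConnect` are hypothetical stand-ins for Lettuce's `ConnectionBuilder`/`RedisURI` machinery, and `IllegalStateException` stands in for `RedisConnectionException`:

```java
import java.util.List;

// Hypothetical stand-in for "open a connection to this Sentinel address".
interface Connector {
    String connect(String sentinelAddress) throws Exception;
}

class SentinelConnect {
    // Mirrors the loop in connectSentinel: the first success wins, failures are remembered.
    static String connectToAnySentinel(List<String> sentinels, Connector connector) {
        Exception causingException = null;
        for (String address : sentinels) {
            try {
                return connector.connect(address); // connected: do not try the remaining Sentinels
            } catch (Exception e) {
                causingException = e; // Lettuce logs a warning here and moves on
            }
        }
        throw new IllegalStateException("Cannot connect to a Redis Sentinel: " + sentinels, causingException);
    }
}
```

Only the last failure survives as the cause, which matches how `causingException` is overwritten on every iteration in the original.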

io.lettuce.core.protocol.ConnectionWatchdog

```java
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    logger.debug("{} channelInactive()", logPrefix());
    if (!armed) {
        logger.debug("{} ConnectionWatchdog not armed", logPrefix());
        return;
    }

    channel = null;

    if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
        scheduleReconnect();
    } else {
        logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
    }

    super.channelInactive(ctx);
}

/**
 * Enable {@link ConnectionWatchdog} to listen for disconnected events.
 */
void arm() {
    this.armed = true;
    setListenOnChannelInactive(true);
}

/**
 * Schedule reconnect if channel is not available/not active.
 */
public void scheduleReconnect() {

    logger.debug("{} scheduleReconnect()", logPrefix());

    if (!isEventLoopGroupActive()) {
        logger.debug("isEventLoopGroupActive() == false");
        return;
    }

    if (!isListenOnChannelInactive()) {
        logger.debug("Skip reconnect scheduling, listener disabled");
        return;
    }

    if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {

        attempts++;
        final int attempt = attempts;
        int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
        logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);

        this.reconnectScheduleTimeout = timer.newTimeout(it -> {

            reconnectScheduleTimeout = null;

            if (!isEventLoopGroupActive()) {
                logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
                return;
            }

            reconnectWorkers.submit(() -> {
                ConnectionWatchdog.this.run(attempt);
                return null;
            });
        }, timeout, TimeUnit.MILLISECONDS);

        // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
        if (!reconnectSchedulerSync.get()) {
            reconnectScheduleTimeout = null;
        }
    } else {
        logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
    }
}
```
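Two details of `scheduleReconnect` are worth isolating: the delay grows with the attempt counter (`reconnectDelay.createDelay(attempt)`), and `reconnectSchedulerSync.compareAndSet(false, true)` ensures only one reconnect is pending at a time. The sketch below assumes an exponential delay capped at a maximum; the base and cap passed in the usage are illustrative values, not Lettuce's defaults. `ReconnectScheduler` is a hypothetical class, and resetting the counter on success is an assumption of the model.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of ConnectionWatchdog's scheduling guard and backoff; not Lettuce code.
class ReconnectScheduler {
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Duration base;
    private final Duration cap;
    private int attempts;

    ReconnectScheduler(Duration base, Duration cap) {
        this.base = base;
        this.cap = cap;
    }

    // Exponential backoff: base * 2^(attempt - 1), never above cap.
    Duration delayFor(int attempt) {
        int shift = Math.max(0, Math.min(attempt - 1, 30)); // bound the shift to avoid overflow
        Duration d = base.multipliedBy(1L << shift);
        return d.compareTo(cap) > 0 ? cap : d;
    }

    // Returns the delay if a reconnect was scheduled, or null if one is already pending.
    Duration trySchedule() {
        if (!scheduled.compareAndSet(false, true)) {
            return null;
        }
        attempts++;
        return delayFor(attempts);
    }

    // Called once the scheduled reconnect has run; success resets the backoff.
    void reconnectFinished(boolean success) {
        if (success) {
            attempts = 0;
        }
        scheduled.set(false);
    }
}
```

The compare-and-set matters because `channelInactive` and a failed reconnect can both ask for a new attempt; without the guard, two timers could race to rebuild the same channel.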

From the source above, the initialization flow is:

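  1. Iterate over the Sentinel nodes to find an available one: if a node cannot be reached, move on to the next; if none can be reached, throw an exception to the client.
  2. Once an available Sentinel node is found, send it SENTINEL GET-MASTER-ADDR-BY-NAME for masterName to get the address of the corresponding master.
  3. Failover relies on ConnectionWatchdog's reconnect mechanism: when channelInactive fires in the Netty handler (the connection to the remote host drops unexpectedly, a connection attempt to the remote host fails, or the current channel is closed), the client fetches the current master address from Sentinel again.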
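Steps 2 and 3 together mean the client never pins the master address: every (re)connect asks a Sentinel again, so after a failover the reconnect lands on the promoted node. A minimal model, assuming a Sentinel can be reduced to a lookup from master name to `host:port`; `FakeSentinel` and `ReconnectingClient` are hypothetical names, and the delay and retry handling from the watchdog are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: a Sentinel reduced to "master name -> current master address".
class FakeSentinel {
    final Map<String, String> masters = new ConcurrentHashMap<>();

    String getMasterAddrByName(String masterName) {
        String addr = masters.get(masterName);
        if (addr == null) {
            throw new IllegalStateException(masterName + " is not monitored");
        }
        return addr;
    }
}

// Hypothetical client that re-resolves the master on every (re)connect.
class ReconnectingClient {
    private final FakeSentinel sentinel;
    private final String masterName;
    private volatile String connectedTo;

    ReconnectingClient(FakeSentinel sentinel, String masterName) {
        this.sentinel = sentinel;
        this.masterName = masterName;
        connect();
    }

    // Steps 1-2: ask Sentinel where the master is, then "connect" there.
    private void connect() {
        connectedTo = sentinel.getMasterAddrByName(masterName);
    }

    // Step 3: what the watchdog triggers on channelInactive, minus delay and retries.
    void onChannelInactive() {
        connect();
    }

    String connectedTo() {
        return connectedTo;
    }
}
```

Usage: put `mymaster -> 10.0.0.1:6379` into the fake Sentinel, create the client, then change the entry to `10.0.0.2:6379` to simulate a failover and call `onChannelInactive()`; the client now points at the new address.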

spring redis Sentinel Jedis implementation

redis.clients.jedis.JedisSentinelPool

```java
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig,
        int connectionTimeout, int soTimeout, String password, int database, String clientName) {
    HostAndPort master = this.initSentinels(sentinels, masterName);
    this.initPool(master);
}

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    log.info("Trying to find master from available Sentinels...");

    for (String sentinel : sentinels) {
        final HostAndPort hap = HostAndPort.parseString(sentinel);

        log.fine("Connecting to Sentinel " + hap);

        Jedis jedis = null;
        try {
            jedis = new Jedis(hap.getHost(), hap.getPort());

            List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

            // connected to sentinel...
            sentinelAvailable = true;

            if (masterAddr == null || masterAddr.size() != 2) {
                log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
                continue;
            }

            master = toHostAndPort(masterAddr);
            log.fine("Found Redis master at " + master);
            break;
        } catch (JedisException e) {
            // resolves #1036, it should handle JedisException there's another chance
            // of raising JedisDataException
            log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
                    + ". Trying next one.");
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    if (master == null) {
        if (sentinelAvailable) {
            // can connect to sentinel, but master name seems to not
            // monitored
            throw new JedisException("Can connect to sentinel, but " + masterName
                    + " seems to be not monitored...");
        } else {
            throw new JedisConnectionException("All sentinels down, cannot determine where is "
                    + masterName + " master is running...");
        }
    }

    log.info("Redis master running at " + master + ", starting Sentinel listeners...");

    for (String sentinel : sentinels) {
        final HostAndPort hap = HostAndPort.parseString(sentinel);
        MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
        // whether MasterListener threads are alive or not, process can be stopped
        masterListener.setDaemon(true);
        masterListeners.add(masterListener);
        masterListener.start();
    }

    return master;
}
```
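Which exception `initSentinels` throws depends on the `sentinelAvailable` flag: it is set as soon as any Sentinel answers, even if that Sentinel does not know the master. The sketch below keeps only that decision logic. `SentinelQuery` is a hypothetical interface returning the `[host, port]` pair (or `null`) and throwing when the Sentinel is unreachable; both failures are reported as `IllegalStateException` here instead of Jedis's `JedisException`/`JedisConnectionException`.

```java
import java.util.List;

// Hypothetical: ask one Sentinel for [host, port] of masterName; throws if unreachable.
interface SentinelQuery {
    List<String> getMasterAddrByName(String sentinel, String masterName) throws Exception;
}

class InitSentinels {
    static String findMaster(List<String> sentinels, String masterName, SentinelQuery query) {
        boolean sentinelAvailable = false;
        for (String sentinel : sentinels) {
            List<String> masterAddr;
            try {
                masterAddr = query.getMasterAddrByName(sentinel, masterName);
            } catch (Exception e) {
                continue; // unreachable Sentinel: try the next one
            }
            sentinelAvailable = true; // this Sentinel answered, whatever the answer was
            if (masterAddr == null || masterAddr.size() != 2) {
                continue; // answered, but does not know masterName
            }
            return masterAddr.get(0) + ":" + masterAddr.get(1);
        }
        if (sentinelAvailable) {
            throw new IllegalStateException("Can connect to sentinel, but " + masterName
                    + " seems to be not monitored...");
        }
        throw new IllegalStateException("All sentinels down, cannot determine where is "
                + masterName + " master is running...");
    }
}
```

Keeping the two cases apart is useful operationally: "not monitored" usually means a typo in the master name, while "all sentinels down" points at the network or the Sentinel processes.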

From the source above, the initialization flow is:

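  1. initSentinels uses sentinels and masterName, that is, the set of Sentinel nodes and the name of the Redis master, to locate the Redis master; it also starts a listener thread for each Sentinel.
  2. MasterListener is a thread: one is created per Sentinel node and subscribes to the +switch-master channel, so that when the master switches, the change is noticed promptly and the client's connection pool, JedisSentinelPool, is updated.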
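According to the Redis Sentinel documentation, a `+switch-master` message has the form `<master name> <old ip> <old port> <new ip> <new port>`, so a listener only needs to split the message, check the master name, and take fields 3 and 4 as the new master. A sketch of that parsing step; `SwitchMasterParser` is a hypothetical helper, and in Jedis the listener then reinitializes the pool against the new address rather than returning it.

```java
import java.util.Optional;

// Hypothetical helper: parse a +switch-master message and return the new master as host:port.
class SwitchMasterParser {
    // Message format: "<master name> <old ip> <old port> <new ip> <new port>"
    static Optional<String> newMasterFor(String masterName, String message) {
        String[] parts = message.trim().split(" ");
        if (parts.length < 5 || !masterName.equals(parts[0])) {
            return Optional.empty(); // malformed, or about a different master: ignore
        }
        return Optional.of(parts[3] + ":" + parts[4]);
    }
}
```

The master-name check matters because one Sentinel can monitor several masters and publishes all their switches on the same channel.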

spring redis Sentinel Redisson implementation


```java
/**
 * Redisson constructor
 * @param config for Redisson
 * @return Redisson instance
 */
protected Redisson(Config config) {
    // assign the config field
    this.config = config;
    // make a copy of the passed-in config
    Config configCopy = new Config(config);
    // initialize differently depending on the config type
    // (master/slave, single server, sentinel, cluster, AWS cloud, Azure cloud)
    connectionManager = ConfigSupport.createConnectionManager(configCopy);
    // eviction scheduler
    evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
    // Redisson's codec provider
    codecProvider = configCopy.getCodecProvider();
    // Redisson's ResolverProvider, org.redisson.liveobject.provider.DefaultResolverProvider by default
    resolverProvider = configCopy.getResolverProvider();
    // ... (remainder of the constructor not shown in this excerpt)
}
```
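The comment on `createConnectionManager` says the connection manager is chosen by the server mode the `Config` was set up for. The sketch below shows only that dispatch shape, assuming a config that records exactly one mode; `ServerMode`, `ModeConfig` and `ConnectionManagerFactory` are hypothetical placeholders, the returned strings are just labels, and the cloud-specific modes from the comment are left out.

```java
// Hypothetical placeholders for the server modes a Redisson Config can be set up for.
enum ServerMode { SINGLE, MASTER_SLAVE, SENTINEL, CLUSTER, REPLICATED }

class ModeConfig {
    private ServerMode mode; // null until one of the use*() methods is called

    ModeConfig useSingleServer() { mode = ServerMode.SINGLE; return this; }
    ModeConfig useSentinelServers() { mode = ServerMode.SENTINEL; return this; }
    ModeConfig useClusterServers() { mode = ServerMode.CLUSTER; return this; }

    ServerMode mode() { return mode; }
}

class ConnectionManagerFactory {
    // One config mode in, one connection-manager label out.
    static String createConnectionManager(ModeConfig config) {
        if (config.mode() == null) {
            throw new IllegalArgumentException("no server mode configured");
        }
        switch (config.mode()) {
            case SINGLE: return "SingleConnectionManager";
            case MASTER_SLAVE: return "MasterSlaveConnectionManager";
            case SENTINEL: return "SentinelConnectionManager";
            case CLUSTER: return "ClusterConnectionManager";
            case REPLICATED: return "ReplicatedConnectionManager";
            default: throw new IllegalStateException("unknown mode " + config.mode());
        }
    }
}
```

In the sentinel case, this dispatch is what leads to the `SentinelConnectionManager` constructor examined next.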

Next, drill into the ConnectionManager used in Sentinel mode:
org.redisson.connection.SentinelConnectionManager

```java
public SentinelConnectionManager(SentinelServersConfig cfg, Config config, UUID id) {
    super(config, id);

    if (cfg.getMasterName() == null) {
        throw new IllegalArgumentException("masterName parameter is not defined!");
    }
    if (cfg.getSentinelAddresses().isEmpty()) {
        throw new IllegalArgumentException("At least one sentinel node should be defined!");
    }

    this.config = create(cfg);
    this.sentinelPassword = cfg.getSentinelPassword();
    initTimer(this.config);

    this.natMapper = cfg.getNatMapper();

    this.sentinelResolver = resolverGroup.getResolver(getGroup().next());

    for (String address : cfg.getSentinelAddresses()) {
        RedisURI addr = new RedisURI(address);
        scheme = addr.getScheme();
        addr = applyNatMap(addr);
        if (NetUtil.createByteArrayFromIpAddressString(addr.getHost()) == null && !addr.getHost().equals("localhost")) {
            sentinelHosts.add(addr);
        }
    }

    checkAuth(cfg);

    Throwable lastException = null;
    for (String address : cfg.getSentinelAddresses()) {
        RedisURI addr = new RedisURI(address);
        addr = applyNatMap(addr);

        RedisClient client = createClient(NodeType.SENTINEL, addr, this.config.getConnectTimeout(), this.config.getTimeout(), null);
        try {
            RedisConnection connection = null;
            try {
                connection = client.connect();
                if (!connection.isActive()) {
                    continue;
                }
            } catch (RedisConnectionException e) {
                continue;
            }

            InetSocketAddress master = connection.sync(RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
            if (master == null) {
                throw new RedisConnectionException("Master node is undefined! SENTINEL GET-MASTER-ADDR-BY-NAME command returns empty result!");
            }

            RedisURI masterHost = toURI(master.getHostString(), String.valueOf(master.getPort()));
            this.config.setMasterAddress(masterHost.toString());
            currentMaster.set(masterHost);
            log.info("master: {} added", masterHost);

            List<Map<String, String>> sentinelSlaves = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
            for (Map<String, String> map : sentinelSlaves) {
                if (map.isEmpty()) {
                    continue;
                }

                String ip = map.get("ip");
                String port = map.get("port");
                String flags = map.getOrDefault("flags", "");

                RedisURI host = toURI(ip, port);

                this.config.addSlaveAddress(host.toString());
                log.debug("slave {} state: {}", host, map);
                log.info("slave: {} added", host);

                if (flags.contains("s_down") || flags.contains("disconnected")) {
                    disconnectedSlaves.add(host);
                    log.warn("slave: {} is down", host);
                }
            }

            List<Map<String, String>> sentinelSentinels = connection.sync(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
            List<RFuture<Void>> connectionFutures = new ArrayList<>(sentinelSentinels.size());
            for (Map<String, String> map : sentinelSentinels) {
                if (map.isEmpty()) {
                    continue;
                }

                String ip = map.get("ip");
                String port = map.get("port");

                RedisURI sentinelAddr = toURI(ip, port);
                RFuture<Void> future = registerSentinel(sentinelAddr, this.config, null);
                connectionFutures.add(future);
            }

            RFuture<Void> f = registerSentinel(addr, this.config, null);
            connectionFutures.add(f);

            for (RFuture<Void> future : connectionFutures) {
                future.awaitUninterruptibly(this.config.getConnectTimeout());
            }

            break;
        } catch (RedisConnectionException e) {
            stopThreads();
            throw e;
        } catch (Exception e) {
            lastException = e;
            log.warn(e.getMessage());
        } finally {
            client.shutdownAsync();
        }
    }

    if (cfg.isCheckSentinelsList()) {
        if (sentinels.isEmpty()) {
            stopThreads();
            throw new RedisConnectionException("SENTINEL SENTINELS command returns empty result! Set checkSentinelsList = false to avoid this check.", lastException);
        } else if (sentinels.size() < 2) {
            stopThreads();
            throw new RedisConnectionException("SENTINEL SENTINELS command returns less than 2 nodes! At least two sentinels should be defined in Redis configuration. Set checkSentinelsList = false to avoid this check.", lastException);
        }
    }

    if (currentMaster.get() == null) {
        stopThreads();
        throw new RedisConnectionException("Can't connect to servers!", lastException);
    }
    if (this.config.getReadMode() != ReadMode.MASTER && this.config.getSlaveAddresses().isEmpty()) {
        log.warn("ReadMode = " + this.config.getReadMode() + ", but slave nodes are not found!");
    }

    initSingleEntry();

    scheduleChangeCheck(cfg, null);
}
```
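During initialization, each entry returned by SENTINEL SLAVES is a field/value map. The constructor registers every non-empty entry as a replica but additionally marks it disconnected when its `flags` field contains `s_down` or `disconnected`. A stdlib sketch of that classification; `ReplicaScan` is a hypothetical class, and formatting addresses as `redis://ip:port` strings is an assumption standing in for `toURI`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical: classify SENTINEL SLAVES entries the way the constructor above does.
class ReplicaScan {
    final List<String> slaves = new ArrayList<>();
    final List<String> disconnectedSlaves = new ArrayList<>();

    void load(List<Map<String, String>> sentinelSlaves) {
        for (Map<String, String> map : sentinelSlaves) {
            if (map.isEmpty()) {
                continue; // skip empty entries
            }
            String host = "redis://" + map.get("ip") + ":" + map.get("port");
            String flags = map.getOrDefault("flags", "");
            slaves.add(host); // every replica is registered...
            if (flags.contains("s_down") || flags.contains("disconnected")) {
                disconnectedSlaves.add(host); // ...but unhealthy ones are also marked down
            }
        }
    }
}
```

Registering a down replica instead of dropping it lets the periodic check bring it back once Sentinel stops flagging it.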

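A scheduled task checks whether a failover has happened, running once every scanInterval milliseconds (cfg.getScanInterval(), 1000 ms by default):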
scheduleChangeCheck

```java
private void scheduleChangeCheck(SentinelServersConfig cfg, Iterator<RedisClient> iterator) {
    monitorFuture = group.schedule(new Runnable() {
        @Override
        public void run() {
            AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
            Iterator<RedisClient> iter = iterator;
            if (iter == null) {
                // Shuffle the list so all clients don't prefer the same sentinel
                List<RedisClient> clients = new ArrayList<>(sentinels.values());
                Collections.shuffle(clients);
                iter = clients.iterator();
            }
            checkState(cfg, iter, lastException);
        }
    }, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
}

private void checkState(SentinelServersConfig cfg, Iterator<RedisClient> iterator, AtomicReference<Throwable> lastException) {
    if (!iterator.hasNext()) {
        if (lastException.get() != null) {
            log.error("Can't update cluster state", lastException.get());
        }
        performSentinelDNSCheck(null);
        scheduleChangeCheck(cfg, null);
        return;
    }
    if (!getShutdownLatch().acquire()) {
        return;
    }

    RedisClient client = iterator.next();
    RedisURI addr = getIpAddr(client.getAddr());
    RFuture<RedisConnection> connectionFuture = connectToNode(cfg, addr, null);
    connectionFuture.onComplete((connection, e) -> {
        if (e != null) {
            lastException.set(e);
            getShutdownLatch().release();
            checkState(cfg, iterator, lastException);
            return;
        }

        updateState(cfg, connection, iterator);
    });
}

private void updateState(SentinelServersConfig cfg, RedisConnection connection, Iterator<RedisClient> iterator) {
    AtomicInteger commands = new AtomicInteger(2);
    BiConsumer<Object, Throwable> commonListener = new BiConsumer<Object, Throwable>() {

        private final AtomicBoolean failed = new AtomicBoolean();

        @Override
        public void accept(Object t, Throwable u) {
            if (commands.decrementAndGet() == 0) {
                getShutdownLatch().release();
                if (failed.get()) {
                    scheduleChangeCheck(cfg, iterator);
                } else {
                    scheduleChangeCheck(cfg, null);
                }
            }
            if (u != null && failed.compareAndSet(false, true)) {
                log.error("Can't execute SENTINEL commands on " + connection.getRedisClient().getAddr(), u);
                closeNodeConnection(connection);
            }
        }
    };

    RFuture<InetSocketAddress> masterFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_GET_MASTER_ADDR_BY_NAME, cfg.getMasterName());
    masterFuture.onComplete((master, e) -> {
        if (e != null) {
            return;
        }

        RedisURI current = currentMaster.get();
        RedisURI newMaster = toURI(master.getHostString(), String.valueOf(master.getPort()));
        if (!newMaster.equals(current)
                && currentMaster.compareAndSet(current, newMaster)) {
            RFuture<RedisClient> changeFuture = changeMaster(singleSlotRange.getStartSlot(), newMaster);
            changeFuture.onComplete((res, ex) -> {
                if (ex != null) {
                    currentMaster.compareAndSet(newMaster, current);
                }
            });
        }
    });
    masterFuture.onComplete(commonListener);

    if (!config.checkSkipSlavesInit()) {
        RFuture<List<Map<String, String>>> slavesFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SLAVES, cfg.getMasterName());
        commands.incrementAndGet();
        slavesFuture.onComplete((slavesMap, e) -> {
            if (e != null) {
                return;
            }

            Set<RedisURI> currentSlaves = new HashSet<>(slavesMap.size());
            List<RFuture<Void>> futures = new ArrayList<>();
            for (Map<String, String> map : slavesMap) {
                if (map.isEmpty()) {
                    continue;
                }

                String ip = map.get("ip");
                String port = map.get("port");
                String flags = map.getOrDefault("flags", "");
                String masterHost = map.get("master-host");
                String masterPort = map.get("master-port");

                RedisURI slaveAddr = toURI(ip, port);
                if (flags.contains("s_down") || flags.contains("disconnected")) {
                    slaveDown(slaveAddr);
                    continue;
                }
                if ("?".equals(masterHost) || !isUseSameMaster(slaveAddr, masterHost, masterPort)) {
                    continue;
                }

                currentSlaves.add(slaveAddr);
                RFuture<Void> slaveFuture = addSlave(slaveAddr);
                futures.add(slaveFuture);
            }

            CountableListener<Void> listener = new CountableListener<Void>() {
                @Override
                protected void onSuccess(Void value) {
                    MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
                    entry.getAllEntries().stream()
                            .map(e -> e.getClient().getAddr())
                            .map(a -> toURI(a.getAddress().getHostAddress(), String.valueOf(a.getPort())))
                            .filter(a -> !currentSlaves.contains(a) && !a.equals(currentMaster.get()))
                            .forEach(a -> slaveDown(a));
                };
            };

            listener.setCounter(futures.size());
            for (RFuture<Void> f : futures) {
                f.onComplete(listener);
            }
        });
        slavesFuture.onComplete(commonListener);
    }

    RFuture<List<Map<String, String>>> sentinelsFuture = connection.async(StringCodec.INSTANCE, RedisCommands.SENTINEL_SENTINELS, cfg.getMasterName());
    sentinelsFuture.onComplete((list, e) -> {
        if (e != null || list.isEmpty()) {
            return;
        }

        Set<RedisURI> newUris = list.stream().filter(m -> {
            String flags = m.getOrDefault("flags", "");
            if (!m.isEmpty() && !flags.contains("disconnected") && !flags.contains("s_down")) {
                return true;
            }
            return false;
        }).map(m -> {
            String ip = m.get("ip");
            String port = m.get("port");
            return toURI(ip, port);
        }).collect(Collectors.toSet());

        InetSocketAddress addr = connection.getRedisClient().getAddr();
        RedisURI currentAddr = getIpAddr(addr);
        newUris.add(currentAddr);

        updateSentinels(newUris);
    });
    sentinelsFuture.onComplete(commonListener);
}
```
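The scan loop above shuffles the known Sentinels, tries them one by one until one answers, and then schedules the next round as a fresh one-shot task rather than a fixed-rate timer, so a slow round never overlaps the next. The sketch below is a simplified model: in Redisson a failed connection moves to the next Sentinel immediately, while a failed SENTINEL command resumes the same iterator at the next scan; here both collapse into "try the next one". `ChangeCheck` and the `updateFrom` predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical model of scheduleChangeCheck/checkState; not Redisson code.
class ChangeCheck {
    private final List<String> sentinels;
    private final Predicate<String> updateFrom; // true if the state was refreshed from that Sentinel
    private final Random random;

    ChangeCheck(List<String> sentinels, Predicate<String> updateFrom, Random random) {
        this.sentinels = sentinels;
        this.updateFrom = updateFrom;
        this.random = random;
    }

    // One round: shuffle so clients do not all prefer the same Sentinel, stop at the first success.
    String checkOnce() {
        List<String> order = new ArrayList<>(sentinels);
        Collections.shuffle(order, random);
        for (String sentinel : order) {
            if (updateFrom.test(sentinel)) {
                return sentinel;
            }
        }
        return null; // every Sentinel failed this round; Redisson logs "Can't update cluster state"
    }

    // Chain one-shot schedules instead of a fixed rate, so rounds never overlap.
    void schedule(ScheduledExecutorService executor, long scanIntervalMillis) {
        executor.schedule(() -> {
            checkOnce();
            schedule(executor, scanIntervalMillis);
        }, scanIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

Passing a seeded `Random` only serves to make the shuffle reproducible in tests; `Collections.shuffle(list)` without one matches the original.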

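From the source above, the failover-detection flow is:

  1. scheduleChangeCheck periodically checks the state of the nodes monitored by Sentinel, trying the known Sentinels in shuffled order every scanInterval milliseconds.
  2. updateState checks the master's state and, in the same round, refreshes the replica list (SENTINEL SLAVES) and the Sentinel list (SENTINEL SENTINELS).

It asks Sentinel for the master's address; if the master has changed, changeMaster(...) discards the old masterEntry and builds a new one for the new address.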
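The master switch in `updateState` is guarded by `currentMaster.compareAndSet(current, newMaster)` and rolled back with `compareAndSet(newMaster, current)` if `changeMaster` fails, so two concurrent checks cannot both trigger a switch, and a failed switch does not leave the new address recorded. A sketch of that pattern with `AtomicReference`; `MasterTracker` and the boolean-returning `changeMaster` predicate are hypothetical, synchronous simplifications of Redisson's `RFuture`-based code.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

// Hypothetical: the compare-and-set + rollback pattern from updateState, made synchronous.
class MasterTracker {
    final AtomicReference<String> currentMaster;
    private final Predicate<String> changeMaster; // true if switching to the new master worked

    MasterTracker(String initialMaster, Predicate<String> changeMaster) {
        this.currentMaster = new AtomicReference<>(initialMaster);
        this.changeMaster = changeMaster;
    }

    // Called with the address Sentinel reports; returns true if a switch was performed.
    boolean onMasterReported(String reported) {
        String current = currentMaster.get();
        if (reported.equals(current) || !currentMaster.compareAndSet(current, reported)) {
            return false; // unchanged, or another check already switched
        }
        if (!changeMaster.test(reported)) {
            currentMaster.compareAndSet(reported, current); // roll back so the next scan retries
            return false;
        }
        return true;
    }
}
```

Because the rollback restores the old address, the next scan sees a mismatch again and retries the switch instead of silently assuming it succeeded.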