> 文章列表 > Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

历史篇章

🕐Nacos 客户端服务注册源码分析-篇一

🕑Nacos 客户端服务注册源码分析-篇二

🕒Nacos 客户端服务注册源码分析-篇三

🕓Nacos 服务端服务注册源码分析-篇四

🕔Nacos 服务端健康检查-篇五

🕕Nacos 客户端服务发现源码分析-篇六

Nacos 客户服务发现续接

之前,在第六篇的时候我们探究了 Nacos 客户端的服务发现源码的具体实现流程。

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

最终是调用的 NamingService 的 getAllInstance 方法获取了所有的实例列表,而客户端实例列表是封装在一个 List <Instance> 的集合当中的。

//获取所以的实例信息,这里的实例信息就是客户端的信息
List<Instance> list = namingService.getAllInstances("nacos.test.1");

最终是调用 NamingClientProxyDelegate 类下的 subscribe 方法完成订阅,并返回实体信息的。

if (null == serviceInfo) {//如果本地的缓存不存在服务信息,则进行订阅//查找到最新的实例信息serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}

由于这一部分的内容在之前的第六篇 Nacos 客户端服务发现源码分析-篇六 已经是分析过了的,所以这里我就不再进行赘述这一块的内容了,感兴趣的可以返回调转到指定的篇章进行浏览即可。

可能有些人好奇,哎。标题为什么称作 Nacos 客户端的服务发现与服务订阅机制的纠缠呢?

哈哈,其实他们两者是有联系的,具体是什么联系,就在我们接下来要探究 Nacos 客户端的服务订阅当中有其答案。

既然如此,我们就今天研究一把, Nacos 客户端服务订阅事件机制的具体实现叭。。。


Nacos 客户端服务订阅机制核心流程

首先,先谈谈什么是订阅?生活中那些那些方面体现着类似于订阅这样的概念?只要真正的理解了订阅这一概念,我们才能更好的进行接下来的内容。

订阅其实简单与生活对比来讲,其实就是预定。当然预定的这个动作有发出者,就必须有动作的承受者,举个栗子,外出旅游我们可以会定酒店,那么酒店的服务者就是动作的承受者,订酒店的对象就是动作的发出者,再比如我们的常常提到的订阅一个期刊,如果这个期刊的周期是一年,而该期刊每月都会推送该期的内容,那么订阅期刊的对象就是动作发出者,发布期刊的对象就是动作承受者。

订阅者订阅,承受者在接受到订阅者的指定命令后,周期性的完成指定的任务,这就是订阅。

所以对于注册中心 Nacos 也是同样提供了这样的服务的。。。

大致的流程就是 客户端 通过一个定时的任务每 6 秒从注册中心获取当前的实例列表,当发现实例发生了变化的时候,发布变更事件。对于订阅者而言,完成业务部分的处理(更新实例,更新本地缓存)。

我们可以通过一个流程图,观察其具体的实现。。。 原图点这里

Nacos 客户端的服务发现与服务订阅机制的纠缠 - 篇七

其实从图中已经大致的清楚了,客户端的这个订阅的整体流程。

我们从源码的角度进分析一波。

进入我们的 NacosNamingService 类当中

//在 NacosNamingService 中暴露了许多的重载的 subscribe 方法
//这里 NacosNamingService 类下的 subscribe 方法 和 NamingService 下的 getAllInstances 发现获取实例列表的方法重载的过程都是一样的
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {//创建一个空的集群对象集合subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {//设置默认的群组 DEFAULT_GROUP 默认群组subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)throws NacosException {//如果事件监听器为空 则返回if (null == listener) {return;}String clusterString = StringUtils.join(clusters, ",");//注册监听器changeNotifier.registerListener(groupName, serviceName, clusterString, listener);//对于订阅的本质就是服务的发现的一种方式,也就是服务在发现的时候执行订阅方法,同时触发定时任务去服务端拉去数据clientProxy.subscribe(serviceName, groupName, clusterString);
}

可以看到的是 NacosNamingService 中提供了大量的 subscribe 的重载方法,这些重载一些默认的参数。

走到 subscribe 方法的尽头,在该方法内可以看到有两个核心的方法 InstanceChangeNotifier 类下的registerListener 注册监听器方法与 NamingClientProxy 类下的 subscribe 订阅方法。我们就探究一下这两个方法具体实现,以及这两个方法的功能作用是什么?

changeNotifier.registerListener 注册监听器

/*** register listener.** @param groupName   group name* @param serviceName serviceName* @param clusters    clusters, concat by ','. such as 'xxx,yyy'* @param listener    custom listener*/
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);if (eventListeners == null) {synchronized (lock) {eventListeners = listenerMap.get(key);if (eventListeners == null) {eventListeners = new ConcurrentHashSet<EventListener>();listenerMap.put(key, eventListeners);}}}eventListeners.add(listener);
}
/*** deregister listener.** @param groupName   group name* @param serviceName serviceName* @param clusters    clusters, concat by ','. such as 'xxx,yyy'* @param listener    custom listener*/
public void deregisterListener(String groupName, String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);if (eventListeners == null) {return;}eventListeners.remove(listener);if (CollectionUtils.isEmpty(eventListeners)) {listenerMap.remove(key);}
}

可以看到在 InstancesChangeNotifier 类下有两个关于监听器的方法,注册监听与取消监听。

注册监听其实就是在监听集合对象 ConcurrentHashSet<EventListener> 中添加一个监听事件,而对于取消监听是通过 key 将需要移除的监听事件从集合当中移除。

那么关于这个监听事件添加都监听集合当中后,这个监听事件是如何触发又如何调用执行的呢?这个。。。哈哈,留一个坑,其实这一块我自己还没有研究的特别清楚。。。

接下来我们看看,另一个重要的方法 clientProxy.subscribe() 服务订阅

clientProxy.subscribe 服务订阅

其实玩到这里呢,也就与我们的标题 Nacos 客户的服务发现与客户端服务订阅机制的纠缠,就关联了起来,为什么这么说呢?那让我们看看 clientProxy.subscribe 方法内部的具体实现咯。。。

//其实走到这里就可以看到,该方法与之前的服务发现调用的是同一个方法,这里其实在做的是服务列表的查询
//查询与订阅都调用了同样的而方法
@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//开启定时任务调度 UpdateTaskserviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);//获取缓存中的 ServiceInfoServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result) {//如果缓存中没有数据,则进行订阅逻辑处理,基于 gRPC 协议result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}//serviceInfo 本地缓存处理serviceInfoHolder.processServiceInfo(result);return result;
}

哈哈 ,看到这一块的代码是不是有一种似曾相识的感觉呢?

对咯,没错在第六篇 Nacos 客户端服务发现源码分析 当中的发现获取实例列表的时候在 NacosNamingService 中的 getAllInstances 方法多次重载之后调用的 clientProxy.sunscribe 调用的是同一个方法。

所以其实到这里是可以得到一个结论的,就是 在 Nacos 客户端的查询与订阅服务都是调用了同样的方法的

这就解释了为什么标题 Nacos 客户端的服务发现与服务订阅机制是冥冥之中有种联系在一起的呢。

我们还记得流程图中有一个关于 UpdateTask 定时任务调度吗?

让我们接下来看看,这个里面到底在做什么呢???

定时任务执行内容

//开启定时任务调度 UpdateTask
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
/*** Schedule update if absent.** @param serviceName service name* @param groupName   group name* @param clusters    clusters*/
public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluif (futureMap.get(serviceKey) != null) {return;}//双重检测锁synchronized (futureMap) {if (futureMap.get(serviceKey) != null) {return;}//构建一个定时处理的任务,最终这里的 future 就是构建的定时任务,该任务用于在 run 中执行ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}
public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
}
private synchronized ScheduledFuture<?> addTask(UpdateTask task) {//执行延时函数,延时时间为 1000L * MICRO_SCALE = 1Sreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到在第二片代码中有这样一个方法,addTask () 对的,没错就是将通过 serviceName、groupName、cliusters 构建一个 UbdateTask 的更新任务对象,然后将其对象构建成一个未来执行的定时任务,添加到执行的集合当中,最终是由 ServiceInfoUpdateService 中的 run 方法去执行。

定时任务 run() 方法的执行

@Override
public void run() {long delayTime = DEFAULT_DELAY;try {//判断更改通知对象 serviceName 是否订阅if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKeNAMING_LOGGER.info("update task is stopped, service:" + groupedServiceName + ", clusters:" + clusters);return;}//获取缓存中的信息ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//缓存为空if (serviceObj == null) {//生成一个服务实例对象serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)//处理更新或添加到本地的缓存当中serviceInfoHolder.processServiceInfo(serviceObj);//更新最后一次的时间lastRefTime = serviceObj.getLastRefTime();return;}//过期服务,如果说,服务的更新时间是小于等于缓存刷新的时间的//那就说明本地的缓存不是最新的,而当前的服务实例信息也不是客户端最新的,//这个时候就需要从 注册中心 中重新的进行一次查询,获取最的服务实例信息并更新本地缓存if (serviceObj.getLastRefTime() <= lastRefTime) {serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false)//更新处理本地的缓存serviceInfoHolder.processServiceInfo(serviceObj);}//刷新更新的当前时间lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}//下次的更新缓存时间设置为缓存中的默认基数 (cacheMillis = 1000) * 6// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败数量为 0// 可能会出现一些异常,比如调用 queryInstancesOfService 方法的时候// 没有 ServiceInfo 连接不到则会出现异常resetFailCount();} catch (Throwable e) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: " + groupedServiceName, e);} finally {// 下次调度刷新时间,下次执行的时间与failCount 失败的次数有关,failCount=0,则下次调度时间为6秒,最长为1分钟// 当无异常的情况下 failCount 始终都是 0 则默认的时间一直都 6 sexecutor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}
}

未完待续。。。

学习计划指导