> 文章列表 > Dubbo服务目录组件Directory

Dubbo服务目录组件Directory

Dubbo服务目录组件Directory

目录

AbstractDirectory

StaticDirectory

RegistryDirectory

动态调整Invoker列表机制

Directory的初始化时机


在 Dubbo 中 存在 SPI 接口 org.apache.dubbo.rpc.cluster.Directory。即服务目录,用于存放服务提供列表。

简单说Directory保存了当前可以提供服务的服务提供者列表集合。当消费者进行服务调用时,可以从Directory中按照某些规则挑选出一个服务提供者来执行后续的调用。

官方的描述如下:
Directory即服务目录,存储了一些和服务提供者有关的信息。通过服务目录,服务消费者可获取到服务提供者的信息,如 ip、端口、服务协议等。通过这些信息,服务消费者就可通过Netty等客户端进行远程调用。在一个服务集群中,服务提供者数量并不是一成不变的,如果集群中新增了一台机器,相应地在服务目录中就要新增一条服务提供者记录。如果服务提供者的配置修改了,服务目录中的记录也要做相应的更新。

其实从代码实现角度讲的话,其实Directory和Router是配合使用的。

Directory实现了获取及动态刷新服务列表的功能,但是最终服务列表的数据会存储在Router组件中。在消费者调用时也是以Directory为入口从RouterChain中执行各个Router筛选出可用的列表供使用。

AbstractDirectory

AbstractDirectory中封装了Invoker列举流程,具体的列举逻辑则由子类实现,如AbstractDirectory类定义了doList方法,在StaticDirectory和RegistryDirectory两个子类中重写了doList方法。这是典型的模板模式; 

StaticDirectory

作为AbstractDirectory的子类,StaticDirectory实现的是静态服务目录。它内部存放的Invoker是不会变动的,理论上它和不可变List的功能很相似。

RegistryDirectory

RegistryDirectory也是AbstractDirectory的子类,与上面的StaticDirectory相比,RegistryDirectory是一种动态服务目录,这也是Dubbo中默认使用的Directory

当注册中心服务配置发生变化后,RegistryDirectory可收到与当前服务相关的变化并根据配置变更信息刷新Invoker列表。

动态调整Invoker列表机制

服务消费者在启动时,会订阅注册中心providers、configurators、routers节点,并设置回调函数为RegistryDirectory#notify()方法,当节点更新时会调用RegistryDirectory#notify()方法,逻辑大概是按照类型进行划分(configurators、routers,providers)并分别进行处理。如果是providers节点的更新,则会进一步调用refreshOverrideAndInvoker()方法进一步处理。

其实在启动时会立刻调用一次该回调方法,用于同步当前节点配置。

RegistryDirectory#notify()方法源码如下

	// 在调用该方法之前,会调用 AbstractRegistry#notify 中将URL 按照类别划分,再分别调用 RegistryDirectory#notify 方法。@Overridepublic synchronized void notify(List<URL> urls) {//  对 URLs 进行合法性过滤List<URL> categoryUrls = urls.stream()// 合法性组别校验,默认 providers.filter(this::isValidCategory).filter(this::isNotCompatibleFor26x).collect(Collectors.toList());/*** TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?*/// 筛选出配置信息URL 并转换成 configurators this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator)).orElse(configurators);// 筛选出路由URL 并转换成Router 添加到 AbstractDirectory#routerChain 中// RouterChain保存了服务提供者的URL列表转换为invoker列表和可用服务提供者对应的invokers列表和路由规则信息toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);// providers// 筛选出 提供者URL 并进行服务提供者的更新refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));}

RegistryDirectory#refreshOverrideAndInvoker()方法源码如下 

	private void refreshOverrideAndInvoker(List<URL> urls) {// mock zookeeper://xxx?mock=return null// 重写URL(也就是把mock=return null等信息拼接到URL中)并保存到overrideDirectoryUrl中overrideDirectoryUrl();// 刷新 服务提供者 URL,根据URL 生成InvokerrefreshInvoker(urls);}// 刷新 服务列表private void refreshInvoker(List<URL> invokerUrls) {Assert.notNull(invokerUrls, "invokerUrls should not be null");// 如果只有一个 协议为 empty 的url,则表明需要销毁所有协议,因为empty 协议为空协议,个人理解就是为了防止空url存在而生成的无意义的urlif (invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {// 设置禁止访问this.forbidden = true; // Forbid to access// invoker设置为空集合this.invokers = Collections.emptyList();routerChain.setInvokers(this.invokers);destroyAllInvokers(); // Close all invokers} else {// 设置允许访问this.forbidden = false; // Allow to access// urlInvokerMap 需要使用本地引用,因为 urlInvokerMap自身可能随时变化,可能指向 nullMap<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local referenceif (invokerUrls == Collections.<URL>emptyList()) {invokerUrls = new ArrayList<>();}// 这里防止并发更新,cachedInvokerUrls 被  volatile 修饰if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls = new HashSet<>();this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison}if (invokerUrls.isEmpty()) {return;}// 将 url 转换成 Invoker,key为 url.toFullString(), value 为 InvokerMap<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map// state change// If the calculation is wrong, it is not processed.if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}// 转化为不可修改 list,防止并发修改List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));// pre-route and build cache, notice that route cache should build on original Invoker list.// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.// 保存服务提供者 InvokerrouterChain.setInvokers(newInvokers);//  如果匹配多个 group,则进行合并this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;this.urlInvokerMap = newUrlInvokerMap;try {// 销毁无用的 InvokerdestroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker} catch (Exception e) {logger.warn("destroyUnusedInvokers error. ", e);}}}

Directory的初始化时机

当消费者服务启动时会通过 ReferenceConfig#createProxy()创建提供者的代理类。在这个方法中也完成了Directory的创建过程。