> 文章列表 > 动态线程池Dinamic-Tp(源码篇)

动态线程池Dinamic-Tp(源码篇)

动态线程池Dinamic-Tp(源码篇)

 线程池系列:

【Executors】线程池的4种常见创建方式

【ThreadPoolExecutor】自定义线程池详解(一篇透彻)

 动态线程池Dinamic-Tp(基础篇)

 动态线程池Dinamic-Tp(接入篇)

 动态线程池Dinamic-Tp(源码篇)

前言

DynamicTp项目地址:

官网:首页 | dynamic-tp

gitee地址:https://gitee.com/dromara/dynamic-tp

github地址:https://github.com/dromara/dynamic-tp

本文主要讲一下dynamic-tp 框架中重要的类和方法源码。

ps:核心源码主要在core代码模块里面,接下来我们看一些主要的类

一、ApplicationContextHolder

这个类很简单,本身就是实现了ApplicationContextAware接口,通过它我们能获取到ApplicationContext,进而可以拿到容器中的bean。

二、DtpRegistry

这个类很关键,该类实现了 ApplicationRunner 和 Ordered  所以说它在我们的应用一启动的时候就会执行run方法,该类保存着所有已经注册的线程池对象。

该类主要实现线程池的注册,刷新配置的功能,后续我们修改nacos、zookeeper等配置中心配置,最后都会调用 refresh 方法去进行刷新操作。

public class DtpRegistry implements ApplicationRunner, Ordered {//初始化dtp线程池Mapprivate static final Map<String, DtpExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();//初始化普通线程池Mapprivate static final Map<String, ExecutorWrapper> COMMON_REGISTRY = new ConcurrentHashMap<>();//nacos配置文件private static DtpProperties dtpProperties;//获取所有dpt线程池名称列表public static List<String> listAllDtpNames() {return Lists.newArrayList(DTP_REGISTRY.keySet());}//获取所有普通线程池名称列表public static List<String> listAllCommonNames() {return Lists.newArrayList(COMMON_REGISTRY.keySet());}//注册dtp线程public static void registerDtp(DtpExecutor executor, String source) {DTP_REGISTRY.putIfAbsent(executor.getThreadPoolName(), executor);}//注册普通线程public static void registerCommon(ExecutorWrapper wrapper, String source) {COMMON_REGISTRY.putIfAbsent(wrapper.getThreadPoolName(), wrapper);}//根据线程名称获取dtp线程实例public static DtpExecutor getDtpExecutor(final String name) {val executor = DTP_REGISTRY.get(name);if (Objects.isNull(executor)) {throw new DtpException("Cannot find a specified dtpExecutor, name: " + name);}return executor;}//根据线程名称获取普通线程包装类实例public static ExecutorWrapper getCommonExecutor(final String name) {val executor = COMMON_REGISTRY.get(name);if (Objects.isNull(executor)) {throw new DtpException("Cannot find a specified commonExecutor, name: " + name);}return executor;}//刷新配置public static void refresh(DtpProperties dtpProperties) {if (Objects.isNull(dtpProperties) || CollectionUtils.isEmpty(dtpProperties.getExecutors())) {log.warn("DynamicTp refresh, empty threadPool properties.");return;}dtpProperties.getExecutors().forEach(x -> {if (StringUtils.isBlank(x.getThreadPoolName())) {log.warn("DynamicTp refresh, threadPoolName must not be empty.");return;}//首先在dtp线程池map中查找val dtpExecutor = DTP_REGISTRY.get(x.getThreadPoolName());if (Objects.nonNull(dtpExecutor)) {refresh(ExecutorWrapper.of(dtpExecutor), x);return;}//然后在普通线程池map中查找val executorWrapper = COMMON_REGISTRY.get(x.getThreadPoolName());if (Objects.nonNull(executorWrapper)) {refresh(executorWrapper, x);return;}});}//刷新配置private static void refresh(ExecutorWrapper executorWrapper, DtpExecutorProps props) {//检查核心参数if (props.coreParamIsInValid()) {return;}//参数转换TpMainFields oldFields = ExecutorConverter.convert(executorWrapper);doRefresh(executorWrapper, props);TpMainFields newFields = ExecutorConverter.convert(executorWrapper);if (oldFields.equals(newFields)) {return;}//查找修改项List<String> diffKeys = EQUATOR.getDiffFields(oldFields, newFields).stream().map(FieldInfo::getFieldName).collect(toList());//异步发送报警通知NoticeManager.doNoticeAsync(executorWrapper, oldFields, diffKeys);}//执行线程参数刷新操作private static void doRefresh(ExecutorWrapper executorWrapper, DtpExecutorProps props) {if (!(executorWrapper.getExecutor() instanceof ThreadPoolExecutor)) {return;}ThreadPoolExecutor executor = (ThreadPoolExecutor) executorWrapper.getExecutor();doRefreshPoolSize(executor, props);if (!Objects.equals(executor.getKeepAliveTime(props.getUnit()), props.getKeepAliveTime())) {executor.setKeepAliveTime(props.getKeepAliveTime(), props.getUnit());}if (!Objects.equals(executor.allowsCoreThreadTimeOut(), props.isAllowCoreThreadTimeOut())) {executor.allowCoreThreadTimeOut(props.isAllowCoreThreadTimeOut());}if (executor instanceof DtpExecutor) {doRefreshDtp((DtpExecutor) executor, props);return;}doRefreshCommon(executorWrapper, props);}//执行普通线程池参数刷新操作private static void doRefreshCommon(ExecutorWrapper executorWrapper, DtpExecutorProps props) {if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {executorWrapper.setThreadPoolAliasName(props.getThreadPoolAliasName());}ThreadPoolExecutor executor = (ThreadPoolExecutor) executorWrapper.getExecutor();// update reject handlerString currentRejectHandlerName = executor.getRejectedExecutionHandler().getClass().getSimpleName();if (!Objects.equals(currentRejectHandlerName, props.getRejectedHandlerType())) {val rejectHandler = RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType());executor.setRejectedExecutionHandler(rejectHandler);}//更新队列updateQueueProps(executor, props);//更新报警通知配置updateNotifyInfo(executorWrapper, props, dtpProperties.getPlatforms());}//执行dtp线程参数刷新操作private static void doRefreshDtp(DtpExecutor executor, DtpExecutorProps props) {if (StringUtils.isNotBlank(props.getThreadPoolAliasName())) {executor.setThreadPoolAliasName(props.getThreadPoolAliasName());}//更新拒绝策略if (!Objects.equals(executor.getRejectHandlerName(), props.getRejectedHandlerType())) {executor.setRejectedExecutionHandler(RejectHandlerGetter.getProxy(props.getRejectedHandlerType()));executor.setRejectHandlerName(props.getRejectedHandlerType());}executor.setWaitForTasksToCompleteOnShutdown(props.isWaitForTasksToCompleteOnShutdown());executor.setAwaitTerminationSeconds(props.getAwaitTerminationSeconds());executor.setPreStartAllCoreThreads(props.isPreStartAllCoreThreads());executor.setRunTimeout(props.getRunTimeout());executor.setQueueTimeout(props.getQueueTimeout());List<TaskWrapper> taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames());executor.setTaskWrappers(taskWrappers);//更新队列updateQueueProps(executor, props);//更新报警通知配置updateNotifyInfo(executor, props, dtpProperties.getPlatforms());}//更新核心线程数private static void doRefreshPoolSize(ThreadPoolExecutor executor, DtpExecutorProps props) {if (props.getMaximumPoolSize() < executor.getMaximumPoolSize()) {if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {executor.setCorePoolSize(props.getCorePoolSize());}if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {executor.setMaximumPoolSize(props.getMaximumPoolSize());}return;}if (!Objects.equals(executor.getMaximumPoolSize(), props.getMaximumPoolSize())) {executor.setMaximumPoolSize(props.getMaximumPoolSize());}if (!Objects.equals(executor.getCorePoolSize(), props.getCorePoolSize())) {executor.setCorePoolSize(props.getCorePoolSize());}}//更新队列private static void updateQueueProps(ThreadPoolExecutor executor, DtpExecutorProps props) {val blockingQueue = executor.getQueue();if (blockingQueue instanceof MemorySafeLinkedBlockingQueue) {((MemorySafeLinkedBlockingQueue<Runnable>) blockingQueue).setMaxFreeMemory(props.getMaxFreeMemory() * M_1);}if (!(blockingQueue instanceof VariableLinkedBlockingQueue)) {log.warn("DynamicTp refresh, the blockingqueue capacity cannot be reset, poolName: {}, queueType {}",props.getThreadPoolName(), blockingQueue.getClass().getSimpleName());return;}int capacity = blockingQueue.size() + blockingQueue.remainingCapacity();if (!Objects.equals(capacity, props.getQueueCapacity())) {((VariableLinkedBlockingQueue<Runnable>) blockingQueue).setCapacity(props.getQueueCapacity());}}@Autowiredpublic void setDtpProperties(DtpProperties dtpProperties) {DtpRegistry.dtpProperties = dtpProperties;}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 1;}//主要启动打印已经注册的线程池,没有其他实际作用@Overridepublic void run(ApplicationArguments args) {Set<String> remoteExecutors = Collections.emptySet();if (CollectionUtils.isNotEmpty(dtpProperties.getExecutors())) {remoteExecutors = dtpProperties.getExecutors().stream().map(DtpExecutorProps::getThreadPoolName).collect(Collectors.toSet());}val registeredExecutors = Sets.newHashSet(DTP_REGISTRY.keySet());registeredExecutors.addAll(COMMON_REGISTRY.keySet());val localExecutors = CollectionUtils.subtract(registeredExecutors, remoteExecutors);log.info("DtpRegistry has been initialized, remote executors: {}, local executors: {}",remoteExecutors, localExecutors);}
}

三、DtpPostProcessor

该类实现了 BeanPostProcessor

从名称可以看出是dtp的一个后置处理器类,重写postProcessAfterInitialization方法。

这个类对我们应用中定义的线程池进行拦截,并填充相关的map,以实现一些增强逻辑。

public class DtpPostProcessor implements BeanPostProcessor {@Overridepublic Object postProcessAfterInitialization(@NonNull Object bean, @NonNull String beanName) throws BeansException {if (!(bean instanceof ThreadPoolExecutor) && !(bean instanceof ThreadPoolTaskExecutor)) {return bean;}if (bean instanceof DtpExecutor) {DtpExecutor dtpExecutor = (DtpExecutor) bean;if (bean instanceof EagerDtpExecutor) {((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor);}//注册dtp线程池registerDtp(dtpExecutor);return dtpExecutor;}ApplicationContext applicationContext = ApplicationContextHolder.getInstance();String dtpAnnotationVal;try {DynamicTp dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class);if (Objects.nonNull(dynamicTp)) {dtpAnnotationVal = dynamicTp.value();} else {BeanDefinitionRegistry registry = (BeanDefinitionRegistry) applicationContext;BeanDefinition beanDefinition = registry.getBeanDefinition(beanName);if (!(beanDefinition instanceof AnnotatedBeanDefinition)) {return bean;}AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition;MethodMetadata methodMetadata = (MethodMetadata) annotatedBeanDefinition.getSource();if (Objects.isNull(methodMetadata) || !methodMetadata.isAnnotated(DynamicTp.class.getName())) {return bean;}dtpAnnotationVal = Optional.ofNullable(methodMetadata.getAnnotationAttributes(DynamicTp.class.getName())).orElse(Collections.emptyMap()).getOrDefault("value", "").toString();}} catch (NoSuchBeanDefinitionException e) {log.error("There is no bean with the given name {}", beanName, e);return bean;}String poolName = StringUtils.isNotBlank(dtpAnnotationVal) ? dtpAnnotationVal : beanName;if (bean instanceof ThreadPoolTaskExecutor) {registerCommon(poolName, ((ThreadPoolTaskExecutor) bean).getThreadPoolExecutor());} else {registerCommon(poolName, (ThreadPoolExecutor) bean);}return bean;}//注册dtp线程池private void registerDtp(DtpExecutor executor) {DtpRegistry.registerDtp(executor, "beanPostProcessor");}//注册普通线程池private void registerCommon(String poolName, ThreadPoolExecutor executor) {ExecutorWrapper wrapper = new ExecutorWrapper(poolName, executor);DtpRegistry.registerCommon(wrapper, "beanPostProcessor");}
}

3.1:如果bean不是ThreadPoolExecutor或者ThreadPoolTaskExecutor,那么就不对bean做任何处理,直接返回

3.2:如果bean是DtpExecutor,调用registerDtp方法填充DTP_REGISTRY这个map

3.3:如果说bean上面的DynamicTp注解,使用注解的值作为线程池的名称,没有的话就使用bean的名称

3.4:如果bean是spring中的ThreadPoolTaskExecutor的话,那么就通过getThreadPoolExecutor()方法拿到ThreadPoolExecutor注册到COMMON_REGISTRY中

到这里执行完,就针对所有线程池对象完成了增强。

4. DtpMonitor

该类是用来进行监控的类,里面就是一些跟告警相关的逻辑,比如我们参数变更触发的飞书,邮件等告警。本身也实现了ApplicationRunner接口,因此其run方法也是我们重点要来看的。

@Override
public void run(ApplicationArguments args) {MONITOR_EXECUTOR.scheduleWithFixedDelay(this::run,0, dtpProperties.getMonitorInterval(), TimeUnit.SECONDS);
}/* Monitor interval, time unit(s)*/
private int monitorInterval = 5;

ps:可以看到,默认每隔5秒(可配置)就去检查一下线程池的配置。

private void run() {//拿到所有线程池的名称List<String> dtpNames = DtpRegistry.listAllDtpNames();//拿到所有标有DynamicTp注解的线程池List<String> commonNames = DtpRegistry.listAllCommonNames();//告警检查checkAlarm(dtpNames);collect(dtpNames, commonNames);}//针对每一个线程池,使用其名称从注册表中获取到线程池对象,然后触发告警
private void checkAlarm(List<String> dtpNames) {dtpNames.forEach(x -> {DtpExecutor executor = DtpRegistry.getDtpExecutor(x);//针对线程池的liveness和capacity指标进行告警AlarmManager.triggerAlarm(() -> doAlarm(executor, SCHEDULE_ALARM_TYPES));});//发送一个告警AlarmCheckEvent事件,这个事件在DtpAdapterListener中被监听处理publishAlarmCheckEvent();
}//收集数据,发送收集事件
private void collect(List<String> dtpNames, List<String> commonNames) {//如果关闭了指标收集,直接返回if (!dtpProperties.isEnabledCollect()) {return;}//拿到所有的线程池对象,并获取到线程池的各种属性统计指标ThreadPoolStatsdtpNames.forEach(x -> {DtpExecutor executor = DtpRegistry.getDtpExecutor(x);ThreadPoolStats poolStats = MetricsConverter.convert(executor);//根据指标收集类型(logging,micrometer)进行指标的处理,默认是进行日志打印doCollect(poolStats);});commonNames.forEach(x -> {ExecutorWrapper wrapper = DtpRegistry.getCommonExecutor(x);ThreadPoolStats poolStats = MetricsConverter.convert(wrapper);doCollect(poolStats);});//发送一个CollectEvent事件publishCollectEvent();}

五、 DtpEndpoint

这个就是基于sringboot-actuator来实现的一个端点,我们可以调用接口来获取一些指标数据,后续对接prometheus也会用到,里面的逻辑我们来看。

public List<Metrics> invoke() {List<Metrics> metricsList = Lists.newArrayList();List<String> dtpNames = DtpRegistry.listAllDtpNames();dtpNames.forEach(x -> {DtpExecutor executor = DtpRegistry.getDtpExecutor(x);metricsList.add(MetricsConverter.convert(executor));});List<String> commonNames = DtpRegistry.listAllCommonNames();commonNames.forEach(x -> {ExecutorWrapper wrapper = DtpRegistry.getCommonExecutor(x);metricsList.add(MetricsConverter.convert(wrapper));});val handlerMap = ApplicationContextHolder.getBeansOfType(MetricsAware.class);if (MapUtils.isNotEmpty(handlerMap)) {handlerMap.forEach((k, v) -> metricsList.addAll(v.getMultiPoolStats()));}JvmStats jvmStats = new JvmStats();RuntimeInfo runtimeInfo = new RuntimeInfo();jvmStats.setMaxMemory(FileUtil.readableFileSize(runtimeInfo.getMaxMemory()));jvmStats.setTotalMemory(FileUtil.readableFileSize(runtimeInfo.getTotalMemory()));jvmStats.setFreeMemory(FileUtil.readableFileSize(runtimeInfo.getFreeMemory()));jvmStats.setUsableMemory(FileUtil.readableFileSize(runtimeInfo.getUsableMemory()));metricsList.add(jvmStats);return metricsList;
}

六、DtpAdapterListener

这也是一个比较关键的类,实现了GenericApplicationListener接口,算是一个事件监听器,主要监听RefreshEvent、CollectEvent、AlarmCheckEvent这三个事件。

@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {try {if (event instanceof RefreshEvent) {doRefresh(((RefreshEvent) event).getDtpProperties());} else if (event instanceof CollectEvent) {doCollect(((CollectEvent) event).getDtpProperties());} else if (event instanceof AlarmCheckEvent) {doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties());}} catch (Exception e) {log.error("DynamicTp adapter, event handle failed.", e);}
}

这里就是集中处理所有框架产生的事件了,我们先来看看事件都是在什么时机发布的。

6.1 CloudNacosRefresher

此处我们配置中心以cloudNacos为例来讲述

此类继承了SmartApplicationListener,所以这个类就是一个监听类,监听对象是RefreshScopeRefreshedEvent。

我们配置中心用的nacos,所以nacos更新配置的时候我们就会监听到刷新属性的事件。注:配置中心不同,监听对象不同。

@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {if (event instanceof RefreshScopeRefreshedEvent) {doRefresh(dtpProperties);}
}public void refresh(String content, ConfigFileTypeEnum fileType) {//如果更新的内容为空或者文件类型为空,则直接返回if (StringUtils.isBlank(content) || Objects.isNull(fileType)) {log.warn("DynamicTp refresh, empty content or null fileType.");return;}try {//获取到配置文件处理器,这里面维护了多重格式的文件解析器:比如properties、yaml以及json格式的解析器val configHandler = ConfigHandler.getInstance();//将配置文件解析为properties格式的数据val properties = configHandler.parseConfig(content, fileType);//当有配置变化的时候,执行刷新doRefresh(properties);} catch (IOException e) {log.error("DynamicTp refresh error, content: {}, fileType: {}", content, fileType, e);}
}protected void doRefresh(Map<Object, Object> properties) {if (MapUtil.isEmpty(properties)) {log.warn("DynamicTp refresh, empty properties.");return;}//这一步很关键,将发生变化的属性绑定到DtpProperties对象上PropertiesBinder.bindDtpProperties(properties, dtpProperties);//使用更新后的DtpProperties相关属性去更新线程池属性doRefresh(dtpProperties);
}protected void doRefresh(DtpProperties dtpProperties) {//调用具体刷新配置方法,此处就略过了,后续有兴趣可以查看源码DtpRegistry.refresh(dtpProperties);//发布刷新配置事件publishEvent(dtpProperties);}

七、队列

以上就是监控且刷新配置的过程,不过之前我们有一个疑问,为什么原生线程池不支持queue的修改,为什么dynatic-tp可以对线程池工作队列进行动态调整,那么接下来我们再看一下。

因为线程池原生使用的工作队列看源码我们会发现都是final 修饰的,所以是不可继承,不可修改的。dynatic-tp使用了新的队列VariableLinkedBlockingQueue 、MemorySafeLinkedBlockingQueue

private static void updateQueueProps(ThreadPoolExecutor executor, DtpExecutorProps props) {val blockingQueue = executor.getQueue();//如果队列是MemorySafeLinkedBlockingQueue,那么设置最大空前内存 默认是256Mif (blockingQueue instanceof MemorySafeLinkedBlockingQueue) {((MemorySafeLinkedBlockingQueue<Runnable>) blockingQueue).setMaxFreeMemory(props.getMaxFreeMemory() * M_1);}//如果队列是VariableLinkedBlockingQueue,那么设置队列容量值  默认是1024if (!(blockingQueue instanceof VariableLinkedBlockingQueue)) {log.warn("DynamicTp refresh, the blockingqueue capacity cannot be reset, poolName: {}, queueType {}",props.getThreadPoolName(), blockingQueue.getClass().getSimpleName());return;}int capacity = blockingQueue.size() + blockingQueue.remainingCapacity();if (!Objects.equals(capacity, props.getQueueCapacity())) {((VariableLinkedBlockingQueue<Runnable>) blockingQueue).setCapacity(props.getQueueCapacity());}
}

7.1 VariableLinkedBlockingQueue

这其实就是一个LinkedBlockingQueue队列,但是VariableLinkedBlockingQueue是支持修改容量的。队列默认长度为Integer.MAX_VALUE,所以当我们不对其进行限制时就会出现OOM的情况。

7.2 MemorySafeLinkedBlockingQueue

这是内存安全队列,通过控制内存大小从而控制队列的大小,当我们往队列添加元素的时候,会先判断有没有足够的空间,当我们不设置内存大小的时候默认256M,这样就不会出现OOM的情况。

两种队列的各自实现方法可以参考项目源码。。。

今天源码篇就介绍到这里谢谢大家!