> 文章列表 > xxl-job核心流程

xxl-job核心流程

xxl-job核心流程

xxl-job服务端
1、registryMonitorThread守护线程
     查询xxl_job_group表所有address_type=0执行器为自动注册的所有执行器,查询xxl_job_registry表所有update_time小于当前时间90秒
的应该死亡的注册地址信息,根据id删除所有xxl_job_registry表死亡的注册信息,查询xxl_job_registry表所有update_time大于当前时间90秒
的活着的注册信息,根据registry_group=EXECUTOR的信息根据registry_key即执行器编码形成分组,
找到注册到相同app_name的所有ip地址信息,遍历第一步查询出来的自动注册的所有执行器,将app_name相同的registry_key对应的ip地址以逗号分隔
更新到address_list字段,沉睡30秒后继续执行该守护线程的方法
2、monitorThread守护线程
        查询xxl_job_log表最旧的alarm_status=0默认告警且(handle_code = 200处理成功或者trigger_code in (0, 200) and handle_code = 0处理结束处理失败)
的1000条日志数据,循环所有的数据,将其状态alarm_status=0从0改成-1即无需警告,查出来的日志如果重试次数大于0的话进行重试操作并更新重试之后的xxl_job_log表数据,并对这些重试的日志进行邮件告警,配置多个邮件时,全部成功为告警成功,否则为告警失败,将xxl_job_log表数据改成alarm_status=2或3,即告警成功或告警失败
,沉睡10秒后继续执行该守护线程的方法

3、monitorThread守护线程
   先沉睡50秒,找xxl_job_log表调度成功handleCode=200且处理成功trigger_code = 200且调度时间trigger_time超过10分钟且xxl_job_registry表注册信息已经丢失的日志数据,循环查询出来的所有数据,将处理结果handleCode改成调度失败500,沉睡60秒后继续执行该守护线程的方法
4、logrThread守护线程
   查询xxl_job_log表最近3天处理中,处理成功、处理失败的数量,并将统计之后的结果更新到xxl_job_log_report表,当xxl.job.logretentiondays(默认30天)配置的日志清理天数大于7天且距离上次清理日志时间大于24小时,循环分页查询xxl_job_log表trigger_time小于配置的日志清理时间的最近1000条数据,
这1000条数据根据id进行清理,将上次清理日志时间更新为当前时间,沉睡1分钟后继续执行该守护线程的方法
5、scheduleThread守护线程
   随机休眠4秒到5秒的时间,分页一次读取6000条(xxl.job.triggerpool.fast.max+xxl.job.triggerpool.slow.max)*20  xxl_job_info表运行中且下次触发时间trigger_next_time不超过当前5秒后的数据,循环所有的数据,对于下次触发时间<当前时间-5秒的数据根据
过期策略决定是否要重试,将下次触发时间刷到最新的时间,对于当前时间-5秒<下次触发时间<当前时间的数据,及与现在相比5秒内过期的数据,直接执行一次触发调度,将下次触发时间刷到最新,如果当前时间<下次触发时间<当前时间+5的数据,即5秒内即将触发的
数据,int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60),根据下次触发时间将数据分割成以秒为单位的时间轮,Map<Integer, List<Integer>> ringData即存放不同的秒数的时间轮数据,value为当前所有任务id的列表,
然后将下次触发时间更新成最新的数据,然后循环更新后的结果list,数据库里面将数据更新到最新,提交更新,如果总的花费时间小于1秒的话休眠0秒到5秒
6、ringThread守护线程
   随机休眠0秒到1秒,获取到当前时间及当前时间向前一秒的时间轮数据,删除ringData数据并获取到所有的jobid,循环遍历所有的jobid列表,触发调度,然后继续循环执行该守护线程的方法

xxl-job客户端
1、XxlJobSpringExecutor注入到IOC容器后根据SmartInitializingSingleton和DisposableBean接口完成一系列的初始化操作和关闭时的销毁操作,根据applicationContext.getBeanNamesForType找到所有注到IOC容器的bean
,循环遍历所有的beanName数组,跳过@Lazy懒加载的bean,根据Object bean = applicationContext.getBean(beanDefinitionName)获取到当前Bean,根据Map<Method, XxlJob> annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        (MethodIntrospector.MetadataLookup<XxlJob>) method -> AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class));方法获取到所有添加XxlJob注解的方法,循环遍历所有的annotatedMethods,注册job的信息,
然后将注册的方法,初始化的方法以及销毁的方法放入ConcurrentMap<String, IJobHandler> jobHandlerRepository中
2、创建SpringGlueFactory工厂
3、执行启动方法
3.1 根据xxl.job.executor.logpath配置初始化日志地址,没有配置日志地址则有一个默认的日志地址/data/applogs/xxl-job/jobhandler,给logBasePath和glueSrcPath赋值
3.1 根据配置的注册中心地址xxl.job.admin.addresses和accessToken初始化List<AdminBiz> adminBizList客户端,
4、localThread守护线程
    找到xxl.job.executor.logpath日志信息的所有文件,遍历该文件夹下的所有文件,文件夹跳过,如果文件的创建时间<xxl.job.executor.logretentiondays配置的日志文件过期时间,则删除该文件,沉睡1天后继续执行该守护线程的方法
5、triggerCallbackThread守护线程
    从阻塞队列里面callBackQueue拿到回调参数HandleCallbackParam,将阻塞队列中的所有元素转移到新的callbackParamList列表,然后callbackParamList添加从阻塞队列拿出来的HandleCallbackParam,调用注册中心的回调接口api/callback
通知注册中心调用结果,回调成功后在本地保存xxl_job_log表回调成功的日志信息,失败保存失败信息,按照时间生成当前回调参数的日志文件,当任务停止后最后一次把阻塞队列里面的参数全都拿出来进行回调处理
6、triggerRetryCallbackThread守护线程
    找到所有的失败回调日志文件,反序列化其参数,删除改文件,继续尝试回调操作,沉睡30秒后继续执行该守护线程的方法
7、初始化executor-server服务器
   启动ServerBootstrap bootstrap netty服务器,端口默认为9999接收注册中心的回调信息,开始注册客户端信息到注册中心,
8、registryThread守护线程
   根据配置的appname和本地的netty服务器address地址构建xxl_job_registry表对象RegistryParam,循环所有的adminBizList客户端,执行注册方法registry,注册中心的register方法把xxl_job_registry的更新时间更新到当前时间,更新失败则直接保存新的数据
   沉睡30秒后继续执行该守护线程的方法,当线程停止后调用移除注册信息的方法,注册中心会删除注册的数据信息