
Analysis of Distributed Task Scheduling Systems

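Background

Let us start with a few business scenarios:

  • A credit card center must generate the monthly statement for every user across its network between 1:00 and 3:00 a.m. on the 28th of each month.
  • An e-commerce platform needs to start pushing coupon-usage reminders to its members at 9:00 a.m. every day.
  • A company needs to run a Python script on a schedule to clean up stale tmp files in a file service.

At first, Linux Cron on a single server is enough for scheduled tasks. As the number of tasks keeps growing, however, the single-machine model puts heavy load on that machine and can no longer guarantee that tasks are triggered and run on time. This gave rise to a variety of distributed scheduled-task platforms, such as Quartz, XXL-Job, ElasticJob and PowerJob. Even so, most companies may well choose to build their own, for these reasons:

  • An in-house system is easier to fit to the company's own base frameworks and tooling.
  • The architecture of an in-house system can be adjusted freely to suit the business.
  • Doing secondary development on an open-source project, or wrapping a third-party SDK, also carries a non-trivial development and maintenance cost.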

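Designing a Distributed Task Scheduling System

  • It uses a distributed architecture to overcome the performance bottlenecks of a monolithic design.
  • It consists of four main modules: the scheduler, the executor, the web console and the API service.
    • Between them, these modules compute scheduling decisions according to the configured routing strategy, run and stop concrete tasks, and provide a UI for managing tasks and cluster resources.
    • The scheduler is the core of the system. It manages the task lifecycle, maintains task dependencies (DAG orchestration), triggers scheduled tasks, monitors task status and maintains the task state machine.
    • The scheduler also assigns tasks to executors. Based on each task's type, waiting time, priority and similar information, it applies one of several scheduling algorithms and dispatches the task to a suitable executor.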
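
The "task state machine" that the scheduler maintains can be sketched as follows. This is only an illustration under assumptions: the state names and the allowed transitions are hypothetical and are not taken from any of the frameworks mentioned in this article.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical task states; a real scheduler would likely have more. */
enum TaskState { WAITING, RUNNING, SUCCESS, FAILED, KILLED }

class TaskStateMachine {
    // Allowed transitions (an assumption made for this example).
    private static final Map<TaskState, Set<TaskState>> ALLOWED = Map.of(
        TaskState.WAITING, EnumSet.of(TaskState.RUNNING, TaskState.KILLED),
        TaskState.RUNNING, EnumSet.of(TaskState.SUCCESS, TaskState.FAILED, TaskState.KILLED),
        TaskState.FAILED,  EnumSet.of(TaskState.WAITING),      // retry
        TaskState.SUCCESS, EnumSet.noneOf(TaskState.class),    // terminal
        TaskState.KILLED,  EnumSet.noneOf(TaskState.class));   // terminal

    private TaskState state = TaskState.WAITING;

    TaskState state() {
        return state;
    }

    /** Moves to the next state, or throws if the transition is not allowed. */
    void transit(TaskState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    public static void main(String[] args) {
        TaskStateMachine task = new TaskStateMachine();
        task.transit(TaskState.RUNNING);
        task.transit(TaskState.FAILED);
        task.transit(TaskState.WAITING); // a failed task goes back to waiting for a retry
        System.out.println(task.state());
    }
}
```

Keeping the transition table in one place makes it easy for the scheduler to reject illegal updates, for example a late "running" report arriving for a task that has already finished.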

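An open-source system: XXL-Job

A distributed task scheduling system generally has to implement a few core steps: task registration, task triggering, task scheduling and task execution.

  • Task registration: the business application registers its tasks with the XXL-Job admin.
  • Task triggering: the XXL-Job admin triggers task scheduling according to the configuration.
  • Task scheduling: once a task is triggered, the scheduling algorithm picks an executor.
  • Task execution: the executor runs the task and returns the result.

XXL-Job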

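Scheduling center

The scheduling center manages scheduling information and sends scheduling requests according to the scheduling configuration; it carries no business code itself. Decoupling the scheduling system from the tasks improves availability and stability, and the performance of the scheduling system is no longer limited by the task modules.
It supports visual, simple and dynamic management of scheduling information, including creating, updating and deleting tasks, GLUE development and task alerts. All of these operations take effect in real time. It also supports monitoring scheduling results and execution logs, executor failover, and creating executors.

Executor

The executor receives scheduling requests and runs the task logic. Because the task modules focus only on executing tasks, they are simpler and more efficient to develop and maintain. The executor accepts execution requests, termination requests and log requests from the scheduling center.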

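Project structure

  • admin is the XXL-Job console, where you configure executors and scheduled tasks, view the dashboard, and so on.
  • core is the jar that business applications must also include; it communicates with admin through a built-in Netty server.
  • samples contains test projects that show how to integrate XXL-Job into a traditional Spring project and into a Spring Boot project.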

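Call relationships between the core classes

Class | Role
XxlJobAdminConfig | Creates the XxlJobScheduler instance
XxlJobScheduler | Creates the various threads, including the main task-registration thread, the main thread of the scheduling container, and the trigger thread pool (JobTriggerPoolHelper)
JobScheduleHelper | The scheduling container; starts a daemon thread that queries all scheduled tasks whose next trigger time falls within 5 seconds of the current time and triggers them according to the conditions
JobTriggerPoolHelper | Creates the thread pool that runs XxlJobTrigger, and adds triggers to it
XxlJobTrigger | Represents one trigger configuration; looks up the concrete scheduled-task information (XxlJobInfo)
XxlJob | The annotation that defines a job handler on the executor side
JobThread | Calls the execute method of IJobHandler to run the task, then calls back to the scheduling center
IJobHandler | The abstract handler interface; defines what is actually executed, again through an execute method
EmbedServer | The embedded server; the default port is 9999
ExecutorBiz | Its run method invokes the executor; it has two implementations, ExecutorBizImpl and ExecutorBizClient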

Task registration

An application registers its own tasks with xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

public void initXxlJobExecutor() {
    // load executor prop
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

    // init executor
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

    // registry job bean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

    // start executor
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
xxlJobExecutor.start() does two things:

  • initJobHandlerMethodRepository: records the task handlers in a local repository
  • super.start(): calls XxlJobExecutor.start()

@Override
public void start() {
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

XxlJobExecutor.start() in turn starts, among other things:

  • TriggerCallbackThread: the thread that reports results after a handler has run
  • initEmbedServer: initializes the embedded server; this step also registers the process with admin so that it becomes an executor

public void start() throws Exception {
    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}

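Task scheduling

Core functions of admin

public void init() throws Exception {
    // initialize the fastTriggerPool and slowTriggerPool thread pools
    JobTriggerPoolHelper.toStart();

    // start a thread that queries executor registrations every 90s; an executor whose
    // last update is more than 90s old is removed, and the list of live executors is refreshed
    JobRegistryHelper.getInstance().start();

    // start a thread that looks for failed task runs:
    //   1. if a retry count is configured, trigger the task again
    //   2. decide whether an e-mail alert is needed
    JobFailMonitorHelper.getInstance().start();

    // start a thread that handles lost task results: if a scheduling record stays in the
    // "running" state for more than 10 minutes and the executor's heartbeat registration
    // has failed (it is offline), the record is proactively marked as failed
    JobCompleteHelper.getInstance().start();

    JobLogReportHelper.getInstance().start();

    // start the thread that schedules and triggers tasks
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}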
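
The registry check described in the comments above (drop executors that have not refreshed their registration for more than 90s) can be sketched in a few lines. This is a simplified in-memory illustration, not XXL-Job's implementation: the class `ExecutorRegistry` and its methods are hypothetical, and timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory registry; the timeout follows the 90s mentioned above. */
class ExecutorRegistry {
    static final long DEAD_TIMEOUT_MS = 90_000;

    // executor address -> time of the last heartbeat (epoch millis)
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    /** Called whenever an executor registers or refreshes its registration. */
    void heartbeat(String address, long nowMs) {
        lastBeat.put(address, nowMs);
    }

    /** Removes executors silent for more than DEAD_TIMEOUT_MS and returns them. */
    List<String> evictDead(long nowMs) {
        List<String> removed = new ArrayList<>();
        lastBeat.forEach((address, beat) -> {
            if (nowMs - beat > DEAD_TIMEOUT_MS) {
                removed.add(address);
            }
        });
        removed.forEach(lastBeat::remove);
        return removed;
    }

    /** Addresses currently considered alive, sorted for stable output. */
    List<String> alive() {
        List<String> addresses = new ArrayList<>(lastBeat.keySet());
        Collections.sort(addresses);
        return addresses;
    }

    public static void main(String[] args) {
        ExecutorRegistry registry = new ExecutorRegistry();
        registry.heartbeat("10.0.0.1:9999", 0L);
        registry.heartbeat("10.0.0.2:9999", 60_000L);
        System.out.println("evicted: " + registry.evictDead(100_000L));
        System.out.println("alive:   " + registry.alive());
    }
}
```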

Task query

  • Acquire the database lock:
    • preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
  • Find the tasks that are due for scheduling:
    • List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
  • Dispatch the tasks:
    • JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);

public void start() {
    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

            // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

            while (!scheduleThreadToStop) {
                // Scan Job
                long start = System.currentTimeMillis();
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
                boolean preReadSuc = true;
                try {
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
                    preparedStatement = conn.prepareStatement("select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
                    preparedStatement.execute();

                    // tx start

                    // 1. query the tasks that are due within the next 5 seconds
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList != null && scheduleList.size() > 0) {
                        // 2. push time-ring
                        for (XxlJobInfo jobInfo : scheduleList) {

                            // time-ring jump: the trigger time passed more than 5s ago
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1 trigger-expire > 5s: pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                // 1. misfire match
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW: the job's misfire strategy is "fire once now", so trigger it once
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());
                                }

                                // 2. fresh next: compute the next trigger time, starting from now
                                refreshNextValidTime(jobInfo, new Date());

                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time
                                // (the trigger time passed less than 5s ago)

                                // 1. trigger the task
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId());

                                // 2. fresh next: compute the next trigger time, starting from now
                                refreshNextValidTime(jobInfo, new Date());

                                // next-trigger-time in 5s, pre-read again: the next run is also due within 5s
                                if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                    // 1. make ring second
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                    // 2. push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3. fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }

                            } else {
                                // 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time
                                // (the trigger time has not been reached yet)

                                // 1. compute the second of the minute (0-59) in which the task is due
                                int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);

                                // 2. store (ring second -> job id) in the ring map; the ringThread runs
                                //    once per second, picks up the matching entries and triggers them
                                pushTimeRing(ringSecond, jobInfo.getId());

                                // 3. recompute the next trigger time
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        }

                        // 3. update trigger info: persist the modified jobInfo records
                        for (XxlJobInfo jobInfo : scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
                    } else {
                        preReadSuc = false;
                    }
                    // (the excerpt ends here)
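
The time ring used by the scheduling loop can be illustrated with a small sketch. It assumes, as the code above suggests, that the ring is a map from the second of the minute (0-59) to the ids of jobs due in that second, and that a separate thread polls the slot for the current second. The class `TimeRing` and its methods are hypothetical names for this example, and the sketch is meant for single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified time ring: second-of-minute -> job ids due in that second. */
class TimeRing {
    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    /** Same formula as in the scheduling loop: (triggerTime / 1000) % 60. */
    static int slotOf(long triggerTimeMs) {
        return (int) ((triggerTimeMs / 1000) % 60);
    }

    /** Stores the job id in the slot that corresponds to its trigger time. */
    void push(long triggerTimeMs, int jobId) {
        ring.computeIfAbsent(slotOf(triggerTimeMs), k -> new ArrayList<>()).add(jobId);
    }

    /** Removes and returns the jobs stored for the given second (empty if none). */
    List<Integer> poll(int second) {
        List<Integer> jobs = ring.remove(second);
        return jobs == null ? List.of() : jobs;
    }

    public static void main(String[] args) {
        TimeRing ring = new TimeRing();
        long now = System.currentTimeMillis();
        ring.push(now + 2_000, 42); // job 42 is due two seconds from now
        System.out.println(ring.poll(slotOf(now + 2_000)));
    }
}
```

Because a slot only encodes the second within a minute, this design relies on the scheduler pre-reading a short window (5 seconds above), so that entries never wait in the ring for anything close to a full minute.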

Task dispatch

  • Initialize the dispatch parameters: triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
  • Find the address to dispatch to: routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
  • Apply the routing algorithm configured for the task
  • Call the executor's interface: XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;

    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // initialize the dispatch parameters
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    // code omitted

    // find the address to dispatch to
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
        // sharding broadcast: pick the address by shard index
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // pick the address through the routing strategy
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4. trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // call the executor
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }
    // code omitted

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
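
To make the routing step more concrete, here is a minimal sketch of three simple routing strategies: first, last and round-robin. The `Router` interface and the `Routers` class are hypothetical simplifications for illustration; the real router in the code above takes a TriggerParam and returns a ReturnT<String>.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified router: picks one address from the registered executors. */
interface Router {
    String route(List<String> addresses);
}

class Routers {
    /** Always picks the first registered executor. */
    static final Router FIRST = addrs -> addrs.get(0);

    /** Always picks the last registered executor. */
    static final Router LAST = addrs -> addrs.get(addrs.size() - 1);

    /** Round-robin: each call returns the next address in turn. */
    static Router roundRobin() {
        AtomicInteger counter = new AtomicInteger();
        // floorMod keeps the index non-negative even after the counter overflows
        return addrs -> addrs.get(Math.floorMod(counter.getAndIncrement(), addrs.size()));
    }

    public static void main(String[] args) {
        List<String> addrs = List.of("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        Router rr = roundRobin();
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.route(addrs));
        }
    }
}
```

A fixed strategy such as "first" is the simplest to reason about, but it concentrates all load on one executor; round-robin spreads the load at the cost of a small amount of shared state.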

Calling the executor's interface

public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

Task execution

The admin calls the executor at addressUrl + "run"; this endpoint is what triggers task execution.

// services mapping
try {
    switch (uri) {
        case "/beat":
            // similar to a heartbeat endpoint
            return executorBiz.beat();
        case "/idleBeat":
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        case "/run":
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        case "/kill":
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        case "/log":
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        default:
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
  • The run method mainly does the following:
    • Creates a jobHandler according to the task type
    • Creates a thread that runs the jobHandler and listens on a parameter queue
    • Puts the task parameters into that queue, from which the jobHandler thread takes them

public ReturnT<String> run(TriggerParam triggerParam) {
    // load old: jobHandler + jobThread
    // look up the thread for this task; the first time a task runs there is none
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // create a jobHandler according to the task type
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // load the jobHandler. At startup the executor scans the methods annotated with
        // @XxlJob and registers them in a map, so the handler is simply looked up here;
        // IJobHandler is a wrapper around the JDK Method object
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        if (jobThread != null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }

        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    }

    // check the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // DISCARD_LATER: if the task cannot run right away, reject this trigger
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // COVER_EARLY: replace the busy jobThread with a new one
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // if jobThread is null, create and start one; its run loop keeps reading
    // parameters from triggerQueue and executing them
    if (jobThread == null) {
        // this also stops any previously registered jobThread for the task
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // put the parameters into the job thread's triggerQueue;
    // pushTriggerQueue also checks whether this is a duplicate trigger
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
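
The block-strategy branch in run can be reduced to a small decision function. The sketch below keeps the three strategy names that appear in the code above (SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY); the `Action` enum and the `BlockPolicy` class are hypothetical names introduced only for this example.

```java
/** Strategy names as they appear in the executor code above. */
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

/** Hypothetical outcome of the decision, introduced for this sketch. */
enum Action { ENQUEUE, REJECT, REPLACE_THREAD }

class BlockPolicy {
    /**
     * Decides what to do with a new trigger.
     * busy means the job thread is running or still has queued triggers.
     */
    static Action decide(BlockStrategy strategy, boolean busy) {
        if (!busy) {
            return Action.ENQUEUE;               // nothing is blocked, just queue the trigger
        }
        switch (strategy) {
            case DISCARD_LATER:
                return Action.REJECT;            // drop the new trigger
            case COVER_EARLY:
                return Action.REPLACE_THREAD;    // stop the old thread, run the new trigger
            default:
                return Action.ENQUEUE;           // serial execution: wait in the queue
        }
    }

    public static void main(String[] args) {
        for (BlockStrategy strategy : BlockStrategy.values()) {
            System.out.println(strategy + " while busy: " + decide(strategy, true));
        }
    }
}
```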

Returning results

  • The result is handed to TriggerCallbackThread.pushCallBack
  • TriggerCallbackThread consumes the returned results
  • The admin receives the results and writes them to the DB

TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
        triggerParam.getLogId(),
        triggerParam.getLogDateTime(),
        XxlJobContext.getXxlJobContext().getHandleCode(),
        XxlJobContext.getXxlJobContext().getHandleMsg())
);

Consuming the execution results

@Override
public void run() {
    // normal callback
    while (!toStop) {
        try {
            // take a task execution result
            HandleCallbackParam callback = getInstance().callBackQueue.take();
            if (callback != null) {
                // callback list param
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                callbackParamList.add(callback);

                // callback, will retry if error
                if (callbackParamList != null && callbackParamList.size() > 0) {
                    // call the admin's api/callback endpoint
                    doCallback(callbackParamList);
                }
            }
        } catch (Exception e) {
            if (!toStop) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    // code omitted
}
});
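
The callback thread batches results with a common BlockingQueue idiom: block on take() until one element arrives, then use drainTo() to collect whatever else is already queued without blocking. A minimal standalone sketch of that idiom follows; `CallbackBatcher` is a hypothetical class, and plain strings stand in for HandleCallbackParam. Unlike the original, this sketch adds the first element before draining so that the batch keeps FIFO order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper that batches queued results; strings stand in for callback params. */
class CallbackBatcher {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void push(String result) {
        queue.add(result);
    }

    /**
     * Blocks until at least one result is available, then returns it together
     * with everything else already queued. Returns an empty list if interrupted.
     */
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        try {
            batch.add(queue.take());   // wait for the first element
            queue.drainTo(batch);      // collect the rest without blocking
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return batch;
    }

    public static void main(String[] args) {
        CallbackBatcher batcher = new CallbackBatcher();
        batcher.push("log-1");
        batcher.push("log-2");
        System.out.println(batcher.nextBatch());
    }
}
```

Batching this way means one HTTP call to the admin can carry many results when the executor is busy, while an idle executor still reports a single result promptly.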

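Summary

Lifecycle of a task

Shortcomings

  • Tasks waiting to be scheduled are stored in a single table; when there are too many of them, scheduling may be delayed.
  • Only one routing strategy can be selected per task.
    • When an executor shuts down gracefully, it automatically calls the registryRemove API to ask the admin to remove it. Otherwise the admin relies on a periodic scan: every 90 seconds it checks each executor's last registration time and removes any executor whose registration is more than 90s old. So if an executor exits abnormally, some strategies (for example "first") can make the task fail repeatedly for up to 90s.
  • Sharding strategy: every machine is given the same parameters, and each executor has to work out its own share.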
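
As an illustration of the last point, an executor that receives a shard index and a shard total (the "index/total" sharding parameter built in processTrigger) can select its own share of the work with a modulo test. This is a sketch under assumptions: `ShardFilter` is a hypothetical helper, and the work items are assumed to have numeric ids.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: keeps only the ids that belong to this executor's shard. */
class ShardFilter {
    /** Returns the ids for which id % total == index. */
    static List<Long> mine(List<Long> ids, int index, int total) {
        List<Long> result = new ArrayList<>();
        for (long id : ids) {
            // floorMod keeps the remainder non-negative even for negative ids
            if (Math.floorMod(id, (long) total) == index) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L);
        for (int index = 0; index < 3; index++) {
            System.out.println("shard " + index + "/3 handles " + mine(ids, index, 3));
        }
    }
}
```

Every executor sees the full id list, but the shards are disjoint and together cover all ids, so no coordination between executors is needed.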

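Core points of a task scheduling system

  • Keeping scheduling on time when the data volume is large: shard the databases and tables.
  • Handling lost tasks: the scheduler periodically scans for tasks that have been running for too long.

// lost task results: if a scheduling record stays in the "running" state for more than 10 minutes
// and the executor's heartbeat registration has failed (it is offline), mark the record as failed
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

  • Task idempotence: a task must not be executed twice. The executor records which tasks it is currently running; even so, the business code that the executor runs should ideally be idempotent as well.

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    // triggerLogIdSet records the tasks this executor is currently running;
    // a duplicate is detected here and is not written to triggerQueue
    if (triggerLogIdSet.contains(triggerParam.getLogId())) {
        logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
        return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    }

    triggerLogIdSet.add(triggerParam.getLogId());
    triggerQueue.add(triggerParam);
    return ReturnT.SUCCESS;
}

  • Scheduling strategy: decides which executor a task is assigned to, providing load balancing, sharding, fault tolerance and failover of work when a node goes down.
    • Load balancing requires the executors to report their current state: the number of running tasks, the number of tasks waiting to be scheduled, and the server's system load. From these, pick the executor with the lowest combined score.
  • Sharding: usually needs support from the scheduler.
    • Sharding in Java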
private static String calculatedExecutorParamValue(XxlJobInfo jobInfo, Integer index, Integer adminTotal) {
    String executorParam = jobInfo.getExecutorParam();
    if (StringUtils.isEmpty(executorParam)) {
        return null;
    }
    try {
        List<String> paramList = JSONObject.parseArray(executorParam, String.class);
        if (CollectionUtils.isEmpty(paramList)) {
            return null;
        }
        List<List<String>> paramAverageList = averageAssign(paramList, adminTotal);
        List<String> result = paramAverageList.get(index);
        return JSON.toJSONString(result);
    } catch (Exception e) {
        logger.error("sharded parallel computation: failed to parse parameters, param: {}, cause: {}", executorParam, e.getMessage());
    }
    return null;
}

public static <T> List<List<T>> averageAssign(List<T> source, int n) {
    List<List<T>> result = new ArrayList<List<T>>();
    int remainder = source.size() % n;  // first compute the remainder
    int number = source.size() / n;     // then the quotient
    int offset = 0;                     // offset (counts how many remainder items have been handed out)
    for (int i = 0; i < n; i++) {
        List<T> value;
        if (remainder > 0) {
            value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
            remainder--;
            offset++;
        } else {
            value = source.subList(i * number + offset, (i + 1) * number + offset);
        }
        result.add(value);
    }
    return result;
}
  • Sharding in Go

package main

import (
	"fmt"
)

// chunk splits slice into at most size contiguous chunks of nearly equal length.
func chunk(slice []int, size int) [][]int {
	var chunks [][]int
	// length of each chunk, rounded up
	chunkSize := (len(slice) + size - 1) / size
	for i := 0; i < len(slice); i += chunkSize {
		end := i + chunkSize
		if end > len(slice) {
			end = len(slice)
		}
		chunks = append(chunks, slice[i:end])
	}
	return chunks
}

func main() {
	slice := []int{1, 2, 3, 4, 5}
	chunks := chunk(slice, 2)
	fmt.Println(chunks) // [[1 2 3] [4 5]]
}
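
Returning to the load-balancing point in the list above, picking "the executor with the lowest combined score" can be sketched as follows. Everything here is an assumption for illustration: the `ExecutorStatus` fields mirror the three metrics named in the text, and the weights in `score()` are arbitrary.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical status report sent by an executor. */
class ExecutorStatus {
    final String address;
    final int running;     // tasks currently executing
    final int pending;     // tasks waiting in the executor's queue
    final double loadAvg;  // system load reported by the server

    ExecutorStatus(String address, int running, int pending, double loadAvg) {
        this.address = address;
        this.running = running;
        this.pending = pending;
        this.loadAvg = loadAvg;
    }

    /** Combined score; the weights are arbitrary and would need tuning in practice. */
    double score() {
        return running + pending + 10 * loadAvg;
    }
}

class LeastLoadedRouter {
    /** Returns the address with the lowest score, or empty if no executor reported. */
    static Optional<String> pick(List<ExecutorStatus> statuses) {
        return statuses.stream()
                .min(Comparator.comparingDouble(ExecutorStatus::score))
                .map(s -> s.address);
    }

    public static void main(String[] args) {
        List<ExecutorStatus> statuses = List.of(
                new ExecutorStatus("10.0.0.1:9999", 3, 2, 0.5),
                new ExecutorStatus("10.0.0.2:9999", 1, 1, 0.2),
                new ExecutorStatus("10.0.0.3:9999", 0, 0, 1.0));
        System.out.println(pick(statuses).orElse("no executor available"));
    }
}
```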

Task dependencies: in workflow-style scheduling, the dependencies between tasks form a directed acyclic graph (DAG).

package main

import (
	"fmt"
	"sync"
	"time"
)

// DAG is the graph structure.
type DAG struct {
	Vertexs []*Vertex
}

// Vertex is a node in the graph.
type Vertex struct {
	Key      string
	Value    interface{}
	Parents  []*Vertex
	Children []*Vertex
}

// AddVertex adds a vertex to the graph.
func (dag *DAG) AddVertex(v *Vertex) {
	dag.Vertexs = append(dag.Vertexs, v)
}

// AddEdge adds a directed edge from -> to.
func (dag *DAG) AddEdge(from, to *Vertex) {
	from.Children = append(from.Children, to)
	to.Parents = append(to.Parents, from)
}

func main() {
	var dag = &DAG{}

	// create the vertices
	va := &Vertex{Key: "a", Value: "1"}
	vb := &Vertex{Key: "b", Value: "2"}
	vc := &Vertex{Key: "c", Value: "3"}
	vd := &Vertex{Key: "d", Value: "4"}
	ve := &Vertex{Key: "e", Value: "5"}
	vf := &Vertex{Key: "f", Value: "6"}
	vg := &Vertex{Key: "g", Value: "7"}
	vh := &Vertex{Key: "h", Value: "8"}
	vi := &Vertex{Key: "i", Value: "9"}

	// add the edges
	dag.AddEdge(va, vb)
	dag.AddEdge(va, vc)
	dag.AddEdge(va, vd)
	dag.AddEdge(vb, ve)
	dag.AddEdge(vb, vh)
	dag.AddEdge(vb, vf)
	dag.AddEdge(vc, vf)
	dag.AddEdge(vc, vg)
	dag.AddEdge(vd, vg)
	dag.AddEdge(vh, vi)
	dag.AddEdge(ve, vi)
	dag.AddEdge(vf, vi)
	dag.AddEdge(vg, vi)

	// expected layers:
	// [1] { a }
	// [2] { b, c, d }
	// [3] { h, e, f, g }
	// [4] { i }
	all := LayerBFS(va)
	startTime := time.Now()
	for _, layer := range all {
		fmt.Println("------------------")
		doTasks(layer)
	}
	fmt.Printf("cost:%f\n", time.Since(startTime).Seconds())
}

// Queue is a simple FIFO queue.
type Queue []interface{}

func (q *Queue) Push(x interface{}) {
	*q = append(*q, x)
}

func (q *Queue) Pop() interface{} {
	h := *q
	var el interface{}
	l := len(h)
	el, *q = h[0], h[1:l]
	return el
}

func (q *Queue) Len() int {
	return len(*q)
}

func NewQueue() *Queue {
	return &Queue{}
}

// LayerBFS walks the graph breadth-first from root and groups the vertices by layer.
func LayerBFS(root *Vertex) [][]*Vertex {
	queue := NewQueue()
	queue.Push(root)
	visited := make(map[string]*Vertex)
	all := make([][]*Vertex, 0)
	for queue.Len() > 0 {
		qSize := queue.Len()
		tmp := make([]*Vertex, 0)
		for i := 0; i < qSize; i++ {
			// pop vertex
			e := queue.Pop()
			currVert := e.(*Vertex)
			if _, ok := visited[currVert.Key]; ok {
				continue
			}
			visited[currVert.Key] = currVert
			tmp = append(tmp, currVert)
			for _, val := range currVert.Children {
				if _, ok := visited[val.Key]; !ok {
					queue.Push(val) // add child
				}
			}
		}
		all = append(all, tmp)
	}
	return all
}

// doTasks runs all vertices of one layer concurrently and waits for them to finish.
func doTasks(vertexs []*Vertex) {
	var wg sync.WaitGroup
	startTime := time.Now()
	for _, v := range vertexs {
		wg.Add(1)
		go func(v *Vertex) {
			defer wg.Done()
			time.Sleep(2 * time.Second)
			fmt.Printf("do %v, result is %v \n", v.Key, v.Value)
		}(v) // notice: v is passed as an argument so each goroutine gets its own copy
	}
	wg.Wait()
	fmt.Printf("cost:%0.0f\n", time.Since(startTime).Seconds())
}
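
One caveat about layering by BFS from a single root: a vertex lands in the layer where it is first reached, which works for the example graph above but, on other graphs, can put a vertex in the same layer as one of its own parents (for instance with edges a→b, a→d and b→d). A common alternative is Kahn's algorithm, which releases a vertex only after all of its parents have been placed. The Java sketch below is an illustration under assumptions, not part of the original design; the class `DagLayers` and the adjacency-map input format are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: groups DAG nodes into layers using Kahn's algorithm. */
class DagLayers {
    /**
     * edges maps each parent to its children. In the returned layers, every
     * node appears strictly after all of its parents. Throws on a cycle.
     */
    static List<List<String>> layers(Map<String, List<String>> edges) {
        // count incoming edges for every node
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String child : entry.getValue()) {
                inDegree.merge(child, 1, Integer::sum);
            }
        }

        // the first layer holds the nodes without parents
        List<String> current = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                current.add(entry.getKey());
            }
        }

        List<List<String>> result = new ArrayList<>();
        int placed = 0;
        while (!current.isEmpty()) {
            result.add(current);
            placed += current.size();
            List<String> next = new ArrayList<>();
            for (String node : current) {
                for (String child : edges.getOrDefault(node, List.of())) {
                    // a child is released once its last parent has been placed
                    if (inDegree.merge(child, -1, Integer::sum) == 0) {
                        next.add(child);
                    }
                }
            }
            current = next;
        }
        if (placed != inDegree.size()) {
            throw new IllegalArgumentException("graph contains a cycle");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("a", List.of("b", "d"));
        edges.put("b", List.of("d"));
        System.out.println(layers(edges));
    }
}
```

Each returned layer can then be executed concurrently, in the same way doTasks runs one layer at a time.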
