> 文章列表 > 分布式任务调度框架Power-Job

分布式任务调度框架Power-Job

分布式任务调度框架Power-Job

分布式任务调度框架的由来及对比

在大型业务业务系统中,不可避免会出现一些需要定时执行需求的场景,例如定时同步数据,定时清洗数据,定时生成报表,大量机器一同执行某个任务,甚至有些需要分布式处理的任务例如需要更新一大批数据,单机耗时太长需要进行任务分发,利用集群的计算能力等等

现今为止,市面上流行的作业调度框架有老牌的Quartz、基于Quartz的elastic-job和原先基于Quartz后面移除依赖的xxl-job,Quartz可以视为第一代任务调度框架,基本上是现有所有分布式调度框架的“祖宗”。它不提供Web界面,只能通过API完成任务的配置,使用起来不够方便和灵活,同时它仅支持单机执行,无法有效利用整个集群的计算能力。

xxl-job可以视为第二代任务调度框架,在一定程度上解决了Quartz的不足,在过去几年中是个非常优秀的调度框架,不过放到今天来看,还是存在着一些不足的:

  • 例如数据库支持单一: 仅支持MySQL,使用其他DB需要自己改代码
  • 有限的分布式计算能力: 仅支持静态分片,无法很好的完成复杂任务的计算
  • 不支持工作流: 无法配置各个任务之间的依赖关系,不适用于有DAG需求的场景

PowerJob可以被认为是第三代任务调度框架,它是新一代分布式任务调度与计算框架,支持CRON、API、固定频率、固定延迟等调度策略,提供工作流来编排任务解决依赖关系,能让您轻松完成作业的调度与繁杂任务的分布式计算,在任务调度的基础上,还额外提供了分布式计算和工作流功能,其主要特性如下:

  • 使用简单: 提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能
  • 定时策略完善: 支持CRON表达式、固定频率、固定延迟和API四种定时调度策略
  • 执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力
  • DAG工作流支持: 支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递
  • 执行器支持广泛: 支持SpringBean、内置/外置Java类、Shell、Python等处理器,应用范围广
  • 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率
  • 依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer…),同时支持所有Spring Data JPA所支持的关系型数据库
  • 高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)
  • 故障转移与恢复: 任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成
QuartZ xxl-job SchedulerX2.0 Power-job
定时类型 CRON CRON CRON、固定频率、固定延迟、OpenAPI CRON、固定频率、固定延迟、OpenAPI
任务类型 内置Java 内置Java、GLUE Java、Shell、Python等脚本 内置Java、外置Java(FatJar)、Shell、Python等脚本 内置Java、外置Java(容器)、Shell、Python等脚本
分布式任务 静态分片 MapReduce动态分片 MapReduce动态分片
在线任务治理 不支持 支持 支持 支持
日志白屏化 不支持 支持 不支持 支持
调度方式及性能 基于数据库锁、有性能瓶颈 基于数据库锁、有性能瓶颈 无锁化设计、性能空间极大
系统依赖 JDBC支持的关系型数据库类似于MySQL、Oracle等 MySQL 公测期间免费、增加需付费 任意Spring Data Jpa支持的关系型数据库
报警监控 邮件 短信 邮件且提供了可扩展接口
DAG工作流 不支持 不支持 支持 支持

Power-Job分布式任务调度框架

分布式任务调度框架Power-Job

Power-Job项目初始化

Administrator@DESKTOP-173DTHK MINGW64 /d/E/Programs
$ git clone https://github.com/KFCFans/PowerJob.git
Cloning into 'PowerJob'...
remote: Enumerating objects: 28221, done.
remote: Counting objects: 100% (423/423), done.
remote: Compressing objects: 100% (147/147), done.
remote: Total 28221 (delta 121), reused 422 (delta 121), pack-reused 27798
Receiving objects: 100% (28221/28221), 18.96 MiB | 286.00 KiB/s, done.
Resolving deltas: 100% (11330/11330), done.Administrator@DESKTOP-173DTHK MINGW64 /d/E/Programs

导入 IDE,源码结构如下,我们需要启动调度服务器(powerjob-server),同时在 samples 工程中编写自己的处理器代码
分布式任务调度框架Power-Job

PowerJob由调度服务器(powerjob-server)和执行器(powerjob-worker)两部分组成,powerjob-server负责提供Web服务和完成任务的调度,powerjob-worker则负责执行用户所编写的任务代码,同时提供分布式计算能力

启动调度服务器

  1. 创建数据库(仅需要创建数据库):create database `powerjob-daily` default character set utf8mb4 collate utf8mb4_general_ci;
  2. 修改配置文件,配置文件的说明官方文档写的非常详细,需要修改的地方(路径: PowerJob/powerjob-server/powerjob-server-starter/src/main/resources/application-daily.properties)为数据库配置:
    • spring.datasource.core.jdbc-url
    • spring.datasource.core.username
    • spring.datasource.core.password
    • mongoDB 的同学也可以修改spring.data.mongodb.uri

powerjob-server 日常环境配置文件:application-daily.properties

oms.env=DAILY
logging.config=classpath:logback-dev.xml####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true####### 资源清理配置 #######
oms.instanceinfo.retention=1
oms.container.retention.local=1
oms.container.retention.remote=-1####### 缓存配置 #######
oms.instance.metadata.cache.size=1024

如果你暂时没有可用的数据库进行测试,那么可以使用以下数据库配置来一键启动 server

spring.datasource.core.driver-class-name=org.h2.Driver
spring.datasource.core.jdbc-url=jdbc:h2:file:~/h2/powerjob-daily-test
spring.datasource.core.username=sa
spring.datasource.core.password=
  1. 完成配置文件的修改后,可以直接通过启动类 tech.powerjob.server.PowerJobServerApplication 启动调度服务器(注意:需要使用 daily 配置文件启动,可自行百度搜索“SpringBoot 指定配置文件启动”),观察启动日志,查看是否启动成功~启动成功后,访问 http://127.0.0.1:7700/ ,如果能顺利出现 Web 界面,则说明调度服务器启动成功!
  2. 注册应用:点击主页应用注册按钮,填入 powerjob-agent-test 和控制台密码(用于进入控制台),注册示例应用(当然你也可以注册其他的 appName,只是别忘记在示例程序中同步修改~)

分布式任务调度框架Power-Job

编写示例代码

进入示例工程(powerjob-worker-samples),修改配置文件连接powerjob-server并编写自己的处理器代码

  1. 修改 powerjob-worker-samples 的 application.properties,将 powerjob.worker.app-name 改为刚刚在控制台注册的名称
server.port=8081spring.jpa.open-in-view=false########### powerjob-worker 配置 ###########
# akka 工作端口,可选,默认 27777
powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.worker.app-name=powerjob-agent-test
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 持久化方式,可选,默认 disk
powerjob.worker.store-strategy=disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认 8192
powerjob.worker.max-result-length=4096
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认 8192
powerjob.worker.max-appended-wf-context-length=4096
  1. 编写自己的处理器:随便找个地方新建类,继承你想要使用的处理器(各个处理器的介绍可见官方文档官方文档,文档非常详细),这里为了简单演示,选择使用单机处理器 BasicProcessor,以下是代码示例
@Slf4j
@Component
public class StandaloneProcessorDemo implements BasicProcessor {@Overridepublic ProcessResult process(TaskContext context) throws Exception {// PowerJob 在线日志功能,使用该 Logger 打印的日志可以直接在 PowerJob 控制台查看OmsLogger omsLogger = context.getOmsLogger();omsLogger.info("StandaloneProcessorDemo start process,context is {}.", context);return new ProcessResult(true, "process successfully~");}
}
  1. 启动示例程序,即直接运行主类 tech.powerjob.samples.SampleApplication,观察控制台输出信息,判断是否启动成功。

任务的配置与运行

调度服务器与示例工程都启动完毕后,再次前往Web页面( http://127.0.0.1:7700/ ),进行任务的配置与运行

  1. 在首页输入框输入配置的应用名称,成功操作后会正式进入前端管理界面
    分布式任务调度框架Power-Job

  2. 点击任务管理 -> 新建任务(右上角),开始创建任务

分布式任务调度框架Power-Job
● 任务名称:名称
● 任务描述:描述
● 任务参数:任务处理时能够获取到的参数(即各个 Processor 的 process 方法入参 TaskContext 对象的jobParams 属性)(进行一次处理器开发就能理解了)
● 定时信息:该任务的触发方式,由下拉框和输入框组成
○ API -> 不需要填写任何参数,表明该任务由OpenAPI触发,不会被调度器主动调度执行
○ CRON -> 填写 CRON 表达式(在线生成)
○ 固定频率 -> 任务以固定的频率执行,填写整数,单位毫秒
○ 固定延迟 -> 任务以固定的延迟执行,填写整数,单位毫秒
○ 工作流 -> 不需要填写任何参数,表明该任务由工作流(workflow)触发
● 执行配置:由执行类型(单机、广播和 MapReduce )、处理器类型和处理器参数组成,后两项相互关联。
○ 内置Java处理器 -> 填写该处理器的全限定类名(eg, tech.powerjob.samples.processors.MapReduceProcessorDemo)
○ Java容器 -> 填写容器ID#处理器全限定类名(eg,1#cn.edu.zju.oms.container.ContainerMRProcessor)
○ SHELL、Python、SQL 、HTTP 等任务的执行:官方处理器的使用教程
● 运行配置
○ 最大实例数:该任务同时执行的数量,0 代表不限制实例数量
○ 单机线程并发数:该实例执行过程中每个 Worker 使用的线程数量(MapReduce 任务生效,其余无论填什么,都只会使用必要的线程数)
○ 运行时间限制:限定任务的最大运行时间,超时则视为失败,单位毫秒,0 代表不限制超时时间(不建议不限制超时时间)。
● 重试配置:
○ Instance 重试次数:实例级别,失败了整个任务实例重试,会更换 TaskTracker(本次任务实例的Master节点),代价较大,大型Map/MapReduce慎用
○ Task 重试次数:Task 级别,每个子 Task 失败后单独重试,会更换 ProcessorTracker(本次任务实际执行的 Worker 节点),代价较小,推荐使用
○ 注:请注意同时配置任务重试次数和子任务重试次数之后的重试放大,比如对于单机任务来说,假如任务重试次数和子任务重试次数都配置了 1 且都执行失败,实际执行次数会变成 4 次!推荐任务实例重试配置为 0,子任务重试次数根据实际情况配置
● 机器配置:用来标明允许执行任务的机器状态,避开那些摇摇欲坠的机器,0 代表无任何限制
○ 最低 CPU 核心数:填写浮点数,CPU 可用核心数小于该值的 Worker 将不会执行该任务
○ 最低内存(GB):填写浮点数,可用内存小于该值的 Worker 将不会执行该任务
○ 最低磁盘(GB):填写浮点数,可用磁盘空间小于该值的 Worker 将不会执行该任务
● 集群配置
○ 执行机器地址:指定集群中的某几台机器执行任务( debug 的好帮手),多值英文逗号分割,如192.168.1.1:27777,192.168.1.2:27777
○ 最大执行机器数量:限定调动执行的机器数量
● 报警配置:选择任务执行失败后报警通知的对象,需要事先录入

  1. 完成任务创建后,即可在控制台看到刚才创建的任务,如果觉得等待调度太过于漫长,可以直接点击运行按钮,立即运行本任务

分布式任务调度框架Power-Job

  1. 前往任务示例边栏,查看任务的运行状态和在线日志

分布式任务调度框架Power-Job

更多功能示例可见官方文档,工作流、MapReduce、容器等高级特性官方视频