Java使用elasticjob实现定时任务(v2.1.5)
elastic是一个定时任务库
https://shardingsphere.apache.org/elasticjob/index_zh.html
项目结构
依赖
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version></dependency>
实现simplejob
simplejob是使用最多、最简单的定时任务
任务类
定时任务类需要实现相应的定时任务接口(idea快捷键 ctrl+i)
public class MySimpleJob implements SimpleJob
然后在实现的execute里写定时任务的逻辑
public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {System.out.println("分片项: " + shardingContext.getShardingItem() +",总分片项数: " + shardingContext.getShardingTotalCount());}
}
定时任务配置
新建App.java
public class App {
}
添加配置信息(都写在App.java里)
1)zookeeper配置信息(zookeeper作为注册中心,elasticjob将服务注册到zookeeper)
zookeeper搭建可以看我的这一篇文章
在windows搭建zookeeper(单机/集群) - 知乎
/*** 注册中心zookeeper*/public static CoordinatorRegistryCenter zkCenter() {// 参数1: zk的地址(集群就写多个,中间用逗号隔开),参数2: 命名空间var zc =new ZookeeperConfiguration("localhost:2181", "java-simple-job");var crc = new ZookeeperRegistryCenter(zc);// 初始化注册中心crc.init();return crc;}
2)simplejob任务配置
/*** simple-job配置** @return*/public static LiteJobConfiguration configurationSimple() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("mySimpleJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务类的全类名var jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}
3)启动定时任务
public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationSimple()).init();}
启动
因为我们设置的分片数量是2,所以可以启动另一个定时任务,elasticjob会自动分配任务
复制运行配置
启动两个任务,可以看到自动分配任务,原本是一个服务执行分片1和0,现在是分别执行单个任务
dataflow任务
dataflow任务适合处理流式作业,和simplejob不同,分为数据抓取和处理,先获取数据然后进行处理
订单类(被处理的类)
public class Order {private Integer orderId;// 0 未处理; 1 已处理private Integer status;@Overridepublic String toString() {return "Order{" +"orderId=" + orderId +", status=" + status +'}';}public Integer getOrderId() {return orderId;}public void setOrderId(Integer orderId) {this.orderId = orderId;}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status = status;}
}
任务类
实现接口,有两个方法,对应抓取和处理,抓取方法的返回值会交给处理方法
public class MyDataflowJob implements DataflowJob<Order> { // 抓取数据@Overridepublic List<Order> fetchData(ShardingContext shardingContext) {return null;}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<Order> data) { }
}
具体逻辑:初始化100个order,然后抓取指定数据(status为0 并且 订单号%分片总数 == 当前分片项)的订单进行处理,返回值交给处理方法,处理方法进行处理(将order的status设置为1)
public class MyDataflowJob implements DataflowJob<Order> {private List<Order> orders = new ArrayList<Order>();{// 实例化该类时执行for (int i = 0; i < 100; i++) {Order order = new Order();order.setOrderId(i + 1);// 未处理order.setStatus(0);orders.add(order);}}// 抓取数据@Overridepublic List<Order> fetchData(ShardingContext shardingContext) {// 将 订单号%分片总数 == 当前分片项 的订单进行处理var orderList = orders.stream()// 过滤状态为0的.filter(o -> o.getStatus() == 0).filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()== shardingContext.getShardingItem())// 放入集合.collect(toList());List<Order> subList = null;if (orderList != null && orderList.size() > 0) {// (抓)截取listsubList = orderList.subList(0, 10);}try {// 休眠3秒Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}LocalTime time = LocalTime.now();System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",我抓取的数据是: " + subList);return subList;}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<Order> data) {// 设置为已处理,下次不会再抓取到data.forEach(o -> o.setStatus(1));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}LocalTime time = LocalTime.now();System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",正在处理数据!");}
}
App.java
1)dataflow任务配置
/*** dataflow-job配置** @return*/public static LiteJobConfiguration configurationDataflow() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("myDataflowJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务类的全类名,参数3: 是否开启定时任务(不开则只执行1次)var jtc =new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true);// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}
2)main方法
public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationDataflow()).init();}
启动
script任务
可以运行脚本文件(cmd、python……)
d盘下新建test.txt,修改内容后重命名为.cmd
%1这些是用来接收elastic传递来的参数的
echo running cmd cript: %1,%2,%3,%4,%5
App.java
1)任务配置
/*** script-job配置** @return*/public static LiteJobConfiguration configurationScript() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("myScriptJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务脚本所在目录var jtc =new ScriptJobConfiguration(jcc, "d:/test.cmd");// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}
2)main方法
public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationScript()).init();}