> 文章列表 > Java使用elasticjob实现定时任务(v2.1.5)

Java使用elasticjob实现定时任务(v2.1.5)

Java使用elasticjob实现定时任务(v2.1.5)

elastic是一个定时任务库

https://shardingsphere.apache.org/elasticjob/index_zh.html

bff3a57751ea742698917f55e0557d5e.png

项目结构

ba89260324b237010225fa45c99d4290.png

​依赖

        <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version></dependency>

实现simplejob

simplejob是使用最多、最简单的定时任务

任务类

定时任务类需要实现相应的定时任务接口(idea快捷键 ctrl+i)

public class MySimpleJob implements SimpleJob

然后在实现的execute里写定时任务的逻辑

public class MySimpleJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {System.out.println("分片项: " + shardingContext.getShardingItem() +",总分片项数: " + shardingContext.getShardingTotalCount());}
}

定时任务配置

新建App.java

public class App {
} 

添加配置信息(都写在App.java里)

1)zookeeper配置信息(zookeeper作为注册中心,elasticjob将服务注册到zookeeper)

zookeeper搭建可以看我的这一篇文章

在windows搭建zookeeper(单机/集群) - 知乎
    /*** 注册中心zookeeper*/public static CoordinatorRegistryCenter zkCenter() {// 参数1: zk的地址(集群就写多个,中间用逗号隔开),参数2: 命名空间var zc =new ZookeeperConfiguration("localhost:2181", "java-simple-job");var crc = new ZookeeperRegistryCenter(zc);// 初始化注册中心crc.init();return crc;}

2)simplejob任务配置

    /*** simple-job配置** @return*/public static LiteJobConfiguration configurationSimple() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("mySimpleJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务类的全类名var jtc = new SimpleJobConfiguration(jcc, MySimpleJob.class.getCanonicalName());// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}

3)启动定时任务

    public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationSimple()).init();}

启动

6e23bf6b17513046cfb242a2287ed547.png

因为我们设置的分片数量是2,所以可以启动另一个定时任务,elasticjob会自动分配任务

5ae8c0ea77479358af4513c9115a76eb.png

684c25b9f11c32a15e400d0da0eeb7ca.png

复制运行配置

启动两个任务,可以看到自动分配任务,原本是一个服务执行分片1和0,现在是分别执行单个任务

dbaa405674086c2f1e712ed5dcce613b.png

caa9dfdcb430d467751bcf161e5e8115.png

dataflow任务

dataflow任务适合处理流式作业,和simplejob不同,分为数据抓取和处理,先获取数据然后进行处理

订单类(被处理的类)

public class Order {private Integer orderId;// 0 未处理; 1 已处理private Integer status;@Overridepublic String toString() {return "Order{" +"orderId=" + orderId +", status=" + status +'}';}public Integer getOrderId() {return orderId;}public void setOrderId(Integer orderId) {this.orderId = orderId;}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status = status;}
}

任务类

实现接口,有两个方法,对应抓取和处理,抓取方法的返回值会交给处理方法

public class MyDataflowJob implements DataflowJob<Order> { // 抓取数据@Overridepublic List<Order> fetchData(ShardingContext shardingContext) {return null;}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<Order> data) { }
} 

具体逻辑:初始化100个order,然后抓取指定数据(status为0 并且 订单号%分片总数 == 当前分片项)的订单进行处理,返回值交给处理方法,处理方法进行处理(将order的status设置为1)

public class MyDataflowJob implements DataflowJob<Order> {private List<Order> orders = new ArrayList<Order>();{// 实例化该类时执行for (int i = 0; i < 100; i++) {Order order = new Order();order.setOrderId(i + 1);// 未处理order.setStatus(0);orders.add(order);}}// 抓取数据@Overridepublic List<Order> fetchData(ShardingContext shardingContext) {// 将 订单号%分片总数 == 当前分片项 的订单进行处理var orderList = orders.stream()// 过滤状态为0的.filter(o -> o.getStatus() == 0).filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()== shardingContext.getShardingItem())// 放入集合.collect(toList());List<Order> subList = null;if (orderList != null && orderList.size() > 0) {// (抓)截取listsubList = orderList.subList(0, 10);}try {// 休眠3秒Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}LocalTime time = LocalTime.now();System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",我抓取的数据是: " + subList);return subList;}// 处理数据@Overridepublic void processData(ShardingContext shardingContext, List<Order> data) {// 设置为已处理,下次不会再抓取到data.forEach(o -> o.setStatus(1));try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}LocalTime time = LocalTime.now();System.out.println(time + "我是分片项: " + shardingContext.getShardingItem() + ",正在处理数据!");}
}

App.java

1)dataflow任务配置

   /*** dataflow-job配置** @return*/public static LiteJobConfiguration configurationDataflow() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("myDataflowJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务类的全类名,参数3: 是否开启定时任务(不开则只执行1次)var jtc =new DataflowJobConfiguration(jcc, MyDataflowJob.class.getCanonicalName(), true);// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}

2)main方法

    public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationDataflow()).init();}

启动

0363103046d4c3da2737246318e51fde.png

script任务

可以运行脚本文件(cmd、python……)

d盘下新建test.txt,修改内容后重命名为.cmd

%1这些是用来接收elastic传递来的参数的

echo running cmd cript: %1,%2,%3,%4,%5

App.java

1)任务配置

    /*** script-job配置** @return*/public static LiteJobConfiguration configurationScript() {// 1,job核心配置var jcc = JobCoreConfiguration// 参数1: 任务名称,参数2: cron表达式(0/10 -> 10秒执行一次),参数3: 分片项数量.newBuilder("myScriptJob", "0/10 * * * * ?", 2).build();// 2,job类型配置// 参数1: 核心配置,参数2: 任务脚本所在目录var jtc =new ScriptJobConfiguration(jcc, "d:/test.cmd");// 3,job根配置 (LiteJobConfiguration)return LiteJobConfiguration.newBuilder(jtc)// 有这个才能重新布置任务,否则修改不会生效.overwrite(true).build();}

2)main方法

    public static void main(String[] args) {// 启动定时任务// 参数1: 注册中心;参数2: 配置new JobScheduler(zkCenter(), configurationScript()).init();}

0c3768fe9861e8e2cb0bfc86fc9b7c97.png

后续更文:springboot整合(2.1.5和3.0.0-alpha)