> 文章列表 > 分布式-调度

分布式-调度

分布式-调度

分布式Job的实现方式:

  1. 基于任务调度框架(如Quartz、Elastic-Job等)进行任务分发和调度,利用分布式缓存(如Redis、ZooKeeper等)进行节点间协调和状态同步。
  2. 基于消息队列(如Kafka、ActiveMQ等)进行任务分发和执行结果返回,利用分布式锁(如Redisson、Curator等)进行并发控制和任务序列化。
  3. 基于RPC框架(如Dubbo、gRPC等)进行任务分发和执行,利用分布式协调框架(如ZooKeeper、Consul等)进行服务注册和发现。

基于Quartz和Redis实现的分布式任务调度的

  1. 添加依赖
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.0</version>
</dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.5.3</version>
</dependency>
  1. 实现Job
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {System.out.println("Hello, Quartz!");}}
  1. 配置Quartz Scheduler
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import redis.clients.jedis.Jedis;public class QuartzScheduler {private static final String REDIS_KEY = "quartz_scheduler_lock";private static final int LOCK_EXPIRE_TIME = 10000; // Redis锁过期时间(毫秒)public static void main(String[] args) throws SchedulerException {// 创建SchedulerScheduler scheduler = StdSchedulerFactory.getDefaultScheduler();// 定义JobDetailJobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("myJob", "group1").build();// 定义TriggerTrigger trigger = TriggerBuilder.newTrigger().withIdentity("myTrigger", "group1").startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();// 设置JobDataMap,用于分布式环境下获取锁jobDetail.getJobDataMap().put("jedis", new Jedis("localhost"));jobDetail.getJobDataMap().put("redisKey", REDIS_KEY);jobDetail.getJobDataMap().put("lockExpireTime", LOCK_EXPIRE_TIME);// 注册JobDetail和Trigger到Schedulerscheduler.scheduleJob(jobDetail, trigger);// 启动Schedulerscheduler.start();}}
  1. 实现分布式锁
import redis.clients.jedis.Jedis;public class DistributedLock {public boolean tryLock(Jedis jedis, String lockKey, int expireTime) {long currentTime = System.currentTimeMillis();String result = jedis.set(lockKey, String.valueOf(currentTime + expireTime), "NX", "PX", expireTime);return "OK".equals(result);}public void unlock(Jedis jedis, String lockKey) {jedis.del(lockKey);}}
  1. 在MyJob中使用分布式锁
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import redis.clients.jedis.Jedis;public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext context) throws JobExecutionException {Jedis jedis = (Jedis) context.getJobDetail().getJobDataMap().get("jedis");String redisKey = (String) context.getJobDetail().getJobDataMap().get("redisKey");int lockExpireTime = (int) context.getJobDetail().getJobDataMap().get("lockExpireTime");DistributedLock lock = new DistributedLock();if (lock.tryLock(jedis, redisKey, lockExpireTime)) {try {// 执行任务代码} finally {lock.unlock(jedis, redisKey);}} else {//异常}}

基于Kafka和Redisson的实现:

首先,我们需要引入Kafka和Redisson的依赖,在Maven中添加以下代码:

<!-- Kafka -->
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency><!-- Redisson -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.16.1</version>
</dependency>

接下来,我们需要定义消息格式,包括任务的ID、执行参数等信息。假设我们要执行一个简单的加法操作,消息格式可以定义为:

public class TaskMessage {private String taskId;private int operand1;private int operand2;// 省略getter和setter方法
}

然后,我们需要编写生产者和消费者的代码。

生产者的代码如下所示:

public class TaskProducer {private static final String TOPIC = "task-topic";private final Producer<String, TaskMessage> producer;public TaskProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());this.producer = new KafkaProducer<>(props);}public void sendTask(TaskMessage message) throws InterruptedException, ExecutionException {// 将任务消息发送到KafkaProducerRecord<String, TaskMessage> record = new ProducerRecord<>(TOPIC, message.getTaskId(), message);Future<RecordMetadata> future = producer.send(record);// 等待消息发送完成future.get();}
}

在生产者中,我们使用Kafka的KafkaProducer类将任务消息发送到指定的Kafka主题。

消费者的代码如下所示:

public class TaskConsumer {private static final String TOPIC = "task-topic";private static final String GROUP_ID = "task-group";private final Consumer<String, TaskMessage> consumer;private final ExecutorService executor;private final RLock lock;public TaskConsumer() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());this.consumer = new KafkaConsumer<>(props);this.executor = Executors.newFixedThreadPool(10);Config config = new Config();config.useSingleServer().setAddress("redis://localhost:6379");RedissonClient redisson = Redisson.create(config);this.lock = redisson.getLock(TOPIC);}public void start() {consumer.subscribe(Collections.singleton(TOPIC));while (true) {ConsumerRecords<String, TaskMessage> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, TaskMessage> record : records) {executor.submit(() -> {try {// 获取锁,并执行任务lock.lock();System.out.println("Processing task: " + record.value().getTaskId());int result = record.value().getOperand1() + record.value().getOperand2();System.out.println("Result: " + result);// 返回执行结果sendResult(record.key(), result);} finally {lock.unlock();}});}}}private void sendResult(String taskId, int result) {// 将执行结果发送到KafkaProducer<String, Integer> producer = createResultProducer();ProducerRecord<String, Integer> record = new ProducerRecord<>(taskId, result);Future<RecordMetadata> future = producer.send(record);// 等待消息发送完成try {future.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {producer.close();}}private Producer<String, Integer> createResultProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");}

Dubbo和ZooKeeper实现

首先,我们需要引入Dubbo和ZooKeeper的依赖,在Maven中添加以下代码:

<!-- Dubbo -->
<dependency><groupId>com.alibaba</groupId><artifactId>dubbo</artifactId><version>2.7.8</version>
</dependency><!-- ZooKeeper -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version>
</dependency>

接下来,我们需要定义任务分发的接口和方法。假设我们要执行一个简单的加法操作,我们可以定义接口如下所示:

public interface TaskService {int execute(int operand1, int operand2);
}

然后,我们需要编写服务提供者和消费者的代码。

服务提供者的代码如下所示:

public class TaskServiceProvider {public static void main(String[] args) throws Exception {// 初始化Dubbo服务ServiceConfig<TaskService> serviceConfig = new ServiceConfig<>();serviceConfig.setInterface(TaskService.class);serviceConfig.setRef(new TaskServiceImpl());serviceConfig.setRegistry(new RegistryConfig("zookeeper://localhost:2181"));serviceConfig.export();System.in.read();}
}public class TaskServiceImpl implements TaskService {@Overridepublic int execute(int operand1, int operand2) {// 执行任务,并返回结果System.out.println("Executing task...");int result = operand1 + operand2;System.out.println("Result: " + result);return result;}
}

在服务提供者中,我们使用Dubbo的ServiceConfig类将任务执行方法注册到ZooKeeper,并启动Dubbo服务。

消费者的代码如下所示:

public class TaskServiceConsumer {public static void main(String[] args) {ApplicationConfig applicationConfig = new ApplicationConfig();applicationConfig.setName("task-consumer");RegistryConfig registryConfig = new RegistryConfig();registryConfig.setAddress("zookeeper://localhost:2181");ReferenceConfig<TaskService> referenceConfig = new ReferenceConfig<>();referenceConfig.setApplication(applicationConfig);referenceConfig.setRegistry(registryConfig);referenceConfig.setInterface(TaskService.class);TaskService taskService = referenceConfig.get();int result = taskService.execute(2, 3);System.out.println("Result: " + result);}
}

在消费者中,我们使用Dubbo的ReferenceConfig类从ZooKeeper中获取任务执行方法的引用,并调用该方法执行任务。

这样,我们就完成了基于Dubbo和ZooKeeper的任务分发与执行的Java实现Demo。

xxl-job

XXL-Job是一个轻量级分布式任务调度平台,提供可靠的定时任务和实时任务处理等功能。

接入XXL-Job的示例:

  1. 首先,我们需要在Maven中引入xxl-job-core的依赖:

    <dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version>
    </dependency>
    
  2. 然后,我们需要编写一个JobHandler类来处理任务。该类需要继承IJobHandler接口,并实现其中的execute方法。以下是一个简单的JobHandler示例:

    public class HelloWorldJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) throws Exception {System.out.println("Hello, world!");return ReturnT.SUCCESS;}
    }
    

在上述示例中,我们通过重写execute方法来实现具体的任务逻辑,当任务执行完成后,需要返回ReturnT.SUCCESS表示任务执行成功。

  1. 接着,在代码中注册任务Handler:
XxlJobExecutor.registJobHandler("helloWorldJobHandler", new HelloWorldJobHandler());

在上面的代码中,我们调用了XxlJobExecutor.registJobHandler方法,将名为helloWorldJobHandler的任务Handler注册到XXL-Job中。

  1. 最后,在XXL-Job的管理页面中创建一个任务,并指定刚才注册的任务Handler即可。这里不做过多介绍,具体可以参考XXL-Job的官方文档。

总的来说,Java接入XXL-Job的步骤主要包括:

  • 引入xxl-job-core的Maven依赖;
  • 编写一个JobHandler类,并实现其中的execute方法;
  • 在代码中注册任务Handler;
  • 在XXL-Job的管理页面中创建一个任务,并指定任务Handler。

其中,前三步是需要在Java代码中完成的,最后一步是需要在XXL-Job的管理页面中完成的。

注意点:

  1. 任务的幂等性问题:由于分布式job可能会被多个节点同时执行,因此需要保证任务的幂等性,即任务重复执行不会产生结果的变化。可以通过给任务添加唯一标识符或者使用乐观锁来实现任务的幂等性。
  2. 分片策略的粒度问题:当一个任务需要被分配到多个节点处理时,需要考虑分片的策略和粒度。如果粒度太细,每个节点处理的数据量过小,在网络传输和节点调度上会带来额外的开销;如果粒度太大,每个节点处理的数据量过大,容易导致节点负载不均衡。因此,需要根据具体的业务场景选择合适的分片策略和粒度。
  3. 任务调度的错误处理:在分布式job中,任务可能会出现执行失败、超时等情况。这时候需要有相应的错误处理机制,比如任务重试、失败告警等。
  4. 节点的动态扩容和缩容:在分布式job中,节点的数量可能随时发生变化。需要考虑节点的动态扩容和缩容对任务调度的影响,并采取相应的措施来适应节点数量的变化。
  5. 注册中心的高可用性与性能问题:分布式job通常需要使用注册中心来进行服务注册和发现。需要考虑注册中心的高可用性和性能问题,以确保任务调度的稳定和性能。
    总结: 使用分布式job需要充分考虑各种异常情况和性能问题,并采取相应的措施来保证任务的正确性和稳定性。

最佳实践:

  1. 任务的分片:在分布式job中,任务可能会被分成多个片段进行处理。需要考虑分片策略和粒度,以确保数据均衡和性能优化。可以采用哈希取模、范围等方式来进行分片。
  2. 注册中心的高可用性:注册中心通常是分布式job的关键组件,需要考虑其高可用性问题。可以采用集群部署或主从架构来保证注册中心的可靠性。
  3. 节点的负载均衡:为了避免节点之间出现负载不均衡的情况,可以采用轮询、随机等方式来进行调度。
  4. 任务的幂等性:由于分布式job可能会被多个节点同时执行,需要保证任务的幂等性。可以采用唯一标识符、数据校验等方式来实现幂等性。
  5. 错误处理机制:分布式job中,任务可能会出现错误。需要建立相应的错误处理机制,例如重试机制、失败告警等。
  6. 节点的动态扩缩容:在分布式job中,节点数量可能会随时变化。需要考虑节点的动态扩缩容对任务调度的影响,并做好相应的准备工作。
  7. 可视化管理界面:一个好的可视化管理界面可以提供给管理员直观的操作手段,包括任务的运行状态、执行日志、错误信息等。
  8. 监控与统计:监控和统计可以帮助管理员更好地了解任务的运行状况和系统性能,以及发现问题并及时解决。可以使用开源监控平台(如Prometheus, Grafana等)或商业监控平台。
    总结: 分布式job需要考虑到各种异常情况和性能问题,秉持着高可用、高性能、易管理的原则,才能够满足业务需求,并最终达到预期的效果。