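Scheduling Systems: Quartz
I have recently been working on the architecture of a data middle platform. With the overall architecture in place, it became clear that the two most important parts of a data middle platform are metadata and the scheduling system. The metadata design drew on Atlas, Metacat, and DataHub, and its model design, data architecture, and technical architecture are basically complete. Now that I am designing the scheduling system, I have found that it differs from other systems, where the main work is to sort out business attributes and abstract them into models: a scheduling system consists mostly of non-business attributes. To design a better scheduling system, I first need to survey the mainstream ones: Quartz, Elastic-Job, XXL-JOB, Oozie, Airflow, and DolphinScheduler (DS). Today we start with Quartz.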
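Quartz overview:
Quartz is a richly featured, open-source job scheduling library that can be integrated into virtually any Java application, from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens of thousands of jobs; the jobs' tasks are defined as standard Java components that can execute virtually anything you program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.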
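Quartz features:
Runtime environments:
- Quartz can run embedded within another stand-alone application.
- Quartz can be instantiated within an application server (or servlet container) and participate in XA transactions.
- Quartz can run as a stand-alone program (within its own Java Virtual Machine), to be used via RMI.
- Quartz can be instantiated as a cluster of stand-alone programs (with load-balancing and failover capabilities) for the execution of jobs.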
Job scheduling:
Jobs are scheduled according to a variety of time-based rules.
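As a rough illustration of what a time-based rule computes, the sketch below models a fixed-interval rule with a repeat count. FixedIntervalRule and fireTimes are hypothetical names made up for this example, not Quartz classes; the only assumption borrowed from Quartz is that the repeat count counts repetitions after the first firing.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a fixed-interval time rule; this is not a Quartz class.
class FixedIntervalRule {

    /** Returns the first fire time followed by repeatCount repeats, spaced by interval. */
    static List<Instant> fireTimes(Instant start, Duration interval, int repeatCount) {
        List<Instant> times = new ArrayList<>();
        for (int i = 0; i <= repeatCount; i++) {
            times.add(start.plus(interval.multipliedBy(i)));
        }
        return times;
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2024-01-01T00:00:00Z");
        // An interval of 120 s with one repeat yields two firings in total.
        System.out.println(fireTimes(start, Duration.ofSeconds(120), 1));
        // prints [2024-01-01T00:00:00Z, 2024-01-01T00:02:00Z]
    }
}
```

A cron-style rule would replace the loop with a calendar calculation, but the contract is the same: given a start point, produce the sequence of fire times.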
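Job execution:
- Jobs can be any Java class that implements the simple Job interface.
- Job class instances can be instantiated by Quartz or by your application's framework.
- When a Trigger occurs, the scheduler notifies zero or more Java objects implementing the JobListener and TriggerListener interfaces. These listeners are also notified after the job has executed.
- As jobs complete, they return a JobCompletionCode, which informs the scheduler of success or failure. The JobCompletionCode can also instruct the scheduler of any action it should take based on the success or failure code, such as immediately re-executing the job.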
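Job persistence:
- The design of Quartz includes a JobStore interface that can be implemented to provide various mechanisms for storing jobs.
- With the included JDBCJobStore, all jobs and triggers configured as "non-volatile" are stored in a relational database via JDBC.
- With the included RAMJobStore, all jobs and triggers are stored in RAM.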
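To make the idea of a pluggable store concrete, here is a minimal sketch of a storage abstraction with an in-memory implementation. SimpleJobStore and InMemoryJobStore are hypothetical and far smaller than Quartz's real JobStore interface; the job key and class name strings are made-up sample data.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified storage abstraction (not Quartz's JobStore).
interface SimpleJobStore {
    void storeJob(String jobKey, String jobClassName);
    Optional<String> retrieveJob(String jobKey);
    boolean removeJob(String jobKey);
}

// In-memory variant: fast, but everything is lost when the JVM exits.
class InMemoryJobStore implements SimpleJobStore {
    private final Map<String, String> jobs = new ConcurrentHashMap<>();

    @Override
    public void storeJob(String jobKey, String jobClassName) {
        jobs.put(jobKey, jobClassName);
    }

    @Override
    public Optional<String> retrieveJob(String jobKey) {
        return Optional.ofNullable(jobs.get(jobKey));
    }

    @Override
    public boolean removeJob(String jobKey) {
        return jobs.remove(jobKey) != null;
    }
}

class JobStoreDemo {
    public static void main(String[] args) {
        // The scheduler side only sees the interface, so the backing store can be swapped.
        SimpleJobStore store = new InMemoryJobStore();
        store.storeJob("group1.job1", "com.example.SendMailJob");
        System.out.println(store.retrieveJob("group1.job1").isPresent()); // prints true
    }
}
```

A database-backed variant would implement the same interface with SQL statements, which is the design choice that lets the scheduling logic stay unaware of where jobs live.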
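Transactions:
- Quartz can participate in JTA transactions via JobStoreCMT (a subclass of JDBCJobStore).
- Quartz can manage JTA transactions (begin and commit them) around the execution of a Job, so that the work performed by the Job automatically happens within a JTA transaction.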
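Clustering:
- Failover.
- Load balancing.
- Quartz's built-in clustering relies on database persistence via JDBCJobStore (described above).
- Terracotta extensions to Quartz provide clustering without the need for a backing database.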
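Listeners and plugins:
- Applications can catch scheduling events to monitor or control job/trigger behavior by implementing one or more listener interfaces.
- The plugin mechanism can be used to add functionality to Quartz, such as keeping a history of job executions or loading job and trigger definitions from a file.
- Quartz ships with a number of "factory built" plugins and listeners.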
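The listener idea above can be sketched in plain Java as follows. ExecutionListener, RecordingListener, and ListenerAwareRunner are hypothetical types written for this example; the method names only loosely echo Quartz's JobListener, whose real interface has more methods and different signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract; Quartz's real JobListener differs.
interface ExecutionListener {
    void jobToBeExecuted(String jobKey);
    void jobWasExecuted(String jobKey, Exception failure);
}

// Example listener that keeps a history of executions, similar in spirit to a history plugin.
class RecordingListener implements ExecutionListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void jobToBeExecuted(String jobKey) {
        events.add("before:" + jobKey);
    }

    @Override
    public void jobWasExecuted(String jobKey, Exception failure) {
        events.add("after:" + jobKey + (failure == null ? ":ok" : ":failed"));
    }
}

// Runs a job and notifies every registered listener before and after it.
class ListenerAwareRunner {
    private final List<ExecutionListener> listeners = new ArrayList<>();

    void addListener(ExecutionListener listener) {
        listeners.add(listener);
    }

    void run(String jobKey, Runnable job) {
        for (ExecutionListener l : listeners) {
            l.jobToBeExecuted(jobKey);
        }
        Exception failure = null;
        try {
            job.run();
        } catch (RuntimeException e) {
            failure = e; // the failure is reported to listeners instead of propagating
        }
        for (ExecutionListener l : listeners) {
            l.jobWasExecuted(jobKey, failure);
        }
    }

    public static void main(String[] args) {
        RecordingListener history = new RecordingListener();
        ListenerAwareRunner runner = new ListenerAwareRunner();
        runner.addListener(history);
        runner.run("job1", () -> System.out.println("working"));
        System.out.println(history.events); // prints [before:job1, after:job1:ok]
    }
}
```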
Getting started with Quartz:
1. Implement the Job interface:
    import org.quartz.Job;
    import org.quartz.JobExecutionContext;
    import org.quartz.JobExecutionException;

    static class SomeJob implements Job {
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            // no-op apart from printing a marker line
            System.out.println("schedThread -test");
        }
    }
2. Start a scheduler:
    public void testJobStorage() throws Exception {
        // mockToolkitFor(...) is a helper defined elsewhere in the test class (not shown here)
        ToolkitInternal mock = mockToolkitFor("mocked-not-clustered");
        Properties props = new Properties();
        props.load(getClass().getResourceAsStream("/org/quartz/quartz.properties"));
        props.setProperty(StdSchedulerFactory.PROP_JOB_STORE_CLASS, TerracottaJobStore.class.getName());
        props.setProperty(AbstractTerracottaJobStore.TC_CONFIGURL_PROP, "mocked-not-clustered");
        // Obtain a Scheduler instance (a StdScheduler) from the factory
        SchedulerFactory schedFact = new StdSchedulerFactory(props);
        Scheduler scheduler = schedFact.getScheduler();
        try {
            // Start the scheduler
            scheduler.start();
            // Build a JobDetail
            JobDetail jobDetail = JobBuilder.newJob(SomeJob.class)
                    .withIdentity("testjob", "testjobgroup")
                    .storeDurably()
                    .build();
            // Add the JobDetail to the scheduler
            scheduler.addJob(jobDetail, false);
            // Build a Trigger for that job
            Trigger trigger = TriggerBuilder.newTrigger()
                    .forJob(jobDetail)
                    .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10))
                    .build();
            // Tell the scheduler to run the job according to the trigger
            scheduler.scheduleJob(trigger);
        } finally {
            // The scheduler is shut down right after scheduling, so the job may never get to run here
            scheduler.shutdown();
        }
    }
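Quartz core concepts:
1. Job:
The abstraction of a task to be scheduled. What the scheduler actually works with is a JobDetail, which describes a Job class together with its identity and data and is built through Quartz's JobBuilder. Each time a trigger fires, Quartz instantiates the Job class to run that execution.
2. Trigger:
Defines when a job runs, that is, the time rule according to which it is executed. One JobDetail can be associated with multiple triggers, but a trigger can be associated with only one JobDetail.
3. Scheduler:
Schedules jobs according to the time rules defined by their triggers. A Scheduler is created through a factory (SchedulerFactory).
4. JobStore:
Stores the information about jobs and triggers.
5. JobExecutionContext:
The context information available to a job while it runs.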
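The one-to-many relationship between a job and its triggers can be sketched with a small registry. TriggerRegistry and its string keys are hypothetical and stand in for Quartz's JobKey, TriggerKey, and JobStore bookkeeping; the sketch only demonstrates the cardinality rule stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical registry: a job may have many triggers, a trigger belongs to exactly one job.
class TriggerRegistry {
    // trigger key -> job key; a TreeMap keeps listings in a stable, sorted order
    private final Map<String, String> jobByTrigger = new TreeMap<>();

    void addTrigger(String triggerKey, String jobKey) {
        if (jobByTrigger.containsKey(triggerKey)) {
            throw new IllegalStateException("trigger " + triggerKey + " is already bound to a job");
        }
        jobByTrigger.put(triggerKey, jobKey);
    }

    /** Returns every trigger key bound to the given job, in sorted order. */
    List<String> triggersOfJob(String jobKey) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : jobByTrigger.entrySet()) {
            if (e.getValue().equals(jobKey)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TriggerRegistry registry = new TriggerRegistry();
        registry.addTrigger("every10s", "job1");
        registry.addTrigger("nightly", "job1");
        System.out.println(registry.triggersOfJob("job1")); // prints [every10s, nightly]
    }
}
```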
Quartz main flow:
1. Construct the scheduler
A SchedulerFactory is built from the configuration and then instantiates the scheduler.
    public static Scheduler getDefaultScheduler() throws SchedulerException {
        StdSchedulerFactory fact = new StdSchedulerFactory();
        return fact.getScheduler();
    }

    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize(); // initialize the factory
        }
        // global scheduler registry
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        Scheduler sched = schedRep.lookup(getSchedulerName());
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
        // instantiate the scheduler
        sched = instantiate();
        return sched;
    }
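The lookup-or-create logic in getScheduler() can be reduced to the following stand-alone sketch. SchedulerRegistry and FakeScheduler are hypothetical stand-ins for SchedulerRepository and Scheduler; the sketch assumes only what the code above shows: a live scheduler is reused by name, and one that has been shut down is dropped and replaced.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a scheduler; it only tracks a name and a shutdown flag.
class FakeScheduler {
    final String name;
    private boolean shutdown;

    FakeScheduler(String name) {
        this.name = name;
    }

    void shutdown() {
        shutdown = true;
    }

    boolean isShutdown() {
        return shutdown;
    }
}

// Hypothetical registry mirroring the lookup / remove / instantiate steps.
class SchedulerRegistry {
    private final Map<String, FakeScheduler> byName = new HashMap<>();

    synchronized FakeScheduler getOrCreate(String name) {
        FakeScheduler existing = byName.get(name);
        if (existing != null) {
            if (!existing.isShutdown()) {
                return existing; // reuse the live instance
            }
            byName.remove(name); // discard the dead instance
        }
        FakeScheduler created = new FakeScheduler(name);
        byName.put(name, created); // bind the new instance under its name
        return created;
    }

    public static void main(String[] args) {
        SchedulerRegistry registry = new SchedulerRegistry();
        FakeScheduler first = registry.getOrCreate("QuartzScheduler");
        System.out.println(first == registry.getOrCreate("QuartzScheduler")); // prints true
        first.shutdown();
        System.out.println(first == registry.getOrCreate("QuartzScheduler")); // prints false
    }
}
```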
Instantiating the scheduler:
    private Scheduler instantiate() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
        if (initException != null) {
            throw initException;
        }

        // job store
        JobStore js = null;
        // thread pool: Quartz's own SPI thread pool, used mainly by QuartzScheduler
        ThreadPool tp = null;
        // the core of Quartz: the indirect implementation of Scheduler, containing the methods
        // for registering jobs and listener instances
        QuartzScheduler qs = null;
        // manages the collection of ConnectionProviders (ConnectionProvider, PoolingConnectionProvider,
        // JNDIConnectionProvider) and provides transparent access to their connections
        DBConnectionManager dbMgr = null;
        String instanceIdGeneratorClass = null;
        Properties tProps = null;
        String userTXLocation = null;
        boolean wrapJobInTx = false;
        boolean autoId = false;
        long idleWaitTime = -1;
        long dbFailureRetry = 15000L; // 15 secs
        String classLoadHelperClass;
        String jobFactoryClass;
        // SPI thread executor, which allows different strategies for scheduling threads
        ThreadExecutor threadExecutor;
        // scheduler registry
        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        // Get Scheduler Properties
        String schedName = cfg.getStringProperty(PROP_SCHED_INSTANCE_NAME, "QuartzScheduler");
        String threadName = cfg.getStringProperty(PROP_SCHED_THREAD_NAME, schedName + "_QuartzSchedulerThread");
        String schedInstId = cfg.getStringProperty(PROP_SCHED_INSTANCE_ID, DEFAULT_INSTANCE_ID);
        if (schedInstId.equals(AUTO_GENERATE_INSTANCE_ID)) {
            autoId = true;
            instanceIdGeneratorClass = cfg.getStringProperty(
                    PROP_SCHED_INSTANCE_ID_GENERATOR_CLASS,
                    "org.quartz.simpl.SimpleInstanceIdGenerator");
        } else if (schedInstId.equals(SYSTEM_PROPERTY_AS_INSTANCE_ID)) {
            autoId = true;
            instanceIdGeneratorClass = "org.quartz.simpl.SystemPropertyInstanceIdGenerator";
        }
        userTXLocation = cfg.getStringProperty(PROP_SCHED_USER_TX_URL, userTXLocation);
        if (userTXLocation != null && userTXLocation.trim().length() == 0) {
            userTXLocation = null;
        }
        classLoadHelperClass = cfg.getStringProperty(
                PROP_SCHED_CLASS_LOAD_HELPER_CLASS,
                "org.quartz.simpl.CascadingClassLoadHelper");
        wrapJobInTx = cfg.getBooleanProperty(PROP_SCHED_WRAP_JOB_IN_USER_TX, wrapJobInTx);
        jobFactoryClass = cfg.getStringProperty(PROP_SCHED_JOB_FACTORY_CLASS, null);
        idleWaitTime = cfg.getLongProperty(PROP_SCHED_IDLE_WAIT_TIME, idleWaitTime);
        if (idleWaitTime > -1 && idleWaitTime < 1000) {
            throw new SchedulerException("org.quartz.scheduler.idleWaitTime of less than 1000ms is not legal.");
        }
        dbFailureRetry = cfg.getLongProperty(PROP_SCHED_DB_FAILURE_RETRY_INTERVAL, dbFailureRetry);
        if (dbFailureRetry < 0) {
            throw new SchedulerException(PROP_SCHED_DB_FAILURE_RETRY_INTERVAL + " of less than 0 ms is not legal.");
        }
        boolean makeSchedulerThreadDaemon = cfg.getBooleanProperty(PROP_SCHED_MAKE_SCHEDULER_THREAD_DAEMON);
        boolean threadsInheritInitalizersClassLoader =
                cfg.getBooleanProperty(PROP_SCHED_SCHEDULER_THREADS_INHERIT_CONTEXT_CLASS_LOADER_OF_INITIALIZING_THREAD);
        long batchTimeWindow = cfg.getLongProperty(PROP_SCHED_BATCH_TIME_WINDOW, 0L);
        int maxBatchSize = cfg.getIntProperty(PROP_SCHED_MAX_BATCH_SIZE, 1);
        boolean interruptJobsOnShutdown = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN, false);
        boolean interruptJobsOnShutdownWithWait = cfg.getBooleanProperty(PROP_SCHED_INTERRUPT_JOBS_ON_SHUTDOWN_WITH_WAIT, false);
        boolean jmxExport = cfg.getBooleanProperty(PROP_SCHED_JMX_EXPORT);
        String jmxObjectName = cfg.getStringProperty(PROP_SCHED_JMX_OBJECT_NAME);
        boolean jmxProxy = cfg.getBooleanProperty(PROP_SCHED_JMX_PROXY);
        String jmxProxyClass = cfg.getStringProperty(PROP_SCHED_JMX_PROXY_CLASS);
        boolean rmiExport = cfg.getBooleanProperty(PROP_SCHED_RMI_EXPORT, false);
        boolean rmiProxy = cfg.getBooleanProperty(PROP_SCHED_RMI_PROXY, false);
        String rmiHost = cfg.getStringProperty(PROP_SCHED_RMI_HOST, "localhost");
        int rmiPort = cfg.getIntProperty(PROP_SCHED_RMI_PORT, 1099);
        int rmiServerPort = cfg.getIntProperty(PROP_SCHED_RMI_SERVER_PORT, -1);
        String rmiCreateRegistry = cfg.getStringProperty(
                PROP_SCHED_RMI_CREATE_REGISTRY,
                QuartzSchedulerResources.CREATE_REGISTRY_NEVER);
        String rmiBindName = cfg.getStringProperty(PROP_SCHED_RMI_BIND_NAME);
        if (jmxProxy && rmiProxy) {
            throw new SchedulerConfigException("Cannot proxy both RMI and JMX.");
        }
        boolean managementRESTServiceEnabled = cfg.getBooleanProperty(MANAGEMENT_REST_SERVICE_ENABLED, false);
        String managementRESTServiceHostAndPort = cfg.getStringProperty(MANAGEMENT_REST_SERVICE_HOST_PORT, "0.0.0.0:9889");
        Properties schedCtxtProps = cfg.getPropertyGroup(PROP_SCHED_CONTEXT_PREFIX, true);

        // If Proxying to remote scheduler, short-circuit here...
        if (rmiProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }
            String uid = (rmiBindName == null)
                    ? QuartzSchedulerResources.getUniqueIdentifier(schedName, schedInstId)
                    : rmiBindName;
            RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);
            schedRep.bind(remoteScheduler);
            return remoteScheduler;
        }

        // Create class load helper
        ClassLoadHelper loadHelper = null;
        try {
            loadHelper = (ClassLoadHelper) loadClass(classLoadHelperClass).newInstance();
        } catch (Exception e) {
            throw new SchedulerConfigException("Unable to instantiate class load helper class: "
                    + e.getMessage(), e);
        }
        loadHelper.initialize();

        // If Proxying to remote JMX scheduler, short-circuit here...
        if (jmxProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }
            if (jmxProxyClass == null) {
                throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
            }
            RemoteMBeanScheduler jmxScheduler = null;
            try {
                jmxScheduler = (RemoteMBeanScheduler) loadHelper.loadClass(jmxProxyClass).newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException("Unable to instantiate RemoteMBeanScheduler class.", e);
            }
            if (jmxObjectName == null) {
                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
            }
            jmxScheduler.setSchedulerObjectName(jmxObjectName);
            tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
            try {
                setBeanProps(jmxScheduler, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("RemoteMBeanScheduler class '"
                        + jmxProxyClass + "' props could not be configured.", e);
                throw initException;
            }
            jmxScheduler.initialize();
            schedRep.bind(jmxScheduler);
            return jmxScheduler;
        }

        JobFactory jobFactory = null;
        if (jobFactoryClass != null) {
            try {
                jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass).newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException("Unable to instantiate JobFactory class: "
                        + e.getMessage(), e);
            }
            tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
            try {
                setBeanProps(jobFactory, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobFactory class '"
                        + jobFactoryClass + "' props could not be configured.", e);
                throw initException;
            }
        }

        InstanceIdGenerator instanceIdGenerator = null;
        if (instanceIdGeneratorClass != null) {
            try {
                instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass).newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException("Unable to instantiate InstanceIdGenerator class: "
                        + e.getMessage(), e);
            }
            tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
            try {
                setBeanProps(instanceIdGenerator, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("InstanceIdGenerator class '"
                        + instanceIdGeneratorClass + "' props could not be configured.", e);
                throw initException;
            }
        }

        // Get ThreadPool Properties
        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
        if (tpClass == null) {
            initException = new SchedulerException("ThreadPool class not specified. ");
            throw initException;
        }
        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            throw initException;
        }
        tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
        try {
            setBeanProps(tp, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' props could not be configured.", e);
            throw initException;
        }

        // Get JobStore Properties
        String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS, RAMJobStore.class.getName());
        if (jsClass == null) {
            initException = new SchedulerException("JobStore class not specified. ");
            throw initException;
        }
        try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            throw initException;
        }
        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);
        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            throw initException;
        }
        if (js instanceof JobStoreSupport) {
            // Install custom lock handler (Semaphore)
            String lockHandlerClass = cfg.getStringProperty(PROP_JOB_STORE_LOCK_HANDLER_CLASS);
            if (lockHandlerClass != null) {
                try {
                    Semaphore lockHandler = (Semaphore) loadHelper.loadClass(lockHandlerClass).newInstance();
                    tProps = cfg.getPropertyGroup(PROP_JOB_STORE_LOCK_HANDLER_PREFIX, true);
                    // If this lock handler requires the table prefix, add it to its properties.
                    if (lockHandler instanceof TablePrefixAware) {
                        tProps.setProperty(PROP_TABLE_PREFIX, ((JobStoreSupport) js).getTablePrefix());
                        tProps.setProperty(PROP_SCHED_NAME, schedName);
                    }
                    try {
                        setBeanProps(lockHandler, tProps);
                    } catch (Exception e) {
                        initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                                + "' props could not be configured.", e);
                        throw initException;
                    }
                    ((JobStoreSupport) js).setLockHandler(lockHandler);
                    getLog().info("Using custom data access locking (synchronization): " + lockHandlerClass);
                } catch (Exception e) {
                    initException = new SchedulerException("JobStore LockHandler class '" + lockHandlerClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }
            }
        }

        // Set up any DataSources
        String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
        for (int i = 0; i < dsNames.length; i++) {
            PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                    PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));
            String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);
            // custom connectionProvider...
            if (cpClass != null) {
                ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }
                try {
                    // remove the class name, so it isn't attempted to be set
                    pp.getUnderlyingProperties().remove(PROP_CONNECTION_PROVIDER_CLASS);
                    if (cp instanceof PoolingConnectionProvider) {
                        populateProviderWithExtraProps((PoolingConnectionProvider) cp, pp.getUnderlyingProperties());
                    } else {
                        setBeanProps(cp, pp.getUnderlyingProperties());
                    }
                    cp.initialize();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' props could not be configured.", e);
                    throw initException;
                }
                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } else {
                String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);
                if (dsJndi != null) {
                    boolean dsAlwaysLookup = pp.getBooleanProperty(PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
                    String dsJndiInitial = pp.getStringProperty(PROP_DATASOURCE_JNDI_INITIAL);
                    String dsJndiProvider = pp.getStringProperty(PROP_DATASOURCE_JNDI_PROVDER);
                    String dsJndiPrincipal = pp.getStringProperty(PROP_DATASOURCE_JNDI_PRINCIPAL);
                    String dsJndiCredentials = pp.getStringProperty(PROP_DATASOURCE_JNDI_CREDENTIALS);
                    Properties props = null;
                    if (null != dsJndiInitial || null != dsJndiProvider
                            || null != dsJndiPrincipal || null != dsJndiCredentials) {
                        props = new Properties();
                        if (dsJndiInitial != null) {
                            props.put(PROP_DATASOURCE_JNDI_INITIAL, dsJndiInitial);
                        }
                        if (dsJndiProvider != null) {
                            props.put(PROP_DATASOURCE_JNDI_PROVDER, dsJndiProvider);
                        }
                        if (dsJndiPrincipal != null) {
                            props.put(PROP_DATASOURCE_JNDI_PRINCIPAL, dsJndiPrincipal);
                        }
                        if (dsJndiCredentials != null) {
                            props.put(PROP_DATASOURCE_JNDI_CREDENTIALS, dsJndiCredentials);
                        }
                    }
                    JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi, props, dsAlwaysLookup);
                    dbMgr = DBConnectionManager.getInstance();
                    dbMgr.addConnectionProvider(dsNames[i], cp);
                } else {
                    String poolingProvider = pp.getStringProperty(PoolingConnectionProvider.POOLING_PROVIDER);
                    String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
                    String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);
                    if (dsDriver == null) {
                        initException = new SchedulerException("Driver not specified for DataSource: "
                                + dsNames[i]);
                        throw initException;
                    }
                    if (dsURL == null) {
                        initException = new SchedulerException("DB URL not specified for DataSource: "
                                + dsNames[i]);
                        throw initException;
                    }
                    // we load even these "core" providers by class name in order to avoid a static dependency on
                    // the c3p0 and hikaricp libraries
                    if (poolingProvider != null && poolingProvider.equals(PoolingConnectionProvider.POOLING_PROVIDER_HIKARICP)) {
                        cpClass = "org.quartz.utils.HikariCpPoolingConnectionProvider";
                    } else {
                        cpClass = "org.quartz.utils.C3p0PoolingConnectionProvider";
                    }
                    log.info("Using ConnectionProvider class '" + cpClass + "' for data source '" + dsNames[i] + "'");
                    try {
                        ConnectionProvider cp = null;
                        try {
                            Constructor constructor = loadHelper.loadClass(cpClass).getConstructor(Properties.class);
                            cp = (ConnectionProvider) constructor.newInstance(pp.getUnderlyingProperties());
                        } catch (Exception e) {
                            initException = new SchedulerException("ConnectionProvider class '" + cpClass
                                    + "' could not be instantiated.", e);
                            throw initException;
                        }
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);
                        // Populate the underlying C3P0/HikariCP data source pool properties
                        populateProviderWithExtraProps((PoolingConnectionProvider) cp, pp.getUnderlyingProperties());
                    } catch (Exception sqle) {
                        initException = new SchedulerException(
                                "Could not initialize DataSource: " + dsNames[i], sqle);
                        throw initException;
                    }
                }
            }
        }

        // Set up any SchedulerPlugins
        String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
        SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
        for (int i = 0; i < pluginNames.length; i++) {
            Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                    + pluginNames[i], true);
            String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);
            if (plugInClass == null) {
                initException = new SchedulerException("SchedulerPlugin class not specified for plugin '"
                        + pluginNames[i] + "'");
                throw initException;
            }
            SchedulerPlugin plugin = null;
            try {
                plugin = (SchedulerPlugin) loadHelper.loadClass(plugInClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException("SchedulerPlugin class '" + plugInClass
                        + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                setBeanProps(plugin, pp);
            } catch (Exception e) {
                initException = new SchedulerException("JobStore SchedulerPlugin '" + plugInClass
                        + "' props could not be configured.", e);
                throw initException;
            }
            plugins[i] = plugin;
        }

        // Set up any JobListeners
        Class<?>[] strArg = new Class[] { String.class };
        String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
        JobListener[] jobListeners = new JobListener[jobListenerNames.length];
        for (int i = 0; i < jobListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                    + jobListenerNames[i], true);
            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
            if (listenerClass == null) {
                initException = new SchedulerException("JobListener class not specified for listener '"
                        + jobListenerNames[i] + "'");
                throw initException;
            }
            JobListener listener = null;
            try {
                listener = (JobListener) loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException("JobListener class '" + listenerClass
                        + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try {
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                } catch (NoSuchMethodException ignore) {
                    /* do nothing */
                }
                if (nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] { jobListenerNames[i] });
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException("JobListener '" + listenerClass
                        + "' props could not be configured.", e);
                throw initException;
            }
            jobListeners[i] = listener;
        }

        // Set up any TriggerListeners
        String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
        TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
        for (int i = 0; i < triggerListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                    + triggerListenerNames[i], true);
            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);
            if (listenerClass == null) {
                initException = new SchedulerException("TriggerListener class not specified for listener '"
                        + triggerListenerNames[i] + "'");
                throw initException;
            }
            TriggerListener listener = null;
            try {
                listener = (TriggerListener) loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException("TriggerListener class '" + listenerClass
                        + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try {
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                } catch (NoSuchMethodException ignore) {
                    /* do nothing */
                }
                if (nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] { triggerListenerNames[i] });
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException("TriggerListener '" + listenerClass
                        + "' props could not be configured.", e);
                throw initException;
            }
            triggerListeners[i] = listener;
        }

        boolean tpInited = false;
        boolean qsInited = false;

        // Get ThreadExecutor Properties
        String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
        if (threadExecutorClass != null) {
            tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
            try {
                threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);
                setBeanProps(threadExecutor, tProps);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
                throw initException;
            }
        } else {
            log.info("Using default implementation for ThreadExecutor");
            threadExecutor = new DefaultThreadExecutor();
        }

        // Fire everything up
        try {
            JobRunShellFactory jrsf = null; // Create correct run-shell factory...
            if (userTXLocation != null) {
                UserTransactionHelper.setUserTxLocation(userTXLocation);
            }
            if (wrapJobInTx) {
                jrsf = new JTAJobRunShellFactory();
            } else {
                jrsf = new JTAAnnotationAwareJobRunShellFactory();
            }
            if (autoId) {
                try {
                    schedInstId = DEFAULT_INSTANCE_ID;
                    if (js.isClustered()) {
                        schedInstId = instanceIdGenerator.generateInstanceId();
                    }
                } catch (Exception e) {
                    getLog().error("Couldn't generate instance Id!", e);
                    throw new IllegalStateException("Cannot run without an instance id.");
                }
            }
            if (js.getClass().getName().startsWith("org.terracotta.quartz")) {
                try {
                    String uuid = (String) js.getClass().getMethod("getUUID").invoke(js);
                    if (schedInstId.equals(DEFAULT_INSTANCE_ID)) {
                        schedInstId = "TERRACOTTA_CLUSTERED,node=" + uuid;
                        if (jmxObjectName == null) {
                            jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
                        }
                    } else if (jmxObjectName == null) {
                        jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId + ",node=" + uuid);
                    }
                } catch (Exception e) {
                    throw new RuntimeException("Problem obtaining node id from TerracottaJobStore.", e);
                }
                if (null == cfg.getStringProperty(PROP_SCHED_JMX_EXPORT)) {
                    jmxExport = true;
                }
            }
            if (js instanceof JobStoreSupport) {
                JobStoreSupport jjs = (JobStoreSupport) js;
                jjs.setDbRetryInterval(dbFailureRetry);
                if (threadsInheritInitalizersClassLoader)
                    jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                jjs.setThreadExecutor(threadExecutor);
            }

            QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
            rsrcs.setName(schedName);
            rsrcs.setThreadName(threadName);
            rsrcs.setInstanceId(schedInstId);
            rsrcs.setJobRunShellFactory(jrsf);
            rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
            rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
            rsrcs.setBatchTimeWindow(batchTimeWindow);
            rsrcs.setMaxBatchSize(maxBatchSize);
            rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
            rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
            rsrcs.setJMXExport(jmxExport);
            rsrcs.setJMXObjectName(jmxObjectName);
            if (managementRESTServiceEnabled) {
                ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
            }
            if (rmiExport) {
                rsrcs.setRMIRegistryHost(rmiHost);
                rsrcs.setRMIRegistryPort(rmiPort);
                rsrcs.setRMIServerPort(rmiServerPort);
                rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                rsrcs.setRMIBindName(rmiBindName);
            }
            SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);
            rsrcs.setThreadExecutor(threadExecutor);
            threadExecutor.initialize();
            rsrcs.setThreadPool(tp);
            if (tp instanceof SimpleThreadPool) {
                if (threadsInheritInitalizersClassLoader)
                    ((SimpleThreadPool) tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
            }
            tp.initialize();
            tpInited = true;
            rsrcs.setJobStore(js);
            // add plugins
            for (int i = 0; i < plugins.length; i++) {
                rsrcs.addSchedulerPlugin(plugins[i]);
            }
            qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
            qsInited = true;
            // Create Scheduler ref...
            Scheduler scheduler = instantiate(rsrcs, qs);
            // set job factory if specified
            if (jobFactory != null) {
                qs.setJobFactory(jobFactory);
            }
            // Initialize plugins now that we have a Scheduler instance.
            for (int i = 0; i < plugins.length; i++) {
                plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
            }
            // add listeners
            for (int i = 0; i < jobListeners.length; i++) {
                qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
            }
            for (int i = 0; i < triggerListeners.length; i++) {
                qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
            }
            // set scheduler context data...
            for (Object key : schedCtxtProps.keySet()) {
                String val = schedCtxtProps.getProperty((String) key);
                scheduler.getContext().put((String) key, val);
            }
            // fire up job store, and runshell factory
            js.setInstanceId(schedInstId);
            js.setInstanceName(schedName);
            js.setThreadPoolSize(tp.getPoolSize());
            js.initialize(loadHelper, qs.getSchedulerSignaler());
            jrsf.initialize(scheduler);
            qs.initialize();
            getLog().info("Quartz scheduler '" + scheduler.getSchedulerName()
                    + "' initialized from " + propSrc);
            getLog().info("Quartz scheduler version: " + qs.getVersion());
            // prevents the repository from being garbage collected
            qs.addNoGCObject(schedRep);
            // prevents the db manager from being garbage collected
            if (dbMgr != null) {
                qs.addNoGCObject(dbMgr);
            }
            schedRep.bind(scheduler);
            return scheduler;
        } catch (SchedulerException e) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw e;
        } catch (RuntimeException re) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw re;
        } catch (Error re) {
            shutdownFromInstantiateException(tp, qs, tpInited, qsInited);
            throw re;
        }
    }
2. Construct the JobDetail
    // Uses static imports: JobBuilder.newJob, TriggerBuilder.newTrigger, SimpleScheduleBuilder.simpleSchedule
    JobDetail job = newJob(SendMailJob.class)
            .withIdentity("job1", "group1")
            .build();
    // Parameters passed to the job at execution time
    JobDataMap jobData = job.getJobDataMap();
    jobData.put(SendMailJob.PROP_SMTP_HOST, "localhost");
    jobData.put(SendMailJob.PROP_SENDER, "sender@host.com");
    jobData.put(SendMailJob.PROP_RECIPIENT, "receiver@host.com");
    jobData.put(SendMailJob.PROP_SUBJECT, "test subject");
    jobData.put(SendMailJob.PROP_MESSAGE, "do not reply");
    jobData.put("mail.smtp.port", "2500");
3. Construct the trigger
    // Fire immediately, then repeat once after 120 seconds
    SimpleTrigger trigger = newTrigger()
            .withIdentity("trigger1", "group1")
            .startNow()
            .withSchedule(simpleSchedule()
                    .withIntervalInSeconds(120)
                    .withRepeatCount(1))
            .build();
4. Start scheduling
    scheduler.scheduleJob(job, trigger);
    scheduler.start();
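Quartz source code analysis:
Single-machine deployment
The core class that performs scheduling is QuartzSchedulerThread. The scheduling thread fetches from the JobStore the list of triggers that are due and updates their state, fires the triggers, then updates the trigger information (the next fire time and the trigger state) and stores it back. Finally it creates the concrete job execution object, wraps it in a JobRunShell (a Runnable), and runs it on the worker thread pool.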
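The loop described above can be approximated with the following sketch. MiniTrigger and MiniSchedulerLoop are hypothetical; the sketch uses a logical clock passed in by the caller instead of wall-clock time, and a priority queue in place of a JobStore, so it illustrates only the acquire, update, and hand-off steps rather than QuartzSchedulerThread itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical trigger: fires `remaining` times, `interval` ticks apart.
class MiniTrigger {
    final String jobName;
    final long interval;
    long nextFireTime;
    int remaining;

    MiniTrigger(String jobName, long firstFireTime, long interval, int fireCount) {
        this.jobName = jobName;
        this.nextFireTime = firstFireTime;
        this.interval = interval;
        this.remaining = fireCount;
    }
}

class MiniSchedulerLoop {
    // Stand-in for the job store, ordered by next fire time.
    private final PriorityQueue<MiniTrigger> store =
            new PriorityQueue<>(Comparator.comparingLong((MiniTrigger t) -> t.nextFireTime));
    // Stand-in for the worker thread pool.
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    void schedule(MiniTrigger trigger) {
        store.add(trigger);
    }

    /** One pass at logical time `now`: acquire due triggers, update them, hand jobs to workers. */
    int runOnce(long now) {
        int fired = 0;
        while (!store.isEmpty() && store.peek().nextFireTime <= now) {
            MiniTrigger t = store.poll();      // acquire the due trigger
            long firedAt = t.nextFireTime;
            t.remaining--;                     // update the trigger state
            if (t.remaining > 0) {
                t.nextFireTime += t.interval;  // compute the next fire time and store it back
                store.add(t);
            }
            workers.submit(() -> log.add(t.jobName + "@" + firedAt)); // run on a worker thread
            fired++;
        }
        return fired;
    }

    void shutdown() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniSchedulerLoop loop = new MiniSchedulerLoop();
        loop.schedule(new MiniTrigger("report", 0, 10, 3));
        System.out.println(loop.runOnce(0));  // prints 1
        System.out.println(loop.runOnce(20)); // prints 2
        loop.shutdown();
    }
}
```

Separating the scheduling pass from the worker pool is the key design point: the loop stays short and only does bookkeeping, while job bodies run elsewhere.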
Cluster deployment
There are two clustering options: database-based (MySQL, Oracle) and Terracotta-based.
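Database cluster mode:
This approach is distributed: there is no node responsible for central management. Instead, concurrency control across the cluster is implemented with database row-level locks.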
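In cluster mode, a scheduler instance first acquires a row lock in the QRTZ_LOCKS table. The MySQL statement for acquiring the row lock is:
    select * from QRTZ_LOCKS where sched_name = ? and lock_name = ? for update
Here sched_name is the scheduler name shared by the nodes of the application cluster, and lock_name is the name of the row-level lock. Quartz mainly uses two row-level locks: the trigger access lock (TRIGGER_ACCESS) and the state access lock (STATE_ACCESS).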
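This design solves the distributed scheduling problem: a given job firing is run by only one node, and the other nodes do not execute it. With a large number of short jobs, however, the nodes compete for the database lock frequently, and the more nodes there are, the worse the performance becomes.
Quartz's cluster mode can scale horizontally and supports distributed scheduling, but it requires the application team to add the corresponding tables to their database, which is fairly intrusive.
One possible optimization is to replace the database lock with a distributed lock.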
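The effect of the row lock can be simulated in a single JVM as follows. This is only an analogy under stated assumptions: SharedTriggerTable and ClusterLockDemo are hypothetical, threads play the role of cluster nodes, and a ReentrantLock stands in for the TRIGGER_ACCESS row lock that the real cluster takes with SELECT ... FOR UPDATE.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the shared trigger table guarded by a row lock.
class SharedTriggerTable {
    private final ReentrantLock triggerAccess = new ReentrantLock();
    private String state = "WAITING";

    /** Returns true only for the single node that moves the trigger from WAITING to ACQUIRED. */
    boolean tryAcquire(String nodeId) {
        triggerAccess.lock();       // comparable to blocking on SELECT ... FOR UPDATE
        try {
            if (!state.equals("WAITING")) {
                return false;       // another node already took this firing
            }
            state = "ACQUIRED";
            return true;
        } finally {
            triggerAccess.unlock(); // comparable to the commit that releases the row lock
        }
    }
}

class ClusterLockDemo {
    /** Lets `nodes` threads race for one trigger and returns the ids of the nodes that won. */
    static List<String> race(int nodes) {
        SharedTriggerTable table = new SharedTriggerTable();
        List<String> winners = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < nodes; i++) {
            String nodeId = "node-" + i;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // make all nodes compete at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (table.tryAcquire(nodeId)) {
                    winners.add(nodeId);
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for nodes", e);
            }
        }
        return winners;
    }

    public static void main(String[] args) {
        System.out.println(race(4).size()); // prints 1
    }
}
```

Every node still has to queue for the same lock even when it loses, which is the contention cost noted above; a distributed lock changes where that queue lives but not the need to serialize the acquisition.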
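Terracotta mode:
Terracotta is a well-known open-source Java clustering platform developed by the US company Terracotta. It implements an abstraction layer between the JVM and the Java application that is dedicated to clustering. Using techniques such as incremental change detection, smart targeted delivery, distributed coordination, server mirroring, and sharding, it lets users migrate a single-machine Java application to a clustered one without changing existing code. Users can then focus on business logic, while Terracotta takes care of providing a high-performance, highly available, and stable enterprise Java cluster.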
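Quartz pros and cons:
Advantages | Disadvantages |
Cluster mode | Relies on MySQL database locks, so performance is low; could be extended to use a distributed lock |
Decentralized (relatively) | Scheduling is simple and purely time-based |
Simple and easy to use | No workflow support |
 | No support for distributed tasks |
 | No governance |
 | No alerting |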
In summary, Quartz is just a simple scheduling framework, not a complete scheduling product.