Java中定时任务实现方式及源码剖析
概述在企业级应用开发场景中,定时任务占据着至关重要的地位。比如以下这些场景:
[*]用户4个小时以内没有进行任何操作,就自动清除用户会话。
[*]每天晚上凌晨自动拉取另一个业务系统的某部分数据。
[*]每隔15分钟,自动执行一段逻辑,更新某部分数据。
类似的场景会频繁出现在我们的日常开发中。在Java开发体系中,也有很多实现方案来满足这些场景。但是每个实现方案都有各自的优点和缺点。本文将重点剖析不同实现方案的技术原理以及它们之间的使用场景和差异。
在开始之前,需要先想下定时任务场景的核心技术点是什么?我是这样理解的:
到达未来某一个时间点,执行对应的逻辑。在到达这个时间点之前,需要一直等待。
核心的技术点在于,程序如何知道达到了指定的时间点,然后触发执行对应的逻辑。只要能实现这点,就可以实现定时任务了。本文列举以下六种方案:
[*]循环判定时间
[*]Sleep
[*]Timer
[*]ScheduledExecutorService
[*]Spring Scheduling
本文这五种方案的讲解顺序是有考究的,从简单到复杂,从底层到上层。循环判定时间、Sleep旨在摆脱所有组件或者框架带来的复杂度干扰,从最本质上理解定时任务的实现思路。Timer是JDK早先对定时任务的实现,相对来说是比较简单的。ScheduledExecutorService是对Timer的优化、而Spring Scheduling则是基于ScheduledExecutorService实现的。
循环判定时间
我们在线程中,直接使用死循环来不停的判定时间,看是否到了预期的时间点,如果到了就执行逻辑,否则就继续循环。这个方法应该基本不会使用到,但是最能说明定时任务的核心本质。举一个生活化场景的例子:我们请家人帮忙,30分钟后叫自己起床,如果家人不定闹钟的话,他就得不停的去看时间,到了30分钟叫我们起床。
实现
public class LoopScheduler {
public static void main(String[] args) {
long nowTime = System.currentTimeMillis();
long nextTime = nowTime + 15000;
while (true) {
if (System.currentTimeMillis() >= nextTime) {
nowTime = System.currentTimeMillis();
nextTime = nowTime + 15000;
System.out.println(nowTime + ":触发一次");
service();
}
}
}
public static void service() {
System.out.println("自定义逻辑执行");
}
}以上代码就可以实现每隔15s执行service方法一次。以下是执行情况:
可以看到确实非常严格的15s执行一次。实现了定时任务的效果。
分析
这种实现方式之所以基本不会实际使用,是因为这个while循环的空转会占用非常多宝贵的cpu资源。但是可以借此看到定时任务的实现框架。
Sleep
借助于Thread.sleep(),我们可以实现让线程等待指定时间后再执行。Thread.sleep()方法是一个JNI方法,其底层是与操作系统内核进行交互。调用该方法,可以将线程进入睡眠状态,在这种状态中,该线程不会获取到cpu资源。直到指定的睡眠时间结束,操作系统会根据调度策略将线程唤醒。
实现
public class SleepScheduler {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(15000);
System.out.println(System.currentTimeMillis() + ":触发一次");
service();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void service() {
System.out.println("自定义逻辑执行");
}
});
thread.start();
}
}以上代码,先定义了一个线程,然后启动。线程的执行逻辑是,不断循环,每次循环里面先sleep 15s。然后再执行指定的逻辑。基本上可以实现跟上面一样的效果,每15s执行一次service逻辑。
我们观察下执行的情况就会发现,每次执行的间隔并不是严格的15s。一般都会比15s要多一点。这是因为sleep的机制导致的。sleep结束之后,线程并不会立马获得执行,线程只是会被重新放入调度队列参与下一次调度。
分析
使用Thread.sleep()跟我们自己用循环去判断时间相比,最大的优势在于我们节省了CPU资源。利用操作系统的线程调度能力去实现对时间的控制和判断。
Timer
Timer是JDK自带的调度工具类,针对定时任务的场景场景已经做了抽象和封装。Timer的核心入口是Timer类的schedule方法。用于提交一个任务,并且可以指定延迟时间和重复间隔。
/**
* @param task task to be scheduled.
* @param delaydelay in milliseconds before task is to be executed.
* @param period time in milliseconds between successive task executions.
*/
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}processScheduled方法比较长,但是逻辑其实很简单:
[*]首先基于获取到的bean、method,利用反射技术,封装为一个Runnable实例。
[*]根据@Schedule的不同参数配置,识别出不同类型的任务:cron表达式任务、fixedDelay、fixedRate任务。然后通过registry的不同api,提交这些任务。
public class TimerScheduler {
public static void main(String[] args) {
// 创建SimpleDateFormat对象,定义格式化样式
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 创建Timer对象
Timer timer = new Timer();
// 单次执行任务示例
TimerTask singleTask = new TimerTask() {
@Override
public void run() {
// 将当前时间的毫秒数转换为格式化后的时间字符串
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次执行任务:定时任务执行了");
}
};
// 延迟3000毫秒(3秒)后执行单次任务
timer.scheduleAtFixedRate(singleTask, 3000, 15000);
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次执行任务:定时任务已启动");
}
}ScheduledTaskRegistrar
前面提到了多次的registrar,就是ScheduledTaskRegistrar的实例。
ScheduledTaskRegistrar有这些成员属性:四种类型任务的集合、我们熟悉的ScheduledExecutorService。还有没出现过的TaskScheduler。
public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();/**
* The state of this task, chosen from the constants below.
*/
int state = VIRGIN;
/**This task has not yet been scheduled.
*/
static final int VIRGIN = 0;/**This task is scheduled for execution.If it is a non-repeating task,it has not yet been executed.
*/
static final int SCHEDULED = 1;/**This non-repeating task has already executed (or is currentlyexecuting) and has not been cancelled.
*/
static final int EXECUTED = 2;/**This task has been cancelled (with a call to TimerTask.cancel).
*/
static final int CANCELLED = 3;/**Next execution time for this task in the format returned bySystem.currentTimeMillis, assuming this task is scheduled for execution.For repeating tasks, this field is updated prior to each task execution.
*/
long nextExecutionTime;/**Period in milliseconds for repeating tasks.A positive value indicatesfixed-rate execution.A negative value indicates fixed-delay execution.A value of 0 indicates a non-repeating task.
*/
long period = 0;
}可以看到,afterPropertiesSet的主体就是调用scheduleTasks。而scheduleTasks方法的核心逻辑是:
[*]如果taskScheduler为空,则初始化localExecutor和taskScheduler。
[*]对四种不同类型的任务,循环加入调度。多的一种TriggerTask,是自定义任务。
我们先来看下比较熟悉的scheduleFixedRateTask和scheduleFixedDelayTask。这两个逻辑基本一致,所以我们以scheduleFixedRateTask为例。
public class Timer {
/**
* The timer task queue.This data structure is shared with the timer
* thread.The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as appropriate,
* and removing them from the queue when they’re obsolete.
*/
private final TaskQueue queue = new TaskQueue();/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
public Timer(String name) {
thread.setName(name);
thread.start();
}
}
[*]对任务进行封装,然后调用同名方法scheduleFixedRateTask
[*]然后根据任务的不同配置,是否有InitialDelay,调用taskScheduler的不同方法去提交任务。
ThreadPoolTaskScheduler
ThreadPoolTaskScheduler是TaskScheduler的一个实现类。是Spring Scheduling对JDK ScheduledExecutorService的又一层封装。
上面我们看到,在ScheduledTaskRegistrar中提交固定频率的任务后,最终会调用this.taskScheduler.scheduleAtFixedRate方法。而在taskScheduler.scheduleAtFixedRate中,又最终会调用ScheduledExecutorService的scheduleAtFixedRate方法。
class TaskQueue {
/**
* Priority queue represented as a balanced binary heap: the two children
* of queue are queue and queue.The priority queue is
* ordered on the nextExecutionTime field: The TimerTask with the lowest
* nextExecutionTime is in queue (assuming the queue is nonempty).For
* each node n in the heap, and each descendant of n, d,
* n.nextExecutionTime <= d.nextExecutionTime.
*/
private TimerTask[] queue = new TimerTask;/**
* The number of tasks in the priority queue.(The tasks are stored in
* queue up to queue).
*/
private int size = 0;
}Cron表达式任务是如何实现的
我们知道Spring Scheduling是对ScheduledExecutorService的进一步封装。ScheduledExecutorService只支持固定延迟、固定频率、单次任务这三种任务,而Spring Scheduling还支持cron表达式任务,这个是怎么实现的呢?
我们要先回到ScheduledTaskRegistrar的scheduleCronTask方法。
class TimerThread extends Thread {
/**
* Our Timer's queue.We store this reference in preference to
* a reference to the Timer so the reference graph remains acyclic.
* Otherwise, the Timer would never be garbage-collected and this
* thread would never go away.
*/
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear();// Eliminate obsolete references
}
}
}
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first event and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue;// No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired)// Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}ReschedulingRunnable的构造方法入参包含ScheduledExecutorService的实例。其核心逻辑在于schedule方法和run方法。
调用的是taskScheduler的schedule方法。入参是Runnable实例和1个Trigger。
Trigger是触发器,一般是与任务关联的,用于计算任务的下次执行时间。而CronTask的触发器就是CronTrigger。
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}仍然是通过ScheduledExecutorService来实现的,但是在提交之前,有些额外的处理:
[*]Runnable任务又被封装了一层,类型是ReschedulingRunnable。Rescheduling是“重新调度“的意思。
[*]调用新生成的ReschedulingRunnable实例的schedule方法。
public class ScheduldExecutorServiceScheduler {
public static void main(String[] args) {
// 创建SimpleDateFormat对象,定义格式化样式
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
// 创建ScheduledExecutorService对象
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
Runnable runnable = new Runnable() {
@Override
public void run() {
// 将当前时间的毫秒数转换为格式化后的时间字符串
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次执行任务:定时任务执行了");
}
};
// 延迟3000毫秒(3秒)后执行单次任务,每15s开始执行一次任务
executor.scheduleAtFixedRate(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 延迟3000毫秒(3秒)后执行单次任务,每次任务结束后15s执行下一次任务
//executor.scheduleWithFixedDelay(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS);
// 延迟3000毫秒(3秒)后执行单次任务
//executor.schedule(runnable, 3000, java.util.concurrent.TimeUnit.MILLISECONDS);
String formattedTime = sdf.format(System.currentTimeMillis());
System.out.println(formattedTime + "单次执行任务:定时任务已启动");
}
}ReschedulingRunnable的构造方法入参包含ScheduledExecutorService的实例。其核心逻辑在于schedule方法和run方法。
schedule方法通过trigger计算出任务的执行时间,然后通过ScheduledExecutorService提交任务,一定要注意的是这里提交的是一次性任务。schedule方法并没有period参数。
那cron表达式式任务是如何实现周期执行的,重点在于run方法。ReschedulingRunnable继承了父类DelegatingErrorHandlingRunnable。我们的业务逻辑Runnable实例在ReschedulingRunnable的构造方法中,被传入了父类的属性中。所以在ReschedulingRunnable的run方法中,super.run是我们的业务逻辑。在业务逻辑后面,ReschedulingRunnable包装了额外的逻辑,就是再次调用schedule方法,计算下次任务的时间并且重新提交。
总结
本文从使用和源码实现层面介绍了Java中多种定时任务的实现方案。为了不离题太远和控制篇幅,更能体现本文的主要脉络。其实省略了一些技术细节。比如:
[*]Timer、ScheduledExecutorService、Spring Scheduling中涉及的线程安全问题的讨论,锁的应用等
[*]阻塞队列
[*]异步编程相关的知识,如Future。
大家可以自行补充,后面我可能也会补充这些技术点的文章。
参考
不用任何框架,Java 就能实现定时任务的 3 种方法!
[重要提示]
所有博客内容,在我的个人博客网站可见,欢迎访问: TwoFish
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]