Springboot 通过@Scheduled实现定时任务的思路:
利用springboot的BeanPostProcessor接口特性,当一个类实现了BeanPostProcessor之后,spring 容器在初始化系统的每个bean的时候都会调用这个类实现的BeanPostProcessor的接口方法,并把bean对像和名称作为参数传给这个类对像。
BeanPostProcessor接口方法有两个:
@Nullable
default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Nullable
default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
当一个bean被初始化之前会调用BeanPostProcessor对像的postProcessBeforeInitialization方法,初始化完成后调用postProcessAfterInitialization方法,因此可以在postProcessAfterInitialization里面遍历每个bean的所有方法,检查它们是否有@Scheduled注解,如果有则新建一个runnable,这个runnable用一个成员属性保存这个bean的对像引用,run执行的时候通过反射调用这个被@scheduled标记的方法就可以了。
当遍历了所有的bean,生成了一系列的runnable任务之后,需要把它们丢到线程池中运行,这时候需要利用到ApplicationListener接口特性。
ApplicationListener接口只有一个接口方法:
void onApplicationEvent(E event);
当spring 容器把所有的bean都实例化好了,并启动完成之后会调用实现这个接口的所有对像的onApplicationEvent方法,这个方法里面可以做一些系统启动需要执行的一些业务;
与BeanPostProcessor不同的是一个对像的这个接口方法只调用一次,而BeanPostProcessor的是每建一个bean就会调用它的对像的方法一次,因此可以在这个方法里面把前面生成的计划任务丢到线程池里面运行。
以上是springboot的实现思路,下面一步一步分析源码:
我们都是知道要使@Scheduled生效就得先用@EnableScheduling来开启计划任务
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})
@Documented
public @interface EnableScheduling {
}
这是@EnableScheduling的定义,这里面通过@Import引入了SchedulingConfiguration这个配置类,使这个配置类生效,我们看看SchedulingConfiguration里面做了什么事情:
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
这个配置类中只做了一件事情,就是通过@Bean向spring容器中注入了一个bean:ScheduledAnnotationBeanPostProcessor
进入ScheduledAnnotationBeanPostProcessor类
public class ScheduledAnnotationBeanPostProcessor implements ScheduledTaskHolder,
MergedBeanDefinitionPostProcessor,
DestructionAwareBeanPostProcessor,
Ordered,
EmbeddedValueResolverAware,
BeanNameAware, BeanFactoryAware,
ApplicationContextAware,
SmartInitializingSingleton,
ApplicationListener<ContextRefreshedEvent>,
DisposableBean {
......
}
重点注意ScheduledAnnotationBeanPostProcessor实现了两个接口:DestructionAwareBeanPostProcessor和ApplicationListener接口。
ScheduledAnnotationBeanPostProcessor接口继承了BeanPostProcessor,按一开始所说的思路我们关注它对postProcessAfterInitialization接口方法的实现:
@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
可以看到这里面遍历了每个bean的方法,并找出使用了@Scheduled的方法,调用processScheduled进行处理,processScheduled方法内容:
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Assert.isTrue(method.getParameterCount() == 0,
"Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
if (registeredTasks == null) {
registeredTasks = new LinkedHashSet<>(4);
this.scheduledTasks.put(bean, registeredTasks);
}
registeredTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
这个方法内容很多,但逻辑并不复杂,一是新建runable,二是解析@Scheduled注解传进来的参数(cron表达式或者延时等)并创建不同类型的任务提交到registrar中,registrar是用来注册要放到计划任务线程池中运行的任务的,它里面包含一个线程池和任务列表,这一步只是放到了列表中。
创建Runable的方法:
protected Runnable createRunnable(Object target, Method method) {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod);
}
返回的是new 的一个ScheduledMethodRunnable对像
public class ScheduledMethodRunnable implements Runnable {
private final Object target;
private final Method method;
......
public ScheduledMethodRunnable(Object target, Method method) {
this.target = target;
this.method = method;
}
public void run() {
try {
ReflectionUtils.makeAccessible(this.method);
this.method.invoke(this.target);
} catch (InvocationTargetException var2) {
ReflectionUtils.rethrowRuntimeException(var2.getTargetException());
} catch (IllegalAccessException var3) {
throw new UndeclaredThrowableException(var3);
}
}
}
这个类继承了Runnable,并代理bean,run方法里面通过反射执行被@Scheduled标记的方法。
现在@Scheduled已经被解析,并转成了Runable任务对像放到了ScheduledTaskRegistrar对像中了,现在就差把它们放到线程池中启动了
这时候我们再看看ScheduledAnnotationBeanPostProcessor对ApplicationListener的接口实现:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
this.finishRegistration();
}
}
这里面调用了finishRegistration完成注册功能:
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
logger.debug("Could not find unique TaskScheduler bean", ex);
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
logger.debug("Could not find default TaskScheduler bean", ex);
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
try {
this.registrar.setScheduler(resolveSchedulerBean(beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
logger.debug("Could not find default ScheduledExecutorService bean", ex2);
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
this.registrar.afterPropertiesSet();
}
这个方法分三步:
1.找到SchedulingConfigurer接口的所有这现类,调用它的
void configureTasks(ScheduledTaskRegistrar registrar);
方法,通常我们可以自定义一个类实现SchedulingConfigurer,并对registrar里面的属性赋值,最常用的做法是自定线程池注入到registrar中,因为registrar默认是单线程的线程池,也即是说@Scheduled方法是串行的
2.对ScheduledTaskRegistrar对像中的TaskScheduler属性赋值,TaskScheduler这个属性即是线程池对像,如果在第一步我们已经自己指定了一个线程池,这部分代码会跳过。
赋值的时候会到bean对像池中找默认的线程池bean,找不到只打印日志,不作处理。因为后需ScheduledTaskRegistrar中在提交任务时自己会判读。
3.调用this.registrar.afterPropertiesSet();
这一步很重要,它的功能就是把所有任务提交到线程池中。
代码:
@Override
public void afterPropertiesSet() {
scheduleTasks();
}
/**
* Schedule all registered tasks against the underlying
* {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}.
*/
@SuppressWarnings("deprecation")
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
调用了scheduleTasks方法,这个方法首先判断线程池 是否为空,如果是空,就新建一个单线程的线程池,再遍历任务列表,把所有任务丢到线程池中。
到这一步,@Scheduled作用生效,方法将会定时执行了。
今天的文章Springboot @Scheduled注解实现原理分享到此就结束了,感谢您的阅读,如果确实帮到您,您可以动动手指转发给其他人。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/32160.html