一、功能說(shuō)明
SpringBoot的定時(shí)任務(wù)的加強(qiáng)工具,實(shí)現(xiàn)對(duì)SpringBoot原生的定時(shí)任務(wù)進(jìn)行動(dòng)態(tài)管理,完全兼容原生@Scheduled注解,無(wú)需對(duì)原本的定時(shí)任務(wù)進(jìn)行修改
二、快速使用
具體的功能已經(jīng)封裝成SpringBoot-starter即插即用
com.github.guoyixing spring-boot-starter-super-scheduled 0.3.1
三、實(shí)現(xiàn)原理
1、動(dòng)態(tài)管理實(shí)現(xiàn)
(1) 配置管理介紹
@Component("superScheduledConfig") publicclassSuperScheduledConfig{ /** *執(zhí)行定時(shí)任務(wù)的線(xiàn)程池 */ privateThreadPoolTaskSchedulertaskScheduler; /** *定時(shí)任務(wù)名稱(chēng)與定時(shí)任務(wù)回調(diào)鉤子的關(guān)聯(lián)關(guān)系容器 */ privateMapnameToScheduledFuture=newConcurrentHashMap<>(); /** *定時(shí)任務(wù)名稱(chēng)與定時(shí)任務(wù)需要執(zhí)行的邏輯的關(guān)聯(lián)關(guān)系容器 */ privateMap nameToRunnable=newConcurrentHashMap<>(); /** *定時(shí)任務(wù)名稱(chēng)與定時(shí)任務(wù)的源信息的關(guān)聯(lián)關(guān)系容器 */ privateMap nameToScheduledSource=newConcurrentHashMap<>(); /*普通的get/sets省略*/ }
(2) 使用后處理器攔截SpringBoot原本的定時(shí)任務(wù)
實(shí)現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
實(shí)現(xiàn)BeanPostProcessor接口,將這個(gè)類(lèi)標(biāo)記為后處理器,后處理器會(huì)在每個(gè)bean實(shí)例化之后執(zhí)行
使用@DependsOn注解強(qiáng)制依賴(lài)SuperScheduledConfig類(lèi),讓SpringBoot實(shí)例化SuperScheduledPostProcessor類(lèi)之前先實(shí)例化SuperScheduledConfig類(lèi)
主要實(shí)現(xiàn)邏輯在postProcessAfterInitialization()方法中
@DependsOn({"superScheduledConfig"}) @Component @Order publicclassSuperScheduledPostProcessorimplementsBeanPostProcessor,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateApplicationContextapplicationContext; /** *實(shí)例化bean之前的操作 *@parambeanbean實(shí)例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessBeforeInitialization(Objectbean,StringbeanName)throwsBeansException{ returnbean; } /** *實(shí)例化bean之后的操作 *@parambeanbean實(shí)例 *@parambeanNamebean的Name */ @Override publicObjectpostProcessAfterInitialization(Objectbean, StringbeanName)throwsBeansException{ //1.獲取配置管理器 SuperScheduledConfigsuperScheduledConfig=applicationContext.getBean(SuperScheduledConfig.class); //2.獲取當(dāng)前實(shí)例化完成的bean的所有方法 Method[]methods=bean.getClass().getDeclaredMethods(); //循環(huán)處理對(duì)每個(gè)方法逐一處理 if(methods.length>0){ for(Methodmethod:methods){ //3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時(shí)任務(wù)注解) Scheduledannotation=method.getAnnotation(Scheduled.class); //如果無(wú)法獲取到@Scheduled注解,就跳過(guò)這個(gè)方法 if(annotation==null){ continue; } //4.創(chuàng)建定時(shí)任務(wù)的源屬性 //創(chuàng)建定時(shí)任務(wù)的源屬性(用來(lái)記錄定時(shí)任務(wù)的配置,初始化的時(shí)候記錄的是注解上原本的屬性) ScheduledSourcescheduledSource=newScheduledSource(annotation,method,bean); //對(duì)注解上獲取到源屬性中的屬性進(jìn)行檢測(cè) if(!scheduledSource.check()){ thrownewSuperScheduledException("在"+beanName+"Bean中"+method.getName()+"方法的注解參數(shù)錯(cuò)誤"); } //生成定時(shí)任務(wù)的名稱(chēng)(id),使用beanName+“.”+方法名 Stringname=beanName+"."+method.getName(); //將以key-value的形式,將源數(shù)據(jù)存入配置管理器中,key:定時(shí)任務(wù)的名稱(chēng)value:源數(shù)據(jù) superScheduledConfig.addScheduledSource(name,scheduledSource); try{ //5.將原本SpringBoot的定時(shí)任務(wù)取消掉 clearOriginalScheduled(annotation); }catch(Exceptione){ thrownewSuperScheduledException("在關(guān)閉原始方法"+beanName+method.getName()+"時(shí)出現(xiàn)錯(cuò)誤"); } } } //最后bean保持原有返回 returnbean; } /** *修改注解原先的屬性 *@paramannotation注解實(shí)例對(duì)象 *@throwsException */ privatevoidclearOriginalScheduled(Scheduledannotation)throwsException{ changeAnnotationValue(annotation,"cron",Scheduled.CRON_DISABLED); changeAnnotationValue(annotation,"fixedDelay",-1L); changeAnnotationValue(annotation,"fixedDelayString",""); changeAnnotationValue(annotation,"fixedRate",-1L); changeAnnotationValue(annotation,"fixedRateString",""); changeAnnotationValue(annotation,"initialDelay",-1L); changeAnnotationValue(annotation,"initialDelayString",""); } /** *獲取SpringBoot的上下文 *@paramapplicationContextSpringBoot的上下文 */ @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(3) 使用ApplicationRunner初始化自定義的定時(shí)任務(wù)運(yùn)行器
實(shí)現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
使用@DependsOn注解強(qiáng)制依賴(lài)threadPoolTaskScheduler類(lèi)
實(shí)現(xiàn)ApplicationRunner接口,在所有bean初始化結(jié)束之后,運(yùn)行自定義邏輯
主要實(shí)現(xiàn)邏輯在run()方法中
@DependsOn("threadPoolTaskScheduler") @Component publicclassSuperScheduledApplicationRunnerimplementsApplicationRunner,ApplicationContextAware{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); privateApplicationContextapplicationContext; /** *定時(shí)任務(wù)配置管理器 */ @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *定時(shí)任務(wù)執(zhí)行線(xiàn)程 */ @Autowired privateThreadPoolTaskSchedulerthreadPoolTaskScheduler; @Override publicvoidrun(ApplicationArgumentsargs){ //1.定時(shí)任務(wù)配置管理器中緩存定時(shí)任務(wù)執(zhí)行線(xiàn)程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.獲取所有定時(shí)任務(wù)源數(shù)據(jù) MapnameToScheduledSource=superScheduledConfig.getNameToScheduledSource(); //逐一處理定時(shí)任務(wù) for(Stringname:nameToScheduledSource.keySet()){ //3.獲取定時(shí)任務(wù)源數(shù)據(jù) ScheduledSourcescheduledSource=nameToScheduledSource.get(name); //4.獲取所有增強(qiáng)類(lèi) String[]baseStrengthenBeanNames=applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.創(chuàng)建執(zhí)行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); //配置執(zhí)行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一處理增強(qiáng)類(lèi)(增強(qiáng)器實(shí)現(xiàn)原理后面具體分析) List points=newArrayList<>(baseStrengthenBeanNames.length); for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //7.將增強(qiáng)器代理成point ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //創(chuàng)建代理 Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //8.所有的points連成起來(lái) points.add(proxy); } //將point形成調(diào)用鏈 runnable.setChain(newChain(points)); //將執(zhí)行邏輯封裝并緩存到定時(shí)任務(wù)配置管理器中 superScheduledConfig.addRunnable(name,runnable::invoke); try{ //8.啟動(dòng)定時(shí)任務(wù) ScheduledFuture>schedule=ScheduledFutureFactory.create(threadPoolTaskScheduler ,scheduledSource,runnable::invoke); //將線(xiàn)程回調(diào)鉤子存到任務(wù)配置管理器中 superScheduledConfig.addScheduledFuture(name,schedule); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)啟動(dòng)..."); }catch(Exceptione){ thrownewSuperScheduledException("任務(wù)"+name+"啟動(dòng)失敗,錯(cuò)誤信息:"+e.getLocalizedMessage()); } } } @Override publicvoidsetApplicationContext(ApplicationContextapplicationContext)throwsBeansException{ this.applicationContext=applicationContext; } }
(4) 進(jìn)行動(dòng)態(tài)管理
@Component publicclassSuperScheduledManager{ protectedfinalLoglogger=LogFactory.getLog(getClass()); privateDateTimeFormatterdf=DateTimeFormatter.ofPattern("yyyy-MM-ddHHss"); @Autowired privateSuperScheduledConfigsuperScheduledConfig; /** *修改Scheduled的執(zhí)行周期 * *@paramnamescheduled的名稱(chēng) *@paramcroncron表達(dá)式 */ publicvoidsetScheduledCron(Stringname,Stringcron){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedDelay * *@paramnamescheduled的名稱(chēng) *@paramfixedDelay上一次執(zhí)行完畢時(shí)間點(diǎn)之后多長(zhǎng)時(shí)間再執(zhí)行 */ publicvoidsetScheduledFixedDelay(Stringname,LongfixedDelay){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name,scheduledSource); } /** *修改Scheduled的fixedRate * *@paramnamescheduled的名稱(chēng) *@paramfixedRate上一次開(kāi)始執(zhí)行之后多長(zhǎng)時(shí)間再執(zhí)行 */ publicvoidsetScheduledFixedRate(Stringname,LongfixedRate){ //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSourcescheduledSource=superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name,scheduledSource); } /** *查詢(xún)所有啟動(dòng)的Scheduled */ publicListgetRunScheduledName(){ Set names=superScheduledConfig.getNameToScheduledFuture().keySet(); returnnewArrayList<>(names); } /** *查詢(xún)所有的Scheduled */ publicList getAllSuperScheduledName(){ Set names=superScheduledConfig.getNameToRunnable().keySet(); returnnewArrayList<>(names); } /** *終止Scheduled * *@paramnamescheduled的名稱(chēng) */ publicvoidcancelScheduled(Stringname){ ScheduledFuturescheduledFuture=superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)終止..."); } /** *啟動(dòng)Scheduled * *@paramnamescheduled的名稱(chēng) *@paramscheduledSource定時(shí)任務(wù)的源信息 */ publicvoidaddScheduled(Stringname,ScheduledSourcescheduledSource){ if(getRunScheduledName().contains(name)){ thrownewSuperScheduledException("定時(shí)任務(wù)"+name+"已經(jīng)被啟動(dòng)過(guò)了"); } if(!scheduledSource.check()){ thrownewSuperScheduledException("定時(shí)任務(wù)"+name+"源數(shù)據(jù)內(nèi)容錯(cuò)誤"); } scheduledSource.refreshType(); Runnablerunnable=superScheduledConfig.getRunnable(name); ThreadPoolTaskSchedulertaskScheduler=superScheduledConfig.getTaskScheduler(); ScheduledFuture>schedule=ScheduledFutureFactory.create(taskScheduler,scheduledSource,runnable); logger.info(df.format(LocalDateTime.now())+"任務(wù)"+name+"已經(jīng)啟動(dòng)..."); superScheduledConfig.addScheduledSource(name,scheduledSource); superScheduledConfig.addScheduledFuture(name,schedule); } /** *以cron類(lèi)型啟動(dòng)Scheduled * *@paramnamescheduled的名稱(chēng) *@paramcroncron表達(dá)式 */ publicvoidaddCronScheduled(Stringname,Stringcron){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setCron(cron); addScheduled(name,scheduledSource); } /** *以fixedDelay類(lèi)型啟動(dòng)Scheduled * *@paramnamescheduled的名稱(chēng) *@paramfixedDelay上一次執(zhí)行完畢時(shí)間點(diǎn)之后多長(zhǎng)時(shí)間再執(zhí)行 *@paraminitialDelay第一次執(zhí)行的延遲時(shí)間 */ publicvoidaddFixedDelayScheduled(Stringname,LongfixedDelay,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執(zhí)行的延遲時(shí)間只能傳入一個(gè)參數(shù)"); } addScheduled(name,scheduledSource); } /** *以fixedRate類(lèi)型啟動(dòng)Scheduled * *@paramnamescheduled的名稱(chēng) *@paramfixedRate上一次開(kāi)始執(zhí)行之后多長(zhǎng)時(shí)間再執(zhí)行 *@paraminitialDelay第一次執(zhí)行的延遲時(shí)間 */ publicvoidaddFixedRateScheduled(Stringname,LongfixedRate,Long...initialDelay){ ScheduledSourcescheduledSource=newScheduledSource(); scheduledSource.setFixedRate(fixedRate); if(initialDelay!=null&&initialDelay.length==1){ scheduledSource.setInitialDelay(initialDelay[0]); }elseif(initialDelay!=null&&initialDelay.length>1){ thrownewSuperScheduledException("第一次執(zhí)行的延遲時(shí)間只能傳入一個(gè)參數(shù)"); } addScheduled(name,scheduledSource); } /** *手動(dòng)執(zhí)行一次任務(wù) * *@paramnamescheduled的名稱(chēng) */ publicvoidrunScheduled(Stringname){ Runnablerunnable=superScheduledConfig.getRunnable(name); runnable.run(); } }
2、增強(qiáng)接口實(shí)現(xiàn)
增強(qiáng)器實(shí)現(xiàn)的整體思路與SpringAop的思路一致,實(shí)現(xiàn)沒(méi)有Aop復(fù)雜
(1) 增強(qiáng)接口
@Order(Ordered.HIGHEST_PRECEDENCE) publicinterfaceBaseStrengthen{ /** *前置強(qiáng)化方法 * *@parambeanbean實(shí)例(或者是被代理的bean) *@parammethod執(zhí)行的方法對(duì)象 *@paramargs方法參數(shù) */ voidbefore(Objectbean,Methodmethod,Object[]args); /** *后置強(qiáng)化方法 *出現(xiàn)異常不會(huì)執(zhí)行 *如果未出現(xiàn)異常,在afterFinally方法之后執(zhí)行 * *@parambeanbean實(shí)例(或者是被代理的bean) *@parammethod執(zhí)行的方法對(duì)象 *@paramargs方法參數(shù) */ voidafter(Objectbean,Methodmethod,Object[]args); /** *異常強(qiáng)化方法 * *@parambeanbean實(shí)例(或者是被代理的bean) *@parammethod執(zhí)行的方法對(duì)象 *@paramargs方法參數(shù) */ voidexception(Objectbean,Methodmethod,Object[]args); /** *Finally強(qiáng)化方法,出現(xiàn)異常也會(huì)執(zhí)行 * *@parambeanbean實(shí)例(或者是被代理的bean) *@parammethod執(zhí)行的方法對(duì)象 *@paramargs方法參數(shù) */ voidafterFinally(Objectbean,Methodmethod,Object[]args); }
(2) 代理抽象類(lèi)
publicabstractclassPoint{ /** *定時(shí)任務(wù)名 */ privateStringsuperScheduledName; /** *抽象的執(zhí)行方法,使用代理實(shí)現(xiàn) *@paramrunnable定時(shí)任務(wù)執(zhí)行器 */ publicabstractObjectinvoke(SuperScheduledRunnablerunnable); /*普通的get/sets省略*/ }
(3) 調(diào)用鏈類(lèi)
publicclassChain{ privateListlist; privateintindex=-1; /** *索引自增1 */ publicintincIndex(){ return++index; } /** *索引還原 */ publicvoidresetIndex(){ this.index=-1; } }
(4) cglib動(dòng)態(tài)代理實(shí)現(xiàn)
使用cglib代理增強(qiáng)器,將增強(qiáng)器全部代理成調(diào)用鏈節(jié)點(diǎn)Point
publicclassRunnableBaseInterceptorimplementsMethodInterceptor{ /** *定時(shí)任務(wù)執(zhí)行器 */ privateSuperScheduledRunnablerunnable; /** *定時(shí)任務(wù)增強(qiáng)類(lèi) */ privateBaseStrengthenstrengthen; @Override publicObjectintercept(Objectobj,Methodmethod,Object[]args,MethodProxymethodProxy)throwsThrowable{ Objectresult; //如果執(zhí)行的是invoke()方法 if("invoke".equals(method.getName())){ //前置強(qiáng)化方法 strengthen.before(obj,method,args); try{ //調(diào)用執(zhí)行器中的invoke()方法 result=runnable.invoke(); }catch(Exceptione){ //異常強(qiáng)化方法 strengthen.exception(obj,method,args); thrownewSuperScheduledException(strengthen.getClass()+"中強(qiáng)化執(zhí)行時(shí)發(fā)生錯(cuò)誤",e); }finally{ //Finally強(qiáng)化方法,出現(xiàn)異常也會(huì)執(zhí)行 strengthen.afterFinally(obj,method,args); } //后置強(qiáng)化方法 strengthen.after(obj,method,args); }else{ //直接執(zhí)行方法 result=methodProxy.invokeSuper(obj,args); } returnresult; } publicRunnableBaseInterceptor(Objectobject,SuperScheduledRunnablerunnable){ this.runnable=runnable; if(BaseStrengthen.class.isAssignableFrom(object.getClass())){ this.strengthen=(BaseStrengthen)object; }else{ thrownewSuperScheduledException(object.getClass()+"對(duì)象不是BaseStrengthen類(lèi)型"); } } publicRunnableBaseInterceptor(){ } }
(5) 定時(shí)任務(wù)執(zhí)行器實(shí)現(xiàn)
publicclassSuperScheduledRunnable{ /** *原始的方法 */ privateMethodmethod; /** *方法所在的bean */ privateObjectbean; /** *增強(qiáng)器的調(diào)用鏈 */ privateChainchain; publicObjectinvoke(){ Objectresult; //索引自增1 if(chain.incIndex()==chain.getList().size()){ //調(diào)用鏈中的增強(qiáng)方法已經(jīng)全部執(zhí)行結(jié)束 try{ //調(diào)用鏈索引初始化 chain.resetIndex(); //增強(qiáng)器全部執(zhí)行完畢,執(zhí)行原本的方法 result=method.invoke(bean); }catch(IllegalAccessException|InvocationTargetExceptione){ thrownewSuperScheduledException(e.getLocalizedMessage()); } }else{ //獲取被代理后的方法增強(qiáng)器 Pointpoint=chain.getList().get(chain.getIndex()); //執(zhí)行增強(qiáng)器代理 //增強(qiáng)器代理中,會(huì)回調(diào)方法執(zhí)行器,形成調(diào)用鏈,逐一運(yùn)行調(diào)用鏈中的增強(qiáng)器 result=point.invoke(this); } returnresult; } /*普通的get/sets省略*/ }
(6) 增強(qiáng)器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner類(lèi)中的代碼片段
//創(chuàng)建執(zhí)行控制器 SuperScheduledRunnablerunnable=newSuperScheduledRunnable(); runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //用來(lái)存放增強(qiáng)器的代理對(duì)象 Listpoints=newArrayList<>(baseStrengthenBeanNames.length); //循環(huán)所有的增強(qiáng)器的beanName for(StringbaseStrengthenBeanName:baseStrengthenBeanNames){ //獲取增強(qiáng)器的bean對(duì)象 ObjectbaseStrengthenBean=applicationContext.getBean(baseStrengthenBeanName); //將增強(qiáng)器代理成Point節(jié)點(diǎn) Pointproxy=ProxyUtils.getInstance(Point.class,newRunnableBaseInterceptor(baseStrengthenBean,runnable)); proxy.setSuperScheduledName(name); //增強(qiáng)器的代理對(duì)象緩存到list中 points.add(proxy); } //將增強(qiáng)器代理實(shí)例的集合生成調(diào)用鏈 //執(zhí)行控制器中設(shè)置調(diào)用鏈 runnable.setChain(newChain(points));
審核編輯:劉清
-
處理器
+關(guān)注
關(guān)注
68文章
19286瀏覽量
229842 -
控制器
+關(guān)注
關(guān)注
112文章
16361瀏覽量
178050 -
增強(qiáng)器
+關(guān)注
關(guān)注
1文章
46瀏覽量
8261 -
SpringBoot
+關(guān)注
關(guān)注
0文章
173瀏覽量
179
原文標(biāo)題:SpringBoot 定時(shí)任務(wù)動(dòng)態(tài)管理通用解決方案
文章出處:【微信號(hào):AndroidPush,微信公眾號(hào):Android編程精選】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論