1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        如何動態(tài)管理Spring Boot定時任務(wù)

        共 17733字,需瀏覽 36分鐘

         ·

        2022-04-19 11:19

        ??Java大聯(lián)盟

        ? 致力于最高效的Java學(xué)習(xí)

        關(guān)注





        原文鏈接
        blog.csdn.net/qq_34886352/article/details/106494637


        B 站搜索:楠哥教你學(xué)Java

        獲取更多優(yōu)質(zhì)視頻教程



        1、功能說明

        SpringBoot的定時任務(wù)的加強(qiáng)工具,實現(xiàn)對SpringBoot原生的定時任務(wù)進(jìn)行動態(tài)管理,完全兼容原生@Scheduled注解,無需對原本的定時任務(wù)進(jìn)行修改。


        2、快速使用

        具體的功能已經(jīng)封裝成SpringBoot-starter即插即用

        <dependency>    <groupId>com.github.guoyixinggroupId>    <artifactId>spring-boot-starter-super-scheduledartifactId>    <version>0.3.1version>dependency>

        使用方法和源碼:

        碼云:https://gitee.com/qiaodaimadewangcai/super-scheduled

        github:https://github.com/guoyixing/super-scheduled


        3、實現(xiàn)原理

        1、動態(tài)管理實現(xiàn)

        (1) 配置管理介紹

        @Component("superScheduledConfig")public class SuperScheduledConfig {    /**     * 執(zhí)行定時任務(wù)的線程池     */    private ThreadPoolTaskScheduler taskScheduler;
        /** * 定時任務(wù)名稱與定時任務(wù)回調(diào)鉤子 的關(guān)聯(lián)關(guān)系容器 */ private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();
        /** * 定時任務(wù)名稱與定時任務(wù)需要執(zhí)行的邏輯 的關(guān)聯(lián)關(guān)系容器 */ private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();
        /** * 定時任務(wù)名稱與定時任務(wù)的源信息 的關(guān)聯(lián)關(guān)系容器 */ private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>(); /* 普通的get/sets省略 */}

        (2) 使用后處理器攔截SpringBoot原本的定時任務(wù)

        • 實現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
        • 實現(xiàn)BeanPostProcessor接口,將這個類標(biāo)記為后處理器,后處理器會在每個bean實例化之后執(zhí)行
        • 使用@DependsOn注解強(qiáng)制依賴SuperScheduledConfig類,讓SpringBoot實例化SuperScheduledPostProcessor類之前先實例化SuperScheduledConfig類
        • 主要實現(xiàn)邏輯在postProcessAfterInitialization()方法中

        @DependsOn({"superScheduledConfig"})@Component@Orderpublic class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {    protected final Log logger = LogFactory.getLog(getClass());
        private ApplicationContext applicationContext;
        /** * 實例化bean之前的操作 * @param bean bean實例 * @param beanName bean的Name */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
        /** * 實例化bean之后的操作 * @param bean bean實例 * @param beanName bean的Name */ @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { //1.獲取配置管理器 SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);
        //2.獲取當(dāng)前實例化完成的bean的所有方法 Method[] methods = bean.getClass().getDeclaredMethods(); //循環(huán)處理對每個方法逐一處理 if (methods.length > 0) { for (Method method : methods) { //3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時任務(wù)注解) Scheduled annotation = method.getAnnotation(Scheduled.class); //如果無法獲取到@Scheduled注解,就跳過這個方法 if (annotation == null) { continue; } //4.創(chuàng)建定時任務(wù)的源屬性 //創(chuàng)建定時任務(wù)的源屬性(用來記錄定時任務(wù)的配置,初始化的時候記錄的是注解上原本的屬性) ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean); //對注解上獲取到源屬性中的屬性進(jìn)行檢測 if (!scheduledSource.check()) { throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解參數(shù)錯誤"); } //生成定時任務(wù)的名稱(id),使用beanName+“.”+方法名 String name = beanName + "." + method.getName(); //將以key-value的形式,將源數(shù)據(jù)存入配置管理器中,key:定時任務(wù)的名稱 value:源數(shù)據(jù) superScheduledConfig.addScheduledSource(name, scheduledSource); try { //5.將原本SpringBoot的定時任務(wù)取消掉 clearOriginalScheduled(annotation); } catch (Exception e) { throw new SuperScheduledException("在關(guān)閉原始方法" + beanName + method.getName() + "時出現(xiàn)錯誤"); } } } //最后bean保持原有返回 return bean; }
        /** * 修改注解原先的屬性 * @param annotation 注解實例對象 * @throws Exception */ private void clearOriginalScheduled(Scheduled annotation) throws Exception { changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED); changeAnnotationValue(annotation, "fixedDelay", -1L); changeAnnotationValue(annotation, "fixedDelayString", ""); changeAnnotationValue(annotation, "fixedRate", -1L); changeAnnotationValue(annotation, "fixedRateString", ""); changeAnnotationValue(annotation, "initialDelay", -1L); changeAnnotationValue(annotation, "initialDelayString", ""); }

        /** * 獲取SpringBoot的上下文 * @param applicationContext SpringBoot的上下文 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}

        (3) 使用ApplicationRunner初始化自定義的定時任務(wù)運(yùn)行器

        • 實現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文
        • 使用@DependsOn注解強(qiáng)制依賴threadPoolTaskScheduler類
        • 實現(xiàn)ApplicationRunner接口,在所有bean初始化結(jié)束之后,運(yùn)行自定義邏輯
        • 主要實現(xiàn)邏輯在run()方法中

        @DependsOn("threadPoolTaskScheduler")@Componentpublic class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {    protected final Log logger = LogFactory.getLog(getClass());    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");    private ApplicationContext applicationContext;  /**     * 定時任務(wù)配置管理器     */    @Autowired    private SuperScheduledConfig superScheduledConfig;    /**     * 定時任務(wù)執(zhí)行線程     */    @Autowired    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
        @Override public void run(ApplicationArguments args) { //1.定時任務(wù)配置管理器中緩存 定時任務(wù)執(zhí)行線程 superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler); //2.獲取所有定時任務(wù)源數(shù)據(jù) Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource(); //逐一處理定時任務(wù) for (String name : nameToScheduledSource.keySet()) { //3.獲取定時任務(wù)源數(shù)據(jù) ScheduledSource scheduledSource = nameToScheduledSource.get(name); //4.獲取所有增強(qiáng)類 String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class); //5.創(chuàng)建執(zhí)行控制器 SuperScheduledRunnable runnable = new SuperScheduledRunnable(); //配置執(zhí)行控制器 runnable.setMethod(scheduledSource.getMethod()); runnable.setBean(scheduledSource.getBean()); //6.逐一處理增強(qiáng)類(增強(qiáng)器實現(xiàn)原理后面具體分析) List points = new ArrayList<>(baseStrengthenBeanNames.length); for (String baseStrengthenBeanName : baseStrengthenBeanNames) { //7.將增強(qiáng)器代理成point Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName); //創(chuàng)建代理 Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable)); proxy.setSuperScheduledName(name); //8.所有的points連成起來 points.add(proxy); } //將point形成調(diào)用鏈 runnable.setChain(new Chain(points)); //將執(zhí)行邏輯封裝并緩存到定時任務(wù)配置管理器中 superScheduledConfig.addRunnable(name, runnable::invoke); try { //8.啟動定時任務(wù) ScheduledFuture schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler , scheduledSource, runnable::invoke); //將線程回調(diào)鉤子存到任務(wù)配置管理器中 superScheduledConfig.addScheduledFuture(name, schedule); logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)啟動...");
        } catch (Exception e) { throw new SuperScheduledException("任務(wù)" + name + "啟動失敗,錯誤信息:" + e.getLocalizedMessage()); } } }
        @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}

        (4) 進(jìn)行動態(tài)管理

        @Componentpublic class SuperScheduledManager {    protected final Log logger = LogFactory.getLog(getClass());    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        @Autowired private SuperScheduledConfig superScheduledConfig;
        /** * 修改Scheduled的執(zhí)行周期 * * @param name scheduled的名稱 * @param cron cron表達(dá)式 */ public void setScheduledCron(String name, String cron) { //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setCron(cron); addScheduled(name, scheduledSource); }
        /** * 修改Scheduled的fixedDelay * * @param name scheduled的名稱 * @param fixedDelay 上一次執(zhí)行完畢時間點(diǎn)之后多長時間再執(zhí)行 */ public void setScheduledFixedDelay(String name, Long fixedDelay) { //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedDelay(fixedDelay); addScheduled(name, scheduledSource); }
        /** * 修改Scheduled的fixedRate * * @param name scheduled的名稱 * @param fixedRate 上一次開始執(zhí)行之后多長時間再執(zhí)行 */ public void setScheduledFixedRate(String name, Long fixedRate) { //終止原先的任務(wù) cancelScheduled(name); //創(chuàng)建新的任務(wù) ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name); scheduledSource.clear(); scheduledSource.setFixedRate(fixedRate); addScheduled(name, scheduledSource); }
        /** * 查詢所有啟動的Scheduled */ public List getRunScheduledName() { Set names = superScheduledConfig.getNameToScheduledFuture().keySet(); return new ArrayList<>(names); }
        /** * 查詢所有的Scheduled */ public List getAllSuperScheduledName() { Set names = superScheduledConfig.getNameToRunnable().keySet(); return new ArrayList<>(names); }
        /** * 終止Scheduled * * @param name scheduled的名稱 */ public void cancelScheduled(String name) { ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name); scheduledFuture.cancel(true); superScheduledConfig.removeScheduledFuture(name); logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)終止..."); }
        /** * 啟動Scheduled * * @param name scheduled的名稱 * @param scheduledSource 定時任務(wù)的源信息 */ public void addScheduled(String name, ScheduledSource scheduledSource) { if (getRunScheduledName().contains(name)) { throw new SuperScheduledException("定時任務(wù)" + name + "已經(jīng)被啟動過了"); } if (!scheduledSource.check()) { throw new SuperScheduledException("定時任務(wù)" + name + "源數(shù)據(jù)內(nèi)容錯誤"); }
        scheduledSource.refreshType();
        Runnable runnable = superScheduledConfig.getRunnable(name); ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();

        ScheduledFuture schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable); logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)啟動...");
        superScheduledConfig.addScheduledSource(name, scheduledSource); superScheduledConfig.addScheduledFuture(name, schedule); }
        /** * 以cron類型啟動Scheduled * * @param name scheduled的名稱 * @param cron cron表達(dá)式 */ public void addCronScheduled(String name, String cron) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setCron(cron);
        addScheduled(name, scheduledSource); }
        /** * 以fixedDelay類型啟動Scheduled * * @param name scheduled的名稱 * @param fixedDelay 上一次執(zhí)行完畢時間點(diǎn)之后多長時間再執(zhí)行 * @param initialDelay 第一次執(zhí)行的延遲時間 */ public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setFixedDelay(fixedDelay); if (initialDelay != null && initialDelay.length == 1) { scheduledSource.setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay.length > 1) { throw new SuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); }
        addScheduled(name, scheduledSource); }
        /** * 以fixedRate類型啟動Scheduled * * @param name scheduled的名稱 * @param fixedRate 上一次開始執(zhí)行之后多長時間再執(zhí)行 * @param initialDelay 第一次執(zhí)行的延遲時間 */ public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) { ScheduledSource scheduledSource = new ScheduledSource(); scheduledSource.setFixedRate(fixedRate); if (initialDelay != null && initialDelay.length == 1) { scheduledSource.setInitialDelay(initialDelay[0]); } else if (initialDelay != null && initialDelay.length > 1) { throw new SuperScheduledException("第一次執(zhí)行的延遲時間只能傳入一個參數(shù)"); }
        addScheduled(name, scheduledSource); }
        /** * 手動執(zhí)行一次任務(wù) * * @param name scheduled的名稱 */ public void runScheduled(String name) { Runnable runnable = superScheduledConfig.getRunnable(name); runnable.run(); }}

        2、增強(qiáng)接口實現(xiàn)

        增強(qiáng)器實現(xiàn)的整體思路與SpringAop的思路一致,實現(xiàn)沒有Aop復(fù)雜

        (1) 增強(qiáng)接口

        @Order(Ordered.HIGHEST_PRECEDENCE)public interface BaseStrengthen {    /**     * 前置強(qiáng)化方法     *     * @param bean   bean實例(或者是被代理的bean)     * @param method 執(zhí)行的方法對象     * @param args   方法參數(shù)     */    void before(Object bean, Method method, Object[] args);
        /** * 后置強(qiáng)化方法 * 出現(xiàn)異常不會執(zhí)行 * 如果未出現(xiàn)異常,在afterFinally方法之后執(zhí)行 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void after(Object bean, Method method, Object[] args);
        /** * 異常強(qiáng)化方法 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void exception(Object bean, Method method, Object[] args);
        /** * Finally強(qiáng)化方法,出現(xiàn)異常也會執(zhí)行 * * @param bean bean實例(或者是被代理的bean) * @param method 執(zhí)行的方法對象 * @param args 方法參數(shù) */ void afterFinally(Object bean, Method method, Object[] args);}

        (2) 代理抽象類

        public abstract class Point {    /**     * 定時任務(wù)名     */    private String superScheduledName;
        /** * 抽象的執(zhí)行方法,使用代理實現(xiàn) * @param runnable 定時任務(wù)執(zhí)行器 */ public abstract Object invoke(SuperScheduledRunnable runnable); /* 普通的get/sets省略 */}

        (3) 調(diào)用鏈類

        public class Chain {    private List list;    private int index = -1;    /**     * 索引自增1     */    public int incIndex() {        return ++index;    }
        /** * 索引還原 */ public void resetIndex() { this.index = -1; }}

        (4) cglib動態(tài)代理實現(xiàn)

        使用cglib代理增強(qiáng)器,將增強(qiáng)器全部代理成調(diào)用鏈節(jié)點(diǎn)Point

        public class RunnableBaseInterceptor implements MethodInterceptor {    /**     * 定時任務(wù)執(zhí)行器     */    private SuperScheduledRunnable runnable;    /**     * 定時任務(wù)增強(qiáng)類     */    private BaseStrengthen strengthen;
        @Override public Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object result; //如果執(zhí)行的是invoke()方法 if ("invoke".equals(method.getName())) { //前置強(qiáng)化方法 strengthen.before(obj, method, args); try { //調(diào)用執(zhí)行器中的invoke()方法 result = runnable.invoke(); } catch (Exception e) { //異常強(qiáng)化方法 strengthen.exception(obj, method, args); throw new SuperScheduledException(strengthen.getClass() + "中強(qiáng)化執(zhí)行時發(fā)生錯誤", e); } finally { //Finally強(qiáng)化方法,出現(xiàn)異常也會執(zhí)行 strengthen.afterFinally(obj, method, args); } //后置強(qiáng)化方法 strengthen.after(obj, method, args);
        } else { //直接執(zhí)行方法 result = methodProxy.invokeSuper(obj, args); } return result; }
        public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) { this.runnable = runnable; if (BaseStrengthen.class.isAssignableFrom(object.getClass())) { this.strengthen = (BaseStrengthen) object; } else { throw new SuperScheduledException(object.getClass() + "對象不是BaseStrengthen類型"); } }
        public RunnableBaseInterceptor() {
        }}

        (5) 定時任務(wù)執(zhí)行器實現(xiàn)

        public class SuperScheduledRunnable {    /**     * 原始的方法     */    private Method method;    /**     * 方法所在的bean     */    private Object bean;    /**     * 增強(qiáng)器的調(diào)用鏈     */    private Chain chain;

        public Object invoke() { Object result; //索引自增1 if (chain.incIndex() == chain.getList().size()) { //調(diào)用鏈中的增強(qiáng)方法已經(jīng)全部執(zhí)行結(jié)束 try { //調(diào)用鏈索引初始化 chain.resetIndex(); //增強(qiáng)器全部執(zhí)行完畢,執(zhí)行原本的方法 result = method.invoke(bean); } catch (IllegalAccessException | InvocationTargetException e) { throw new SuperScheduledException(e.getLocalizedMessage()); } } else { //獲取被代理后的方法增強(qiáng)器 Point point = chain.getList().get(chain.getIndex()); //執(zhí)行增強(qiáng)器代理 //增強(qiáng)器代理中,會回調(diào)方法執(zhí)行器,形成調(diào)用鏈,逐一運(yùn)行調(diào)用鏈中的增強(qiáng)器 result = point.invoke(this); } return result; } /* 普通的get/sets省略 */}

        (6) 增強(qiáng)器代理邏輯

        com.gyx.superscheduled.core.SuperScheduledApplicationRunner類中的代碼片段

        //創(chuàng)建執(zhí)行控制器SuperScheduledRunnable runnable = new SuperScheduledRunnable();runnable.setMethod(scheduledSource.getMethod());runnable.setBean(scheduledSource.getBean());//用來存放 增強(qiáng)器的代理對象List points = new ArrayList<>(baseStrengthenBeanNames.length);//循環(huán)所有的增強(qiáng)器的beanNamefor (String baseStrengthenBeanName : baseStrengthenBeanNames) { //獲取增強(qiáng)器的bean對象    Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);    //將增強(qiáng)器代理成Point節(jié)點(diǎn)    Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));    proxy.setSuperScheduledName(name);    //增強(qiáng)器的代理對象緩存到list中    points.add(proxy);}//將增強(qiáng)器代理實例的集合生成調(diào)用鏈//執(zhí)行控制器中設(shè)置調(diào)用鏈runnable.setChain(new Chain(points));



        推薦閱讀

        1、Spring Boot+Vue項目實戰(zhàn)

        2、B站:4小時上手MyBatis Plus

        3、一文搞懂前后端分離

        4、快速上手Spring Boot+Vue前后端分離


        楠哥簡介

        資深 Java 工程師,微信號 nnsouthwind

        《Java零基礎(chǔ)實戰(zhàn)》一書作者

        騰訊課程官方 Java 面試官今日頭條認(rèn)證大V

        GitChat認(rèn)證作者,B站認(rèn)證UP主(楠哥教你學(xué)Java)

        致力于幫助萬千 Java 學(xué)習(xí)者持續(xù)成長。




        有收獲,就點(diǎn)在看?
        瀏覽 52
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            嗯嗯啊啊啊啊 | 亚洲V国产v欧美v久久久久久 | 好爽啊啊啊啊 | 大鸡八网站| 一区欧美| 日韩成人在线播放 | 国产一级特黄AAAAA片一 99热播放 | 高h乱l高辣h文乱校花筱筱 | www.骚逼.com | 成人超碰是摸夜舔天天操 |