首頁>技術>

xxl-job概述

xxl-job程式碼總體分為排程中心和執行器部分,他們直接透過rpc(netty)通訊實現。

執行器部分是執行具體任務的模組。

排程中心主要是實現定時邏輯以及排程執行器任務,如果有多個執行器,執行器啟動的時候會把執行器地址和埠註冊到排程中心的資料庫中,然後排程中心排程的時候可以按照定義的各種排程演算法(比如輪詢負載均衡,隨機,廣播等)來進行排程。

我接下來講解下排程中心部分的核心定時執行邏輯,這部分是xxl-job定時核心程式碼和精髓所在。

定時任務邏輯和時間輪的演算法的應用

xxl-job定時核心邏輯實現

時間輪演算法介紹

先說下定時任務,比如下訂單業務,一般下了訂單後,都有個超時時間,超過這個時間後訂單取消,這個操作你會怎麼設計呢?

有幾種辦法,下面一一舉例說明:

下單後,給每個訂單開啟一個定時排程任務,到達規定的時間後,執行失效方法。我把訂單過期的時間也持久化到資料庫中,然後開一個定時任務每個1s去遍歷一邊資料庫中未過期的訂單資料,然後判斷當前時間是否大於過期時間,如果大於則超時。用時間輪的方式解決。

上面第一種方法肯定是最低效的,太浪費資源了,而且可靠性也不好。

第二種方法改善了好多,但是也有缺點,第一需要頻繁地去讀取資料庫,第二每一秒要遍歷所有的未過期的資料,即使大部分的任務還沒到過期時間,但是也會去遍歷一遍,如果資料量大也是比較浪費時間和效能的。

主角登場了!!!

時間輪演算法就解決了上面的問題,時間輪會先劃分出60個槽(舉例按1分鐘60秒),然後分為資料生產和消費

資料生產:

比如訂單超時時間是60s,然後把產生的訂單資料,按照他們的超時時間去對60求餘,就算出來了一個60s內的時間值key,然後就把訂單資料放入該key對應的一個list中。

資料消費:

每過一秒就去當前秒的key對應的list中取出來對應的訂單,然後執行該訂單的失效操作。

舉例

舉個實際例子來算一遍,比如訂單超時時間是60s,然後時間輪劃分了60個槽對應每一秒,

比如當前時間戳是61s(為了計算方便),首先用61對60求餘為1,然後取出“1”對應槽的佇列,執行失效操作,如果這個時候來了一個訂單,用“當前時間”+60s = 121s,然後121s對60求餘,結果為1,然後就把這個訂單1放入“1”這個key對應的佇列中,時間輪繼續向前執行。

噹噹前時間走到62s的時候,首先用62對60求餘為2,然後取出“2”對應槽的佇列,執行失效操作,如果這個時候來了一個訂單,用“當前時間”+60s = 122s,對60求餘,結果為2,然後就把這個訂單2放入,“2”這個key對應的佇列中,時間輪繼續向前執行。

然後時間輪還是走,當時間走到121s的時候,對60求餘,會取出“1”這個key對應佇列中的資料,就是訂單1,執行失效操作,可以看到這個訂單1放入的時間是61s過了60s被執行了。

當時間走到122s的時候,對60求餘,會取出“2”這個key對應佇列中的資料,就是訂單2,執行失效操作,可以看到這個訂單2,放入的時間是62s,過了60s被執行了。

可以看到時間輪每走一次獲取的訂單資料都是有效的,不會掃描所有的訂單資料。

如果訂單過期時間是120s,那麼這裡的刻度要是120個刻度,刻度和過期時間是對應的。

xxl-job中的時間輪應用

scheduleThread:

主要實現從資料庫中讀取任務觸發時間小於now() + 5s的任務列表,然後遍歷這些任務列表。

如果遍歷到某個任務的時候發現 “當前時間” > “任務觸發時間”+5s,則跳過該任務。如果遍歷到某個任務的時候發現 “當前時間” < “任務觸發時間”+5s,則立刻觸發此任務,讓 後再檢查下次觸發時間是不是距離當前時間的5s內,即判斷“當前時間” + 5s > "下次任務觸發時間",如果是大於,則把該任務加入時間輪的佇列中。如果遍歷到某個任務的時候發現當前時間還沒到任務觸發時間,則把該任務加入時間輪的佇列中

核心程式碼如下:

// schedule threadscheduleThread = new Thread(new Runnable() {    @Override    public void run() {        try {            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );        } catch (InterruptedException e) {            if (!scheduleThreadToStop) {                logger.error(e.getMessage(), e);            }        }        logger.info(">>>>>>>>> init xxl-job admin scheduler success.");        // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)        int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;        while (!scheduleThreadToStop) {            // Scan Job            long start = System.currentTimeMillis();            Connection conn = null;            Boolean connAutoCommit = null;            PreparedStatement preparedStatement = null;            boolean preReadSuc = true;            try {                conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();                connAutoCommit = conn.getAutoCommit();                conn.setAutoCommit(false);                preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );                preparedStatement.execute();                // tx start                // 1、從資料庫中讀出來未來5s內的任務資料列表                long nowTime = System.currentTimeMillis();                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);                if (scheduleList!=null && scheduleList.size()>0) {                    // 2、push time-ring                    for (XxlJobInfo jobInfo: scheduleList) {                        // 當前時間可能會大於任務觸發時間,比如從資料庫中讀取任務資料然後遍歷到某個任務的時候                        // 這個耗時時間可能超過了5s,這就會出現問題了,比如在61000時間戳這個時刻從資料庫中查                        // 出來的某個任務的下次觸發時間戳是65000,當讀出來後到遍歷到這個任務耗時6000ms,當前時刻                        // 時間戳就到了66000,超過了任務觸發時間。                                             // “當前時間” > “任務觸發時間”+5s                        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {                            // 2.1、trigger-expire > 5s:pass && make next-trigger-time                            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());                            // 1、misfire match                            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);                            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {                                // FIRE_ONCE_NOW 》 trigger                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );                            }                            // 2、更新下次觸發時間                            refreshNextValidTime(jobInfo, new Date());                        } else if (nowTime > jobInfo.getTriggerNextTime()) {                            // “當前時間” < “任務觸發時間”+5s                            // 1、直接去觸發遠端執行器執行任務                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );                            // 2、重新整理下次觸發時間                            refreshNextValidTime(jobInfo, new Date());                            // 當前時間” + 5s > "下次任務觸發時間",則把該任務加入時間輪的佇列中。                            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {                                // 1、make ring second                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                                // 2、push time ring                                pushTimeRing(ringSecond, jobInfo.getId());                                // 3、fresh next                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                            }                        } else {                            // 當前時間還沒到任務觸發時間,則把該任務加入時間輪的佇列中                            // 1、計算時間輪的刻度key                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);                            // 2、把對應的任務放入到ringSecond對應的任務佇列中                            pushTimeRing(ringSecond, jobInfo.getId());                            // 3、更新下次觸發時間                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));                        }                    }                    // 3、update trigger info                    for (XxlJobInfo jobInfo: scheduleList) {                        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);                    }                } else {                    preReadSuc = false;                }            } catch (Exception e) {               ...            } finally {								...            }            long cost = System.currentTimeMillis()-start;            // Wait seconds, align second            if (cost < 1000) {  // scan-overtime, not wait                try {                    // pre-read period: success > scan each second; fail > skip this period;                    TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);                } catch (InterruptedException e) {                    if (!scheduleThreadToStop) {                        logger.error(e.getMessage(), e);                    }                }            }        }        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");    }});

ringThread:

實現了時間輪的核心程式碼,程式碼如下:

ringThread = new Thread(new Runnable() {    @Override    public void run() {        // align second        try {            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );        } catch (InterruptedException e) {            if (!ringThreadToStop) {                logger.error(e.getMessage(), e);            }        }				//輪詢        while (!ringThreadToStop) {            try {                // second data                List<Integer> ringItemData = new ArrayList<>();                int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時太長,跨過刻度,向前校驗一個刻度;                for (int i = 0; i < 2; i++) {                    //從時間輪中取出某一刻度對應的資料並刪除該任務列表,這裡為了避免處理耗時太長,                    //取了當前刻度和前一個刻度時間槽對應的資料                    List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );                    if (tmpData != null) {                        ringItemData.addAll(tmpData);                    }                }                // ring trigger                logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );                if (ringItemData.size() > 0) {                    // 遍歷取出的任務,並觸發執行(最終會透過rpc呼叫到執行器部分執行)                    for (int jobId: ringItemData) {                        // do trigger                        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);                    }                    // 清除該佇列中的資料                    ringItemData.clear();                }            } catch (Exception e) {                if (!ringThreadToStop) {                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);                }            }            // 休眠,休眠時間對齊到秒,意思就是比如當前時間是400ms,            // 然後透過1000 - System.currentTimeMillis()%1000計算後算出來是600,所以就休眠            // 600ms,主要是為了實現時間輪1s前進一次的效果。            try {                TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);            } catch (InterruptedException e) {                if (!ringThreadToStop) {                    logger.error(e.getMessage(), e);                }            }        }        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");    }});
xxl-job中的時間輪演算法

可以看到xxl-job中的時間輪演算法也是比較有特點的,每次輪詢,它的生產資料端scheduleThread去資料庫中讀取未來5s內的任務列表,然後把它對映到對應的時間輪中,供後續的ringThread去消費。這個未來5s內應該是xxl-job開發者是有自己的考量,最終定的一個值。

但是注意讀取未來的時間間隔不能超過60s,如果超過60s,就會出問題了,可能會存在提前60s去執行某個任務,導致定時任務提前執行了,這是因為時間輪的刻度是60s。

xxl-job的這個時間輪演算法最終是可以支援任意的定時時間。

總結

時間輪演算法可以說是時間定時任務的最佳解決方案了,它也是在很多開源專案中有應用,比如Netty、Akka、Quartz、ZooKeeper 、Kafka等元件中都存在時間輪的蹤影。

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 基於RBAC的許可權訪問系統基礎