xxl-job概述
xxl-job程式碼總體分為排程中心和執行器部分,他們直接透過rpc(netty)通訊實現。
執行器部分是執行具體任務的模組。
排程中心主要是實現定時邏輯以及排程執行器任務,如果有多個執行器,執行器啟動的時候會把執行器地址和埠註冊到排程中心的資料庫中,然後排程中心排程的時候可以按照定義的各種排程演算法(比如輪詢負載均衡,隨機,廣播等)來進行排程。
我接下來講解下排程中心部分的核心定時執行邏輯,這部分是xxl-job定時核心程式碼和精髓所在。
定時任務邏輯和時間輪的演算法的應用xxl-job定時核心邏輯實現
時間輪演算法介紹先說下定時任務,比如下訂單業務,一般下了訂單後,都有個超時時間,超過這個時間後訂單取消,這個操作你會怎麼設計呢?
有幾種辦法,下面一一舉例說明:
下單後,給每個訂單開啟一個定時排程任務,到達規定的時間後,執行失效方法。我把訂單過期的時間也持久化到資料庫中,然後開一個定時任務每個1s去遍歷一邊資料庫中未過期的訂單資料,然後判斷當前時間是否大於過期時間,如果大於則超時。用時間輪的方式解決。上面第一種方法肯定是最低效的,太浪費資源了,而且可靠性也不好。
第二種方法改善了好多,但是也有缺點,第一需要頻繁地去讀取資料庫,第二每一秒要遍歷所有的未過期的資料,即使大部分的任務還沒到過期時間,但是也會去遍歷一遍,如果資料量大也是比較浪費時間和效能的。
主角登場了!!!
時間輪演算法就解決了上面的問題,時間輪會先劃分出60個槽(舉例按1分鐘60秒),然後分為資料生產和消費
資料生產:
比如訂單超時時間是60s,然後把產生的訂單資料,按照他們的超時時間去對60求餘,就算出來了一個60s內的時間值key,然後就把訂單資料放入該key對應的一個list中。
資料消費:
每過一秒就去當前秒的key對應的list中取出來對應的訂單,然後執行該訂單的失效操作。
舉例
舉個實際例子來算一遍,比如訂單超時時間是60s,然後時間輪劃分了60個槽對應每一秒,
比如當前時間戳是61s(為了計算方便),首先用61對60求餘為1,然後取出“1”對應槽的佇列,執行失效操作,如果這個時候來了一個訂單,用“當前時間”+60s = 121s,然後121s對60求餘,結果為1,然後就把這個訂單1放入“1”這個key對應的佇列中,時間輪繼續向前執行。
噹噹前時間走到62s的時候,首先用62對60求餘為2,然後取出“2”對應槽的佇列,執行失效操作,如果這個時候來了一個訂單,用“當前時間”+60s = 122s,對60求餘,結果為2,然後就把這個訂單2放入,“2”這個key對應的佇列中,時間輪繼續向前執行。
然後時間輪還是走,當時間走到121s的時候,對60求餘,會取出“1”這個key對應佇列中的資料,就是訂單1,執行失效操作,可以看到這個訂單1放入的時間是61s過了60s被執行了。
當時間走到122s的時候,對60求餘,會取出“2”這個key對應佇列中的資料,就是訂單2,執行失效操作,可以看到這個訂單2,放入的時間是62s,過了60s被執行了。
可以看到時間輪每走一次獲取的訂單資料都是有效的,不會掃描所有的訂單資料。
如果訂單過期時間是120s,那麼這裡的刻度要是120個刻度,刻度和過期時間是對應的。
xxl-job中的時間輪應用scheduleThread:
主要實現從資料庫中讀取任務觸發時間小於now() + 5s的任務列表,然後遍歷這些任務列表。
如果遍歷到某個任務的時候發現 “當前時間” > “任務觸發時間”+5s,則跳過該任務。如果遍歷到某個任務的時候發現 “當前時間” < “任務觸發時間”+5s,則立刻觸發此任務,讓 後再檢查下次觸發時間是不是距離當前時間的5s內,即判斷“當前時間” + 5s > "下次任務觸發時間",如果是大於,則把該任務加入時間輪的佇列中。如果遍歷到某個任務的時候發現當前時間還沒到任務觸發時間,則把該任務加入時間輪的佇列中核心程式碼如下:
// schedule threadscheduleThread = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>> init xxl-job admin scheduler success."); // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20) int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20; while (!scheduleThreadToStop) { // Scan Job long start = System.currentTimeMillis(); Connection conn = null; Boolean connAutoCommit = null; PreparedStatement preparedStatement = null; boolean preReadSuc = true; try { conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection(); connAutoCommit = conn.getAutoCommit(); conn.setAutoCommit(false); preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" ); preparedStatement.execute(); // tx start // 1、從資料庫中讀出來未來5s內的任務資料列表 long nowTime = System.currentTimeMillis(); List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount); if (scheduleList!=null && scheduleList.size()>0) { // 2、push time-ring for (XxlJobInfo jobInfo: scheduleList) { // 當前時間可能會大於任務觸發時間,比如從資料庫中讀取任務資料然後遍歷到某個任務的時候 // 這個耗時時間可能超過了5s,這就會出現問題了,比如在61000時間戳這個時刻從資料庫中查 // 出來的某個任務的下次觸發時間戳是65000,當讀出來後到遍歷到這個任務耗時6000ms,當前時刻 // 時間戳就到了66000,超過了任務觸發時間。 // “當前時間” > “任務觸發時間”+5s if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) { // 2.1、trigger-expire > 5s:pass && make next-trigger-time logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId()); // 1、misfire match MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING); if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { // FIRE_ONCE_NOW 》 trigger JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); } // 2、更新下次觸發時間 refreshNextValidTime(jobInfo, new Date()); } else if (nowTime > jobInfo.getTriggerNextTime()) { // “當前時間” < “任務觸發時間”+5s // 1、直接去觸發遠端執行器執行任務 JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null); logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() ); // 2、重新整理下次觸發時間 refreshNextValidTime(jobInfo, new Date()); // 當前時間” + 5s > "下次任務觸發時間",則把該任務加入時間輪的佇列中。 if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) { // 1、make ring second int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、push time ring pushTimeRing(ringSecond, jobInfo.getId()); // 3、fresh next refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } else { // 當前時間還沒到任務觸發時間,則把該任務加入時間輪的佇列中 // 1、計算時間輪的刻度key int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); // 2、把對應的任務放入到ringSecond對應的任務佇列中 pushTimeRing(ringSecond, jobInfo.getId()); // 3、更新下次觸發時間 refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime())); } } // 3、update trigger info for (XxlJobInfo jobInfo: scheduleList) { XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo); } } else { preReadSuc = false; } } catch (Exception e) { ... } finally { ... } long cost = System.currentTimeMillis()-start; // Wait seconds, align second if (cost < 1000) { // scan-overtime, not wait try { // pre-read period: success > scan each second; fail > skip this period; TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!scheduleThreadToStop) { logger.error(e.getMessage(), e); } } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop"); }});
ringThread:
實現了時間輪的核心程式碼,程式碼如下:
ringThread = new Thread(new Runnable() { @Override public void run() { // align second try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 ); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } //輪詢 while (!ringThreadToStop) { try { // second data List<Integer> ringItemData = new ArrayList<>(); int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免處理耗時太長,跨過刻度,向前校驗一個刻度; for (int i = 0; i < 2; i++) { //從時間輪中取出某一刻度對應的資料並刪除該任務列表,這裡為了避免處理耗時太長, //取了當前刻度和前一個刻度時間槽對應的資料 List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); if (tmpData != null) { ringItemData.addAll(tmpData); } } // ring trigger logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) ); if (ringItemData.size() > 0) { // 遍歷取出的任務,並觸發執行(最終會透過rpc呼叫到執行器部分執行) for (int jobId: ringItemData) { // do trigger JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); } // 清除該佇列中的資料 ringItemData.clear(); } } catch (Exception e) { if (!ringThreadToStop) { logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e); } } // 休眠,休眠時間對齊到秒,意思就是比如當前時間是400ms, // 然後透過1000 - System.currentTimeMillis()%1000計算後算出來是600,所以就休眠 // 600ms,主要是為了實現時間輪1s前進一次的效果。 try { TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000); } catch (InterruptedException e) { if (!ringThreadToStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop"); }});
xxl-job中的時間輪演算法可以看到xxl-job中的時間輪演算法也是比較有特點的,每次輪詢,它的生產資料端scheduleThread去資料庫中讀取未來5s內的任務列表,然後把它對映到對應的時間輪中,供後續的ringThread去消費。這個未來5s內應該是xxl-job開發者是有自己的考量,最終定的一個值。
但是注意讀取未來的時間間隔不能超過60s,如果超過60s,就會出問題了,可能會存在提前60s去執行某個任務,導致定時任務提前執行了,這是因為時間輪的刻度是60s。
xxl-job的這個時間輪演算法最終是可以支援任意的定時時間。
總結時間輪演算法可以說是時間定時任務的最佳解決方案了,它也是在很多開源專案中有應用,比如Netty、Akka、Quartz、ZooKeeper 、Kafka等元件中都存在時間輪的蹤影。