首頁>技術>

本篇文章釋出於luozhiyun的部落格: https://www.luozhiyun.com/archives/444

最近在工作中有一個需求,簡單來說就是在短時間內會建立上百萬個定時任務,建立的時候會將對應的金額相加,防止超售,需要過半個小時再去核對資料,如果資料對不上就需要將加上的金額再減回去。

對於時間輪來說,我以前寫過一篇java版的時間輪演算法分析: https://www.luozhiyun.com/archives/59,這次來看看Go語言的時間輪實現,順便大家有興趣的也可以對比一下兩者的區別,以及我寫文章的水平和一年多前有沒有提升,哈哈哈。

時間輪的運用其實是非常的廣泛的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等元件中都存在時間輪的蹤影。下面用Go實現的時間輪是以Kafka的程式碼為原型來實現的,完整程式碼: https://github.com/devYun/timingwheel。

介紹簡單時間輪

在時間輪中儲存任務的是一個環形佇列,底層採用陣列實現,陣列中的每個元素可以存放一個定時任務列表。定時任務列表是一個環形的雙向連結串列,連結串列中的每一項表示的都是定時任務項,其中封裝了真正的定時任務。

時間輪由多個時間格組成,每個時間格代表當前時間輪的基本時間跨度(tickMs)。時間輪的時間格個數是固定的,可用 wheelSize 來表示,那麼整個時間輪的總體時間跨度(interval)可以透過公式 tickMs×wheelSize 計算得出。

時間輪還有一個錶盤指標(currentTime),用來表示時間輪當前所處的時間,currentTime 是 tickMs 的整數倍。currentTime指向的地方是表示到期的時間格,表示需要處理的時間格所對應的連結串列中的所有任務。

如下圖是一個tickMs為1s,wheelSize等於10的時間輪,每一格里面放的是一個定時任務連結串列,連結串列裡面存有真正的任務項:

初始情況下表盤指標 currentTime 指向時間格0,若時間輪的 tickMs 為 1ms 且 wheelSize 等於10,那麼interval則等於10s。如下圖此時有一個定時為2s的任務插進來會存放到時間格為2的任務連結串列中,用紅色標記。隨著時間的不斷推移,指標 currentTime 不斷向前推進,如果過了2s,那麼 currentTime 會指向時間格2的位置,會將此時間格的任務連結串列獲取出來處理。

如果當前的指標 currentTime 指向的是2,此時如果插入一個9s的任務進來,那麼新來的任務會服用原來的時間格連結串列,會存放到時間格1中

這裡所講的時間輪都是簡單時間輪,只有一層,總體時間範圍在 currentTime 和 currentTime+interval 之間。如果現在有一個15s的定時任務是需要重新開啟一個時間輪,設定一個時間跨度至少為15s的時間輪才夠用。但是這樣擴充是沒有底線的,如果需要一個1萬秒的時間輪,那麼就需要一個這麼大的陣列去存放,不僅佔用很大的記憶體空間,而且也會因為需要遍歷這麼大的陣列從而拉低效率。

因此引入了層級時間輪的概念。

層級時間輪

如圖是一個兩層的時間輪,第二層時間輪也是由10個時間格組成,每個時間格的跨度是10s。第二層的時間輪的 tickMs 為第一層時間輪的 interval,即10s。每一層時間輪的 wheelSize 是固定的,都是10,那麼第二層的時間輪的總體時間跨度 interval 為100s。

圖中展示了每個時間格對應的過期時間範圍, 我們可以清晰地看到, 第二層時間輪的第0個時間格的過期時間範圍是 [0,9]。也就是說, 第二層時間輪的一個時間格就可以表示第一層時間輪的所有(10個)時間格;

如果向該時間輪中新增一個15s的任務,那麼當第一層時間輪容納不下時,進入第二層時間輪,並插入到過期時間為[10,19]的時間格中。

隨著時間的流逝,當原本15s的任務還剩下5s的時候,這裡就有一個時間輪降級的操作,此時第一層時間輪的總體時間跨度已足夠,此任務被新增到第一層時間輪到期時間為5的時間格中,之後再經歷5s後,此任務真正到期,最終執行相應的到期操作。

程式碼實現

因為我們這個Go語言版本的時間輪程式碼是仿照Kafka寫的,所以在具體實現時間輪 TimingWheel 時還有一些小細節:

時間輪的時間格中每個連結串列會有一個root節點用於簡化邊界條件。它是一個附加的連結串列節點,該節點作為第一個節點,它的值域中並不儲存任何東西,只是為了操作的方便而引入的;除了第一層時間輪,其餘高層時間輪的起始時間(startMs)都設定為建立此層時間輪時前面第一輪的 currentTime。每一層的 currentTime 都必須是 tickMs 的整數倍,如果不滿足則會將 currentTime 修剪為 tickMs 的整數倍。修剪方法為:currentTime = startMs - (startMs % tickMs);Kafka 中的定時器只需持有 TimingWheel 的第一層時間輪的引用,並不會直接持有其他高層的時間輪,但每一層時間輪都會有一個引用(overflowWheel)指向更高一層的應用;Kafka 中的定時器使用了 DelayQueue 來協助推進時間輪。在操作中會將每個使用到的時間格中每個連結串列都加入 DelayQueue,DelayQueue 會根據時間輪對應的過期時間 expiration 來排序,最短 expiration 的任務會被排在 DelayQueue 的隊頭,透過單獨執行緒來獲取 DelayQueue 中到期的任務;結構體
type TimingWheel struct {	// 時間跨度,單位是毫秒	tick      int64 // in milliseconds	// 時間輪個數	wheelSize int64	// 總跨度	interval    int64 // in milliseconds	// 當前指標指向時間	currentTime int64 // in milliseconds	// 時間格列表	buckets     []*bucket	// 延遲佇列	queue       *delayqueue.DelayQueue 	// 上級的時間輪引用	overflowWheel unsafe.Pointer // type: *TimingWheel	exitC     chan struct{}	waitGroup waitGroupWrapper}

tick、wheelSize、interval、currentTime都比較好理解,buckets欄位代表的是時間格列表,queue是一個延遲佇列,所有的任務都是透過延遲佇列來進行觸發,overflowWheel是上層時間輪的引用。

type bucket struct {	// 任務的過期時間	expiration int64	mu     sync.Mutex	// 相同過期時間的任務佇列	timers *list.List}

bucket裡面實際上封裝的是時間格里面的任務佇列,裡面放入的是相同過期時間的任務,到期後會將佇列timers拿出來進行處理。這裡有個有意思的地方是由於會有多個執行緒併發的訪問bucket,所以需要用到原子類來獲取int64位的值,為了保證32位系統上面讀取64位資料的一致性,需要進行64位對齊。具體的可以看這篇: https://www.luozhiyun.com/archives/429,講的是對記憶體對齊的思考。

type Timer struct {  // 到期時間	expiration int64 // in milliseconds  // 要被執行的具體任務	task       func()	// Timer所在bucket的指標	b unsafe.Pointer // type: *bucket	// bucket列表中對應的元素	element *list.Element}

Timer是時間輪的最小執行單元,是定時任務的封裝,到期後會呼叫task來執行任務。

初始化時間輪

例如現在初始化一個tick是1s,wheelSize是10的時間輪:

func main() {	tw := timingwheel.NewTimingWheel(time.Second, 10)	tw.Start() }func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {  // 將傳入的tick轉化成毫秒	tickMs := int64(tick / time.Millisecond)  // 如果小於零,那麼panic	if tickMs <= 0 {		panic(errors.New("tick must be greater than or equal to 1ms"))	}	// 設定開始時間	startMs := timeToMs(time.Now().UTC())	// 初始化TimingWheel	return newTimingWheel(		tickMs,		wheelSize,		startMs,		delayqueue.New(int(wheelSize)),	)}func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {  // 初始化buckets的大小	buckets := make([]*bucket, wheelSize)	for i := range buckets {		buckets[i] = newBucket()	}  // 例項化TimingWheel	return &TimingWheel{		tick:        tickMs,		wheelSize:   wheelSize,    // currentTime必須是tickMs的倍數,所以這裡使用truncate進行修剪		currentTime: truncate(startMs, tickMs),		interval:    tickMs * wheelSize,		buckets:     buckets,		queue:       queue,		exitC:       make(chan struct{}),	}}

初始化十分簡單,大家可以看看上面的程式碼註釋即可。

啟動時間輪

下面我們看看start方法:

func (tw *TimingWheel) Start() {	// Poll會執行一個無限迴圈,將到期的元素放入到queue的C管道中	tw.waitGroup.Wrap(func() {		tw.queue.Poll(tw.exitC, func() int64 {			return timeToMs(time.Now().UTC())		})	})	// 開啟無限迴圈獲取queue中C的資料	tw.waitGroup.Wrap(func() {		for {			select {			// 從佇列裡面出來的資料都是到期的bucket			case elem := <-tw.queue.C:				b := elem.(*bucket)				// 時間輪會將當前時間 currentTime 往前移動到 bucket的到期時間				tw.advanceClock(b.Expiration())				// 取出bucket佇列的資料,並呼叫addOrRun方法執行				b.Flush(tw.addOrRun)			case <-tw.exitC:				return			}		}	})}

這裡使用了util封裝的一個Wrap方法,這個方法會起一個goroutines非同步執行傳入的函式,具體的可以到我上面給出的連結去看原始碼。

Start方法會啟動兩個goroutines。第一個goroutines用來呼叫延遲佇列的queue的Poll方法,這個方法會一直迴圈獲取佇列裡面的資料,然後將到期的資料放入到queue的C管道中;第二個goroutines會無限迴圈獲取queue中C的資料,如果C中有資料表示已經到期,那麼會先呼叫advanceClock方法將當前時間 currentTime 往前移動到 bucket的到期時間,然後再呼叫Flush方法取出bucket中的佇列,並呼叫addOrRun方法執行。

func (tw *TimingWheel) advanceClock(expiration int64) {	currentTime := atomic.LoadInt64(&tw.currentTime)	// 過期時間大於等於(當前時間+tick)	if expiration >= currentTime+tw.tick {		// 將currentTime設定為expiration,從而推進currentTime		currentTime = truncate(expiration, tw.tick)		atomic.StoreInt64(&tw.currentTime, currentTime)		// Try to advance the clock of the overflow wheel if present		// 如果有上層時間輪,那麼遞迴呼叫上層時間輪的引用		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)		if overflowWheel != nil {			(*TimingWheel)(overflowWheel).advanceClock(currentTime)		}	}}

advanceClock方法會根據到期時間來從新設定currentTime,從而推進時間輪前進。

func (b *bucket) Flush(reinsert func(*Timer)) {	var ts []*Timer	b.mu.Lock()	// 迴圈獲取bucket佇列節點	for e := b.timers.Front(); e != nil; {		next := e.Next()		t := e.Value.(*Timer)		// 將頭節點移除bucket佇列		b.remove(t)		ts = append(ts, t)		e = next	}	b.mu.Unlock()	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()	for _, t := range ts {		reinsert(t)	}}

Flush方法會根據bucket裡面timers列表進行遍歷插入到ts陣列中,然後呼叫reinsert方法,這裡是呼叫的addOrRun方法。

func (tw *TimingWheel) addOrRun(t *Timer) {	// 如果已經過期,那麼直接執行	if !tw.add(t) { 		// 非同步執行定時任務		go t.task()	}}

addOrRun會呼叫add方法檢查傳入的定時任務Timer是否已經到期,如果到期那麼非同步呼叫task方法直接執行。add方法我們下面會接著分析。

整個start執行流程如圖:

start方法回啟動一個goroutines呼叫poll來處理DelayQueue中到期的資料,並將資料放入到管道C中;start方法啟動第二個goroutines方法會迴圈獲取DelayQueue中管道C的資料,管道C中實際上存放的是一個bucket,然後遍歷bucket的timers列表,如果任務已經到期,那麼非同步執行,沒有到期則重新放入到DelayQueue中。add task
func main() {	tw := timingwheel.NewTimingWheel(time.Second, 10)	tw.Start() 	// 新增任務	tw.AfterFunc(time.Second*15, func() {		fmt.Println("The timer fires")		exitC <- time.Now().UTC()	})}

我們透過AfterFunc方法新增一個15s的定時任務,如果到期了,那麼執行傳入的函式。

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {	t := &Timer{		expiration: timeToMs(time.Now().UTC().Add(d)),		task:       f,	}	tw.addOrRun(t)	return t}

AfterFunc方法回根據傳入的任務到期時間,以及到期需要執行的函式封裝成Timer,呼叫addOrRun方法。addOrRun方法我們上面已經看過了,會根據到期時間來決定是否需要執行定時任務。

下面我們來看一下add方法:

func (tw *TimingWheel) add(t *Timer) bool {	currentTime := atomic.LoadInt64(&tw.currentTime)	// 已經過期	if t.expiration < currentTime+tw.tick {		// Already expired		return false	// 	到期時間在第一層環內	} else if t.expiration < currentTime+tw.interval {		// Put it into its own bucket		// 獲取時間輪的位置		virtualID := t.expiration / tw.tick		b := tw.buckets[virtualID%tw.wheelSize]		// 將任務放入到bucket佇列中		b.Add(t) 		// 如果是相同的時間,那麼返回false,防止被多次插入到佇列中		if b.SetExpiration(virtualID * tw.tick) { 			// 將該bucket加入到延遲佇列中			tw.queue.Offer(b, b.Expiration())		}		return true	} else {		// Out of the interval. Put it into the overflow wheel		// 如果放入的到期時間超過第一層時間輪,那麼放到上一層中去		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)		if overflowWheel == nil {			atomic.CompareAndSwapPointer(				&tw.overflowWheel,				nil,				// 需要注意的是,這裡tick變成了interval				unsafe.Pointer(newTimingWheel(					tw.interval,					tw.wheelSize,					currentTime,					tw.queue,				)),			)			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)		}		// 往上遞迴		return (*TimingWheel)(overflowWheel).add(t)	}}

add方法根據到期時間來分成了三部分,第一部分是小於當前時間+tick,表示已經到期,那麼返回false執行任務即可;

第二部分的判斷會根據expiration是否小於時間輪的跨度,如果小於的話表示該定時任務可以放入到當前時間輪中,透過取模找到buckets對應的時間格並放入到bucket佇列中,SetExpiration方法會根據傳入的引數來判斷是否已經執行過延遲佇列的Offer方法,防止重複插入;

第三部分表示該定時任務的時間跨度超過了當前時間輪,需要升級到上一層的時間輪中。需要注意的是,上一層的時間輪的tick是當前時間輪的interval,延遲佇列還是同一個,然後設定為指標overflowWheel,並呼叫add方法往上層遞迴。

到這裡時間輪已經講完了,不過還有需要注意的地方,我們在用上面的時間輪實現中,使用了DelayQueue加環形佇列的方式實現了時間輪。對定時任務項的插入和刪除操作而言,TimingWheel時間複雜度為 O(1),在DelayQueue中的佇列使用的是優先佇列,時間複雜度是O(log n),但是由於buckets列表實際上是非常小的,所以並不會影響效能。

Reference

https://github.com/RussellLuo/timingwheel

https://zhuanlan.zhihu.com/p/121483218

https://github.com/apache/kafka/tree/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/utils/timer

8
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • BookKeeper 原理淺談