原文:https://juejin.cn/post/6906677772479889422
什麼是訊號量訊號量是併發程式設計中常見的一種同步機制,在需要控制訪問資源的執行緒數量時就會用到訊號量,關於什麼是訊號量這個問題,我引用一下維基百科對訊號量的解釋,大家就明白了。
訊號量的概念是計算機科學家 Dijkstra (Dijkstra演算法的發明者)提出來的,廣泛應用在不同的作業系統中。系統中,會給每一個程序一個訊號量,代表每個程序當前的狀態,未得到控制權的程序,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。
如果訊號量是一個任意的整數,通常被稱為計數訊號量(Counting semaphore),或一般訊號量(general semaphore);如果訊號量只有二進位制的0或1,稱為二進位制訊號量(binary semaphore)。在linux系統中,二進位制訊號量(binary semaphore)又稱 互斥鎖 (Mutex)
計數訊號量具備兩種操作動作,稱為V( signal() )與P( wait() )(即部分參考書常稱的“PV操作”)。V操作會增加訊號量S的數值,P操作會減少它。
執行方式:
初始化訊號量,給與它一個非負數的整數值。執行P( wait() ),訊號量S的值將被減少。企圖進入 臨界區 的程序,需要先執行P( wait() )。當訊號量S減為負值時,程序會被阻塞住,不能繼續;當訊號量S不為負值時,程序可以獲准進入臨界區。執行V( signal() ),訊號量S的值會被增加。結束離開 臨界區段 的程序,將會執行V( signal() )。當訊號量S不為負值時,先前被阻塞住的其他程序,將可獲准進入 臨界區 。我們一般用訊號量保護一組資源,比如資料庫連線池、一組客戶端的連線等等。**每次獲取資源時都會將訊號量中的計數器減去對應的數值,在釋放資源時重新加回來。當訊號量沒資源時嘗試獲取訊號量的執行緒就會進入休眠,等待其他執行緒釋放訊號量。如果訊號量是隻有0和1的二進位訊號量,那麼,它的 P/V 就和互斥鎖的 Lock/Unlock 就一樣了。
Go語言中的訊號量表示Go 內部使用訊號量來控制 goroutine 的阻塞和喚醒,比如互斥鎖 sync.Mutex 結構體定義的第二個欄位就是一個訊號量。
type Mutex struct { state int32 sema uint32}
訊號量的PV操作在 Go 內部是透過下面這幾個底層函式實現的
func runtime_Semacquire(s *uint32)func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
上面幾個函式都是 Go 語言內部使用的,我們不能在程式設計時直接使用。不過 Go 語言的擴充套件併發原語包中提供了帶權重的訊號量 semaphore.Weighted
使用訊號量前,需先在專案裡安裝 golang.org/x/sync/ 包
安裝方法: go get -u golang.org/x/sync
我們可以按照不同的權重對資源的訪問進行管理,這個結構體對外提供了四個方法:
semaphore.NewWeighted用於建立新的訊號量,透過引數(n int64) 指定訊號量的初始值。semaphore.Weighted.Acquire阻塞地獲取指定權重的資源,如果當前沒有空閒資源,就會陷入休眠等待;相當於 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,呼叫者就會被阻塞。它的第一個引數是 Context,這就意味著,你可以透過 Context 增加超時或者 cancel 的機制。如果是正常獲取了資源,就返回 nil ;否則,就返回 ctx.Err() ,訊號量不改變。semaphore.Weighted.Release用於釋放指定權重的資源;相當於 V 操作,可以將 n 個資源釋放,返還給訊號量。semaphore.Weighted.TryAcquire非阻塞地獲取指定權重的資源,如果當前沒有空閒資源,就會直接返回 false ;在Go程式設計裡使用訊號量在實際應用 Go 語言開發程式時,有哪些場景適合使用訊號量呢? 在需要控制訪問資源的執行緒數量時就會需要訊號量 ,我來舉個例子幫助你理解。假設我們有一組要抓取的頁面,資源有限最多允許我們同時執行三個抓取任務,當同時有三個抓取任務在執行時,在執行完一個抓取任務後才能執行下一個排隊等待的任務。當然這個問題用Channel也能解決,不過這次我們使用Go提供的訊號量原語來解決這個問題,程式碼如下:
package mainimport ( "context" "fmt" "sync" "time" "golang.org/x/sync/semaphore")func doSomething(u string) {// 模擬抓取任務的執行 fmt.Println(u) time.Sleep(2 * time.Second)}const ( Limit = 3 // 同時並行執行的goroutine上限 Weight = 1 // 每個goroutine獲取訊號量資源的權重)func main() { urls := []string{ "http://www.example.com", "http://www.example.net", "http://www.example.net/foo", "http://www.example.net/bar", "http://www.example.net/baz", } s := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, u := range urls { w.Add(1) go func(u string) { s.Acquire(context.Background(), Weight) doSomething(u) s.Release(Weight) w.Done() }(u) } w.Wait() fmt.Println("All Done")}
Go語言訊號量的實現原理Go 語言 擴充套件庫中的訊號量是使用互斥鎖和List 實現的。互斥鎖實現其它欄位的保護,而 List 實現了一個等待佇列,等待者的通知是透過 Channel 的通知機制實現的。
訊號量的資料結構我們來看一下訊號量 semaphore.Weighted 的資料結構:
type Weighted struct { size int64 // 最大資源數 cur int64 // 當前已被使用的資源 mu sync.Mutex // 互斥鎖,對欄位的保護 waiters list.List // 等待佇列}複製程式碼
sizecurmuwaiters
Acquire請求訊號量資源
Acquire 方法會監控資源是否可用,而且還要檢測傳遞進來的 context.Context 物件是否傳送了超時過期或者取消的訊號,我們來看一下它的程式碼實現:
func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() // 如果恰好有足夠的資源,也沒有排隊等待獲取資源的goroutine, // 將cur加上n後直接返回 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } // 請求的資源數大於能提供的最大的資源數 // 這個任務處理不了,走錯誤處理邏輯 if n > s.size { s.mu.Unlock() // 依賴ctx的狀態返回,否則一直等待 <-ctx.Done() return ctx.Err() } // 現存資源不夠, 需要把呼叫者加入到等待佇列中 // 建立了一個ready chan,以便被通知喚醒 ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() // 等待 select { case <-ctx.Done(): // context的Done被關閉 err := ctx.Err() s.mu.Lock() select { case <-ready: // 如果被喚醒了,忽略ctx的狀態 err = nil default: // 通知waiter isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // 通知其它的waiters,檢查是否有足夠的資源 if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: // 等待者被喚醒了 return nil } }
如果呼叫者請求不到訊號量的資源就會被加入等待者列表裡,這裡等待者列表的結構體定義是:
type waiter struct { n int64 ready chan<- struct{} // 當呼叫者可以獲取到訊號量資源時, close調這個chan}複製程式碼
包含了兩個欄位,呼叫者請求的資源數,以及一個ready 通道。ready通道會在呼叫者可以被重新喚醒的時候被 close 調,從而起到通知正在阻塞讀取ready通道的等待者的作用。
NotifyWaiters 通知等待者notifyWaiters 方法會逐個檢查佇列裡等待的呼叫者,如果現存資源夠等待者請求的數量n,或者是沒有等待者了,就返回:
func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break // 沒有等待者了,直接返回 } w := next.Value.(waiter) if s.size-s.cur < w.n { // 如果現有資源不夠佇列頭呼叫者請求的資源數,就退出所有等待者會繼續等待 // 這裡還是按照先入先出的方式處理是為了避免飢餓 break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } }
notifyWaiters 方法是按照先入先出的方式喚醒呼叫者。當釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那麼,佇列中的所有等待者都會繼續等待,即使佇列後面有的等待者只需要 1 個資源。這樣做的目的是避免飢餓,否則的話,資源可能總是被那些請求資源數小的呼叫者獲取,這樣一來,請求資源數巨大的呼叫者,就沒有機會獲得資源了。
Release歸還訊號量資源Release 方法就很簡單了,它將當前計數值減去釋放的資源數 n,並呼叫 notifyWaiters 方法,嘗試喚醒等待佇列中的呼叫者,看是否有足夠的資源被獲取。
func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock()}
總結
在 Go 語言中訊號量有時候也會被 Channel 型別所取代,因為一個 buffered chan 也可以代表 n 個資源。不過既然 Go 語言透過 golang.orgx/sync 擴充套件庫對外提供了 semaphore.Weight 這一種訊號量實現,遇到使用訊號量的場景時還是儘量使用官方提供的實現。在使用的過程中我們需要注意以下的幾個問題:
Acquire 和 TryAcquire 方法都可以用於獲取資源,前者會阻塞地獲取訊號量。後者會非阻塞地獲取訊號量,如果獲取不到就返回 false 。Release 歸還訊號量後,會以先進先出的順序喚醒等待佇列中的呼叫者。如果現有資源不夠處於等待佇列前面的呼叫者請求的資源數,所有等待者會繼續等待。如果一個 goroutine 申請較多的資源,由於上面說的歸還後喚醒等待者的策略,它可能會等待比較長的時間。作者:kevinyan
原文:https://juejin.cn/post/6906677772479889422