首頁>技術>

原文:https://juejin.cn/post/6906677772479889422

什麼是訊號量

訊號量是併發程式設計中常見的一種同步機制,在需要控制訪問資源的執行緒數量時就會用到訊號量,關於什麼是訊號量這個問題,我引用一下維基百科對訊號量的解釋,大家就明白了。

訊號量的概念是計算機科學家 Dijkstra (Dijkstra演算法的發明者)提出來的,廣泛應用在不同的作業系統中。系統中,會給每一個程序一個訊號量,代表每個程序當前的狀態,未得到控制權的程序,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。

如果訊號量是一個任意的整數,通常被稱為計數訊號量(Counting semaphore),或一般訊號量(general semaphore);如果訊號量只有二進位制的0或1,稱為二進位制訊號量(binary semaphore)。在linux系統中,二進位制訊號量(binary semaphore)又稱 互斥鎖 (Mutex)

計數訊號量具備兩種操作動作,稱為V( signal() )與P( wait() )(即部分參考書常稱的“PV操作”)。V操作會增加訊號量S的數值,P操作會減少它。

執行方式:

初始化訊號量,給與它一個非負數的整數值。執行P( wait() ),訊號量S的值將被減少。企圖進入 臨界區 的程序,需要先執行P( wait() )。當訊號量S減為負值時,程序會被阻塞住,不能繼續;當訊號量S不為負值時,程序可以獲准進入臨界區。執行V( signal() ),訊號量S的值會被增加。結束離開 臨界區段 的程序,將會執行V( signal() )。當訊號量S不為負值時,先前被阻塞住的其他程序,將可獲准進入 臨界區 。

我們一般用訊號量保護一組資源,比如資料庫連線池、一組客戶端的連線等等。**每次獲取資源時都會將訊號量中的計數器減去對應的數值,在釋放資源時重新加回來。當訊號量沒資源時嘗試獲取訊號量的執行緒就會進入休眠,等待其他執行緒釋放訊號量。如果訊號量是隻有0和1的二進位訊號量,那麼,它的 P/V 就和互斥鎖的 Lock/Unlock 就一樣了。

Go語言中的訊號量表示

Go 內部使用訊號量來控制 goroutine 的阻塞和喚醒,比如互斥鎖 sync.Mutex 結構體定義的第二個欄位就是一個訊號量。

type Mutex struct {    state int32    sema  uint32}

訊號量的PV操作在 Go 內部是透過下面這幾個底層函式實現的

func runtime_Semacquire(s *uint32)func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

上面幾個函式都是 Go 語言內部使用的,我們不能在程式設計時直接使用。不過 Go 語言的擴充套件併發原語包中提供了帶權重的訊號量 semaphore.Weighted

使用訊號量前,需先在專案裡安裝 golang.org/x/sync/ 包

安裝方法: go get -u golang.org/x/sync

我們可以按照不同的權重對資源的訪問進行管理,這個結構體對外提供了四個方法:

semaphore.NewWeighted用於建立新的訊號量,透過引數(n int64) 指定訊號量的初始值。semaphore.Weighted.Acquire阻塞地獲取指定權重的資源,如果當前沒有空閒資源,就會陷入休眠等待;相當於 P 操作,你可以一次獲取多個資源,如果沒有足夠多的資源,呼叫者就會被阻塞。它的第一個引數是 Context,這就意味著,你可以透過 Context 增加超時或者 cancel 的機制。如果是正常獲取了資源,就返回 nil ;否則,就返回 ctx.Err() ,訊號量不改變。semaphore.Weighted.Release用於釋放指定權重的資源;相當於 V 操作,可以將 n 個資源釋放,返還給訊號量。semaphore.Weighted.TryAcquire非阻塞地獲取指定權重的資源,如果當前沒有空閒資源,就會直接返回 false ;在Go程式設計裡使用訊號量

在實際應用 Go 語言開發程式時,有哪些場景適合使用訊號量呢? 在需要控制訪問資源的執行緒數量時就會需要訊號量 ,我來舉個例子幫助你理解。假設我們有一組要抓取的頁面,資源有限最多允許我們同時執行三個抓取任務,當同時有三個抓取任務在執行時,在執行完一個抓取任務後才能執行下一個排隊等待的任務。當然這個問題用Channel也能解決,不過這次我們使用Go提供的訊號量原語來解決這個問題,程式碼如下:

package mainimport (    "context"    "fmt"    "sync"    "time"    "golang.org/x/sync/semaphore")func doSomething(u string) {// 模擬抓取任務的執行    fmt.Println(u)    time.Sleep(2 * time.Second)}const (    Limit  = 3 // 同時並行執行的goroutine上限    Weight = 1 // 每個goroutine獲取訊號量資源的權重)func main() {    urls := []string{        "http://www.example.com",        "http://www.example.net",        "http://www.example.net/foo",        "http://www.example.net/bar",        "http://www.example.net/baz",    }    s := semaphore.NewWeighted(Limit)    var w sync.WaitGroup    for _, u := range urls {        w.Add(1)        go func(u string) {            s.Acquire(context.Background(), Weight)            doSomething(u)            s.Release(Weight)            w.Done()        }(u)    }    w.Wait()        fmt.Println("All Done")}
Go語言訊號量的實現原理

Go 語言 擴充套件庫中的訊號量是使用互斥鎖和List 實現的。互斥鎖實現其它欄位的保護,而 List 實現了一個等待佇列,等待者的通知是透過 Channel 的通知機制實現的。

訊號量的資料結構

我們來看一下訊號量 semaphore.Weighted 的資料結構:

type Weighted struct {    size    int64         // 最大資源數    cur     int64         // 當前已被使用的資源    mu      sync.Mutex    // 互斥鎖,對欄位的保護    waiters list.List     // 等待佇列}複製程式碼
sizecurmuwaiters
Acquire請求訊號量資源

Acquire 方法會監控資源是否可用,而且還要檢測傳遞進來的 context.Context 物件是否傳送了超時過期或者取消的訊號,我們來看一下它的程式碼實現:

func (s *Weighted) Acquire(ctx context.Context, n int64) error {    s.mu.Lock()    // 如果恰好有足夠的資源,也沒有排隊等待獲取資源的goroutine,    // 將cur加上n後直接返回    if s.size-s.cur >= n && s.waiters.Len() == 0 {      s.cur += n      s.mu.Unlock()      return nil    }      // 請求的資源數大於能提供的最大的資源數    // 這個任務處理不了,走錯誤處理邏輯    if n > s.size {      s.mu.Unlock()      // 依賴ctx的狀態返回,否則一直等待      <-ctx.Done()      return ctx.Err()    }    // 現存資源不夠, 需要把呼叫者加入到等待佇列中    // 建立了一個ready chan,以便被通知喚醒    ready := make(chan struct{})    w := waiter{n: n, ready: ready}    elem := s.waiters.PushBack(w)    s.mu.Unlock()      // 等待    select {    case <-ctx.Done(): // context的Done被關閉      err := ctx.Err()      s.mu.Lock()      select {      case <-ready: // 如果被喚醒了,忽略ctx的狀態        err = nil      default: // 通知waiter        isFront := s.waiters.Front() == elem        s.waiters.Remove(elem)        // 通知其它的waiters,檢查是否有足夠的資源        if isFront && s.size > s.cur {          s.notifyWaiters()        }      }      s.mu.Unlock()      return err    case <-ready: // 等待者被喚醒了      return nil    }  }

如果呼叫者請求不到訊號量的資源就會被加入等待者列表裡,這裡等待者列表的結構體定義是:

type waiter struct {	n     int64	ready chan<- struct{} // 當呼叫者可以獲取到訊號量資源時, close調這個chan}複製程式碼

包含了兩個欄位,呼叫者請求的資源數,以及一個ready 通道。ready通道會在呼叫者可以被重新喚醒的時候被 close 調,從而起到通知正在阻塞讀取ready通道的等待者的作用。

NotifyWaiters 通知等待者

notifyWaiters 方法會逐個檢查佇列裡等待的呼叫者,如果現存資源夠等待者請求的數量n,或者是沒有等待者了,就返回:

func (s *Weighted) notifyWaiters() {    for {      next := s.waiters.Front()      if next == nil {        break // 沒有等待者了,直接返回      }        w := next.Value.(waiter)      if s.size-s.cur < w.n {        // 如果現有資源不夠佇列頭呼叫者請求的資源數,就退出所有等待者會繼續等待        // 這裡還是按照先入先出的方式處理是為了避免飢餓        break      }      s.cur += w.n      s.waiters.Remove(next)      close(w.ready)    }  }

notifyWaiters 方法是按照先入先出的方式喚醒呼叫者。當釋放 100 個資源的時候,如果第一個等待者需要 101 個資源,那麼,佇列中的所有等待者都會繼續等待,即使佇列後面有的等待者只需要 1 個資源。這樣做的目的是避免飢餓,否則的話,資源可能總是被那些請求資源數小的呼叫者獲取,這樣一來,請求資源數巨大的呼叫者,就沒有機會獲得資源了。

Release歸還訊號量資源

Release 方法就很簡單了,它將當前計數值減去釋放的資源數 n,並呼叫 notifyWaiters 方法,嘗試喚醒等待佇列中的呼叫者,看是否有足夠的資源被獲取。

func (s *Weighted) Release(n int64) {    s.mu.Lock()    s.cur -= n    if s.cur < 0 {      s.mu.Unlock()      panic("semaphore: released more than held")    }    s.notifyWaiters()    s.mu.Unlock()}
總結

在 Go 語言中訊號量有時候也會被 Channel 型別所取代,因為一個 buffered chan 也可以代表 n 個資源。不過既然 Go 語言透過 golang.orgx/sync 擴充套件庫對外提供了 semaphore.Weight 這一種訊號量實現,遇到使用訊號量的場景時還是儘量使用官方提供的實現。在使用的過程中我們需要注意以下的幾個問題:

Acquire 和 TryAcquire 方法都可以用於獲取資源,前者會阻塞地獲取訊號量。後者會非阻塞地獲取訊號量,如果獲取不到就返回 false 。Release 歸還訊號量後,會以先進先出的順序喚醒等待佇列中的呼叫者。如果現有資源不夠處於等待佇列前面的呼叫者請求的資源數,所有等待者會繼續等待。如果一個 goroutine 申請較多的資源,由於上面說的歸還後喚醒等待者的策略,它可能會等待比較長的時間。

作者:kevinyan

原文:https://juejin.cn/post/6906677772479889422

17
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Spring Boot 2.4 對多環境配置的支援更改