It has been over a year since I switched from C++ to Go, so it is worth taking a deeper look at how Go's built-in data structures are implemented. This article combines examples with the Go source code to explore the language's underlying implementation.
Defining arrays

Arrays are the foundational data structure underlying slices and maps. An array is a fixed-length data type that stores a contiguous block of elements of the same type.
Declaration and initialization

```go
var arr [5]int                 // declare an array of five elements
arr := [5]int{1, 2, 3, 4, 5}   // literal declaration
arr := [...]int{1, 2, 3, 4, 5} // length inferred at compile time
arr := [5]int{1: 10, 3: 30}    // initialize indexes 1 and 3; the rest default to 0
```
Using arrays

```go
arr := [5]int{10, 20, 30, 40, 50}
arr[2] = 35 // modify the element at index 2

var arr1 [3]string
arr2 := [3]string{"red", "yellow", "green"}
arr1 = arr2 // array assignment copies every element (deep copy)

var arr3 [4]string
arr3 = arr2 // error: the array lengths differ

var arr [4][2]int // multidimensional array
arr[1][0] = 20
arr[0] = [2]int{1, 2}
```
Passing arrays between functions

```go
var arr [1e6]int

func foo1(arr [1e6]int) { // copies the entire array on every call
    ...
}

func foo2(arr *[1e6]int) { // passes a pointer, which is far more efficient
    ...
}
```
Slices

A slice is a data structure that makes it convenient to use and manage collections of data. Slices are built around the concept of a dynamic array and can grow and shrink on demand.
Internal implementation

```go
type SliceHeader struct {
    Data uintptr
    Len  int
    Cap  int
}
```

At compile time a slice has type Slice, but at run time a slice is represented by the SliceHeader struct: the Data field is a pointer to the underlying array, Len is the current length of the slice, and Cap is its capacity, i.e. the size of the Data array.
Creating and initializing

```go
slice := make([]string, 5)    // string slice with length and capacity 5
slice := make([]int, 3, 5)    // int slice with length 3 and capacity 5
slice := make([]string, 5, 3) // error: the length cannot exceed the capacity
slice := []string{"Red", "Yellow", "Green"} // literal declaration
var slice []int               // nil int slice
slice := make([]int, 0)       // empty int slice via make
slice := []int{}              // empty int slice via a literal
```

Whether you use a nil slice or an empty slice, the built-in functions append, len, and cap behave identically; prefer len to test whether a slice is empty.
Assignment and slicing

```go
slice := []int{1, 2, 3, 4, 5} // length and capacity are both 5
slice[2] = 30                 // slice is now {1, 2, 30, 4, 5}
newSlice := slice[1:3]        // new slice with length 2 and capacity 4
```

For a slice slice[i:j] over an underlying array of capacity k: length = j - i, capacity = k - i.

```go
newSlice[0] = 20 // newSlice shares its underlying array with slice, so slice is now {1, 20, 30, 4, 5}
newSlice[3] = 40 // panic: the capacity is sufficient, but the index exceeds the length
```
Growing a slice

```go
slice := []int{1, 2, 3, 4, 5}
newSlice := slice[1:3] // cap(newSlice) == 4
newSlice = append(newSlice, 60)
// slice is now {1, 2, 3, 60, 5}: the append on newSlice modified slice.
// Try newSlice := slice[1:3:3] and compare the result.
newSlice = append(newSlice, []int{70, 80}...)
// slice is still {1, 2, 3, 60, 5}; len(newSlice) == 5, cap(newSlice) == 8
```

```go
func growslice(et *_type, old slice, cap int) slice {
    ...
    newcap := old.cap
    doublecap := newcap + newcap
    if cap > doublecap {
        newcap = cap
    } else {
        if old.cap < 1024 {
            newcap = doublecap
        } else {
            for 0 < newcap && newcap < cap {
                newcap += newcap / 4
            }
            if newcap <= 0 {
                newcap = cap
            }
        }
    }
    ...
}
```
Before allocating memory, the new slice capacity must be determined. Go chooses a growth strategy based on the current capacity:

- if the requested capacity is more than double the current capacity, the requested capacity is used;
- if the current capacity is less than 1024, the capacity is doubled;
- if the current capacity is 1024 or more, the capacity grows by 25% per iteration until it exceeds the requested capacity.

This only determines an approximate capacity; the runtime then aligns the allocation according to the element size. When the element size is 1 byte, the pointer size, or a power of two, the runtime aligns the memory with the following code:
```go
func growslice(et *_type, old slice, cap int) slice {
    ...
    var overflow bool
    var lenmem, newlenmem, capmem uintptr
    switch {
    case et.size == 1:
        lenmem = uintptr(old.len)
        newlenmem = uintptr(cap)
        capmem = roundupsize(uintptr(newcap))
        overflow = uintptr(newcap) > maxAlloc
        newcap = int(capmem)
    case et.size == sys.PtrSize:
        lenmem = uintptr(old.len) * sys.PtrSize
        newlenmem = uintptr(cap) * sys.PtrSize
        capmem = roundupsize(uintptr(newcap) * sys.PtrSize)
        overflow = uintptr(newcap) > maxAlloc/sys.PtrSize
        newcap = int(capmem / sys.PtrSize)
    case isPowerOfTwo(et.size):
        var shift uintptr
        if sys.PtrSize == 8 {
            // Mask shift for better code generation.
            shift = uintptr(sys.Ctz64(uint64(et.size))) & 63
        } else {
            shift = uintptr(sys.Ctz32(uint32(et.size))) & 31
        }
        lenmem = uintptr(old.len) << shift
        newlenmem = uintptr(cap) << shift
        capmem = roundupsize(uintptr(newcap) << shift)
        overflow = uintptr(newcap) > (maxAlloc >> shift)
        newcap = int(capmem >> shift)
    default:
        lenmem = uintptr(old.len) * et.size
        newlenmem = uintptr(cap) * et.size
        capmem, overflow = math.MulUintptr(et.size, uintptr(newcap))
        capmem = roundupsize(capmem)
        newcap = int(capmem / et.size)
    }
    ...
}
```
The runtime.roundupsize function rounds the requested memory size up, using the runtime.class_to_size table from the memory allocator; allocating in those size classes improves allocation efficiency and reduces fragmentation.
```go
func growslice(et *_type, old slice, cap int) slice {
    ...
    if overflow || capmem > maxAlloc {
        panic(errorString("growslice: cap out of range"))
    }
    var p unsafe.Pointer
    if et.ptrdata == 0 {
        p = mallocgc(capmem, nil, false)
        memclrNoHeapPointers(add(p, newlenmem), capmem-newlenmem)
    } else {
        p = mallocgc(capmem, et, true)
        if lenmem > 0 && writeBarrier.enabled {
            bulkBarrierPreWriteSrcOnly(uintptr(p), uintptr(old.array), lenmem-et.size+et.ptrdata)
        }
    }
    memmove(p, old.array, lenmem)
    return slice{p, old.len, newcap}
}
```
If the slice's elements are not pointer types, memclrNoHeapPointers clears the memory beyond the slice's current length, and memmove then copies the contents of the old array into the newly allocated memory.

runtime.growslice finally returns a new slice struct containing the new array pointer, length, and capacity; this returned triple replaces the original slice header, completing append's element-appending work.
Example:

```go
var arr []int64
arr = append(arr, 1, 2, 3, 4, 5)
fmt.Println(cap(arr))
```
Running this code triggers runtime.growslice to grow arr with a requested capacity of 5, i.e. 40 bytes of memory. Because the element size equals sys.PtrSize, the runtime calls runtime.roundupsize to round the size up to 48 bytes, so the new slice's capacity comes out to 48 / 8 = 6.
Copying slices

Copying a slice is not a common operation, but it is one we must cover when studying how slices are implemented. When copying with copy(a, b), the compiler's cmd/compile/internal/gc.copyany function handles two cases. If the copy is not invoked at run time, copy(a, b) is converted directly into the following code:
```go
n := len(a)
if n > len(b) {
    n = len(b)
}
if a.ptr != b.ptr {
    memmove(a.ptr, b.ptr, n*sizeof(elem(a)))
}
```
Here memmove performs the memory copy. In the other cases, the compiler replaces the run-time call to copy with the runtime.slicecopy function, for example for go copy(a, b):
```go
func slicecopy(to, fm slice, width uintptr) int {
    if fm.len == 0 || to.len == 0 {
        return 0
    }
    n := fm.len
    if to.len < n {
        n = to.len
    }
    if width == 0 {
        return n
    }
    ...
    size := uintptr(n) * width
    if size == 1 {
        *(*byte)(to.array) = *(*byte)(fm.array)
    } else {
        memmove(to.array, fm.array, size)
    }
    return n
}
```
The implementation is straightforward: in the general case, both copy paths use memmove to copy a whole block of memory into the target region.
Iterating slices

```go
slice := []int{1, 2, 3, 4, 5}
for index, value := range slice {
    value += 10
    fmt.Printf("index:%d value:%d\n", index, value)
}
fmt.Println(slice) // {1, 2, 3, 4, 5}: range yields a copy of each element
```

Equivalently, with an index loop:

```go
var value int
for index := 0; index < len(slice); index++ {
    value = slice[index]
}
```
Passing slices between functions

```go
slice := make([]int, 1e6)
slice = foo(slice)

func foo(slice []int) []int {
    ...
    return slice
}
```

On a 64-bit machine a slice header occupies 24 bytes: 8 bytes each for the array pointer, the length, and the capacity. Passing 24 bytes between functions is fast and simple, which is a large part of why slices are so efficient.
Maps

A map is a data structure that stores an unordered collection of key/value pairs. A map is a collection that can be iterated much like arrays and slices, but it is unordered: each iteration over the same map may visit its entries in a different order.
Creating and initializing

```go
dict := make(map[string]int)
dict := map[string]string{"Red": "#da1337", "Orange": "#e95a22"} // literal form
```
Using maps

```go
colors := make(map[string]string) // create an empty map
colors["Red"] = "#da1337"         // assign

var colors map[string]string
colors["Red"] = "#da1337" // error: assignment to entry in nil map

value, ok := colors["Blue"] // test whether the key exists
if ok {
    fmt.Println(value)
}

value := colors["Blue"] // test whether the value is the zero value
if value != "" {
    fmt.Println(value)
}
```
Passing maps between functions

```go
func removeColor(colors map[string]string, key string) {
    delete(colors, key)
}

func main() {
    colors := map[string]string{
        "AliceBlue": "#f0f8ff",
        "Coral":     "#ff7F50",
    }
    for key, value := range colors {
        fmt.Printf("Key: %s Value: %s\n", key, value)
    }
    removeColor(colors, "Coral")
    for key, value := range colors {
        fmt.Printf("Key: %s Value: %s\n", key, value)
    }
}
```
Memory model

Go's map is a hash table that resolves hash collisions by chaining.
```go
// hmap is short for "hashmap"
type hmap struct {
    count      int            // number of elements
    flags      uint8          // flag bits
    B          uint8          // log_2 of the number of buckets
    noverflow  uint16         // approximate number of overflow buckets
    hash0      uint32         // hash seed
    buckets    unsafe.Pointer // pointer to the bucket array of size 2^B
    oldbuckets unsafe.Pointer // used during growth; buckets is twice the length of oldbuckets
    nevacuate  uintptr        // evacuation progress: buckets below this have been migrated
    extra      *mapextra      // extra information
}

// When neither the map's keys nor its values contain pointers and both
// are smaller than 128 bytes, bmap is marked as pointer-free so the GC
// can avoid scanning the whole hmap. However, bmap actually holds an
// overflow field of pointer type, which would break that property, so
// overflow is moved into the extra field.
type mapextra struct {
    overflow     *[]*bmap
    oldoverflow  *[]*bmap
    nextOverflow *bmap
}

// bucket
type bmap struct {
    tophash [bucketCnt]uint8 // bucketCnt = 8
    // keys     [8]keytype
    // values   [8]valuetype
    // pad      uintptr
    // overflow uintptr
}
```
bmap is the bucket structure. Each bucket stores at most 8 key/value pairs; all keys in a bucket share the same low-order bits of their hash, and within the bucket, the high 8 bits of the hash determine the slot.

Note that the keys are stored together and the values are stored together, rather than interleaved as key/value/key/value/...; the benefit is that in some cases padding can be omitted, saving memory.

Take map[int64]int8: stored as key/value/key/value/..., every pair would need 7 extra bytes of padding; storing all the keys together and all the values together (key/key/.../value/value/...) needs padding only at the end.
Each bucket is designed to hold at most 8 key/value pairs; if a ninth pair lands in a bucket, another bucket is allocated and linked in via the overflow pointer.

Map growth

Load factor: loadFactor := count / (2^B), where count is the number of elements in the map and 2^B is the number of buckets.
Growth is triggered when either of the following conditions holds:

1. The load factor exceeds the threshold, which the source code defines as 6.5.
2. There are too many overflow buckets: more than 2^15 when B > 15, otherwise more than 2^B.
Bucket-migration source for map growth:

```go
const (
    bucketCntBits = 3
    bucketCnt     = 1 << bucketCntBits
    loadFactorNum = 13
    loadFactorDen = 2
)

func bucketShift(b uint8) uintptr {
    return uintptr(1) << (b & (sys.PtrSize*8 - 1))
}

func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}

func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
    if B > 15 {
        B = 15
    }
    return noverflow >= uint16(1)<<(B&15)
}

func hashGrow(t *maptype, h *hmap) {
    bigger := uint8(1)
    if !overLoadFactor(h.count+1, h.B) {
        bigger = 0
        h.flags |= sameSizeGrow
    }
    oldbuckets := h.buckets
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // commit the grow (atomic wrt gc)
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    h.nevacuate = 0
    h.noverflow = 0
    if h.extra != nil && h.extra.overflow != nil {
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil")
        }
        h.extra.oldoverflow = h.extra.overflow
        h.extra.overflow = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }
    // the actual copying of the hash table data is done incrementally
    // by growWork() and evacuate().
}

// growing reports whether migration is in progress
func (h *hmap) growing() bool {
    return h.oldbuckets != nil
}

// growWork performs the migration
func growWork(t *maptype, h *hmap, bucket uintptr) {
    // make sure we evacuate the oldbucket corresponding
    // to the bucket we're about to use
    evacuate(t, h, bucket&h.oldbucketmask()) // evacuate one bucket
    // evacuate one more oldbucket to make progress on growing
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

// evacDst is the evacuation destination
type evacDst struct {
    b *bmap          // current destination bucket
    i int            // key/elem index into b
    k unsafe.Pointer // pointer to current key storage
    e unsafe.Pointer // pointer to current elem storage
}

// evacuate migrates one bucket
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    newbit := h.noldbuckets()
    if !evacuated(b) {
        var xy [2]evacDst
        x := &xy[0]
        x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        x.k = add(unsafe.Pointer(x.b), dataOffset)
        x.e = add(x.k, bucketCnt*uintptr(t.keysize))
        if !h.sameSizeGrow() {
            // Only calculate y pointers if we're growing bigger.
            // Otherwise GC can see bad pointers.
            y := &xy[1]
            y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            y.k = add(unsafe.Pointer(y.b), dataOffset)
            y.e = add(y.k, bucketCnt*uintptr(t.keysize))
        }
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            e := add(k, bucketCnt*uintptr(t.keysize))
            for i := 0; i < bucketCnt; i, k, e = i+1, add(k, uintptr(t.keysize)), add(e, uintptr(t.elemsize)) {
                top := b.tophash[i]
                if isEmpty(top) {
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash {
                    throw("bad map state")
                }
                k2 := k
                if t.indirectkey() {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                var useY uint8
                if !h.sameSizeGrow() {
                    hash := t.hasher(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.equal(k2, k2) {
                        useY = top & 1
                        top = tophash(hash)
                    } else {
                        if hash&newbit != 0 {
                            useY = 1
                        }
                    }
                }
                if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
                    throw("bad evacuatedN")
                }
                b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
                dst := &xy[useY]                 // evacuation destination
                if dst.i == bucketCnt {
                    dst.b = h.newoverflow(t, dst.b)
                    dst.i = 0
                    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
                    dst.e = add(dst.k, bucketCnt*uintptr(t.keysize))
                }
                dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
                if t.indirectkey() {
                    *(*unsafe.Pointer)(dst.k) = k2 // copy pointer
                } else {
                    typedmemmove(t.key, dst.k, k) // copy elem
                }
                if t.indirectelem() {
                    *(*unsafe.Pointer)(dst.e) = *(*unsafe.Pointer)(e)
                } else {
                    typedmemmove(t.elem, dst.e, e)
                }
                dst.i++
                dst.k = add(dst.k, uintptr(t.keysize))
                dst.e = add(dst.e, uintptr(t.elemsize))
            }
        }
        // Unlink the overflow buckets & clear key/elem to help GC.
        if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }
    if oldbucket == h.nevacuate {
        advanceEvacuation(h, t, newbit)
    }
}

func advanceEvacuation(h *hmap, t *maptype, newbit uintptr) {
    h.nevacuate++
    stop := h.nevacuate + 1024
    if stop > newbit {
        stop = newbit
    }
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
        h.nevacuate++
    }
    if h.nevacuate == newbit { // newbit == # of oldbuckets
        // migration complete; release the old bucket data
        h.oldbuckets = nil
        if h.extra != nil {
            h.extra.oldoverflow = nil
        }
        h.flags &^= sameSizeGrow
    }
}
```
Map creation

```go
// Note: the go:nosplit directive specifies that the next function
// declared in the file must not include a stack-overflow check; in
// short, the function skips the stack check.

//go:nosplit
// fastrand produces a random number quickly; see the xorshift paper:
// https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf
func fastrand() uint32 {
    mp := getg().m
    s1, s0 := mp.fastrand[0], mp.fastrand[1]
    s1 ^= s1 << 17
    s1 = s1 ^ s0 ^ s1>>7 ^ s0>>16
    mp.fastrand[0], mp.fastrand[1] = s0, s1
    return s0 + s1
}

func makemap(t *maptype, hint int, h *hmap) *hmap {
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
    if overflow || mem > maxAlloc {
        hint = 0
    }
    // initialize
    if h == nil {
        h = new(hmap)
    }
    h.hash0 = fastrand()
    // compute a suitable bucket count B
    B := uint8(0)
    for overLoadFactor(hint, B) {
        B++
    }
    h.B = B
    // allocate the hash table
    if h.B != 0 {
        var nextOverflow *bmap
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil)
        if nextOverflow != nil {
            h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow
        }
    }
    return h
}

// makeBucketArray initializes a backing array for map buckets.
func makeBucketArray(t *maptype, b uint8, dirtyalloc unsafe.Pointer) (buckets unsafe.Pointer, nextOverflow *bmap) {
    base := bucketShift(b)
    nbuckets := base
    // for small b, don't allocate extra overflow space
    if b >= 4 {
        nbuckets += bucketShift(b - 4)
        sz := t.bucket.size * nbuckets
        up := roundupsize(sz)
        if up != sz {
            nbuckets = up / t.bucket.size
        }
    }
    if dirtyalloc == nil {
        buckets = newarray(t.bucket, int(nbuckets))
    } else {
        buckets = dirtyalloc
        size := t.bucket.size * nbuckets
        if t.bucket.ptrdata != 0 {
            memclrHasPointers(buckets, size)
        } else {
            memclrNoHeapPointers(buckets, size)
        }
    }
    if base != nbuckets {
        nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
        last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
        last.setoverflow(t, (*bmap)(buckets))
    }
    return buckets, nextOverflow
}
```
Locating a key

Hashing a key produces a 64-bit integer. The low B bits decide which bucket the key falls into; keys that land in the same bucket have collided, so we search within that bucket, using the high 8 bits of the hash for fast comparison. Consider an example first, then the source code:
```go
const (
    emptyRest      = 0 // this cell is empty, and there are no more non-empty cells at higher indexes or overflows.
    emptyOne       = 1 // this cell is empty
    evacuatedX     = 2 // key/elem is valid. Entry has been evacuated to first half of larger table.
    evacuatedY     = 3 // same as above, but evacuated to second half of larger table.
    evacuatedEmpty = 4 // cell is empty, bucket is evacuated.
    minTopHash     = 5 // minimum tophash for a normal filled cell
)

// tophash computes the high 8 bits of the hash value
func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        top += minTopHash
    }
    return top
}

// add offsets pointer p by x bytes
func add(p unsafe.Pointer, x uintptr) unsafe.Pointer {
    return unsafe.Pointer(uintptr(p) + x)
}

const PtrSize = 4 << (^uintptr(0) >> 63) // 8 in a 64-bit environment

// overflow returns the next bucket in the chain
func (b *bmap) overflow(t *maptype) *bmap {
    return *(**bmap)(add(unsafe.Pointer(b), uintptr(t.bucketsize)-sys.PtrSize))
}

// evacuated reports whether the bucket has been migrated
func evacuated(b *bmap) bool {
    h := b.tophash[0]
    return h > emptyOne && h < minTopHash
}

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {}
func mapaccess2(t *maptype, h *hmap, key unsafe.Pointer) (unsafe.Pointer, bool) {}

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    // ......
    // return the zero value if h is nil or empty
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue https://github.com/golang/go/issues/23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
    // concurrent read/write conflict
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    // hash the requested key
    hash := t.hasher(key, uintptr(h.hash0))
    // get the bucket mask, e.g. B=5 gives m=31, binary 11111
    m := bucketMask(h.B)
    // locate the bucket
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // check whether the map is in the middle of growing
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            // There used to be half as many buckets; mask down one more power of two.
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            b = oldb
        }
    }
    // compute the high 8 bits of the hash
    top := tophash(hash)
bucketloop:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            // locate the key
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                // dereference indirect keys
                k = *((*unsafe.Pointer)(k))
            }
            if t.key.equal(key, k) {
                // locate the value
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                if t.indirectelem() {
                    // dereference indirect values
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
    return unsafe.Pointer(&zeroVal[0])
}
```
Map assignment

```go
const hashWriting = 4 // a goroutine is writing to the map

// isEmpty reports whether the cell is empty
func isEmpty(x uint8) bool {
    return x <= emptyOne // x <= 1
}

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if h == nil {
        panic(plainError("assignment to entry in nil map"))
    }
    // checks omitted
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
    hash := t.hasher(key, uintptr(h.hash0))
    // Set hashWriting after calling t.hasher, since t.hasher may panic,
    // in which case we have not actually done a write.
    h.flags ^= hashWriting
    if h.buckets == nil {
        h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
    }
again:
    bucket := hash & bucketMask(h.B)
    if h.growing() {
        growWork(t, h, bucket)
    }
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
    top := tophash(hash)
    var inserti *uint8
    var insertk unsafe.Pointer
    var elem unsafe.Pointer
bucketloop:
    for {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if isEmpty(b.tophash[i]) && inserti == nil {
                    inserti = &b.tophash[i]
                    insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
                    elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                }
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if !t.key.equal(key, k) {
                continue
            }
            // already have a mapping for key. Update it.
            if t.needkeyupdate() {
                typedmemmove(t.key, k, key)
            }
            elem = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            goto done
        }
        ovf := b.overflow(t)
        if ovf == nil {
            break
        }
        b = ovf
    }
    // The key was not found, so space must be allocated; first check
    // whether the map needs to grow, and if so grow it and recompute.
    if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
        goto again // Growing the table invalidates everything, so try again
    }
    if inserti == nil {
        newb := h.newoverflow(t, b)
        inserti = &newb.tophash[0]
        insertk = add(unsafe.Pointer(newb), dataOffset)
        elem = add(insertk, bucketCnt*uintptr(t.keysize))
    }
    // store new key/elem at insert position
    if t.indirectkey() {
        kmem := newobject(t.key)
        *(*unsafe.Pointer)(insertk) = kmem
        insertk = kmem
    }
    if t.indirectelem() {
        vmem := newobject(t.elem)
        *(*unsafe.Pointer)(elem) = vmem
    }
    typedmemmove(t.key, insertk, key)
    *inserti = top
    h.count++
done:
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
    if t.indirectelem() {
        elem = *((*unsafe.Pointer)(elem))
    }
    return elem
}
```
Map deletion

```go
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
    // ... checks omitted
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return
    }
    if h.flags&hashWriting != 0 {
        throw("concurrent map writes")
    }
    hash := t.hasher(key, uintptr(h.hash0))
    // Set hashWriting after calling t.hasher, since t.hasher may panic,
    // in which case we have not actually done a write (delete).
    h.flags ^= hashWriting
    bucket := hash & bucketMask(h.B)
    if h.growing() {
        growWork(t, h, bucket)
    }
    b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
    bOrig := b
    top := tophash(hash)
search:
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    // reached the end without finding the key
                    break search
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            k2 := k
            if t.indirectkey() {
                k2 = *((*unsafe.Pointer)(k2))
            }
            if !t.key.equal(key, k2) { // compare the keys
                continue
            }
            // Only clear key if there are pointers in it.
            if t.indirectkey() {
                *(*unsafe.Pointer)(k) = nil
            } else if t.key.ptrdata != 0 {
                memclrHasPointers(k, t.key.size)
            }
            e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
            if t.indirectelem() {
                *(*unsafe.Pointer)(e) = nil
            } else if t.elem.ptrdata != 0 {
                memclrHasPointers(e, t.elem.size)
            } else {
                memclrNoHeapPointers(e, t.elem.size)
            }
            // mark the tophash slot as empty
            b.tophash[i] = emptyOne
            if i == bucketCnt-1 {
                if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
                    goto notLast
                }
            } else {
                if b.tophash[i+1] != emptyRest {
                    goto notLast
                }
            }
            for {
                b.tophash[i] = emptyRest
                if i == 0 {
                    if b == bOrig {
                        break // beginning of initial bucket, we're done.
                    }
                    // Find previous bucket, continue at its last entry.
                    c := b
                    for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
                    }
                    i = bucketCnt - 1
                } else {
                    i--
                }
                if b.tophash[i] != emptyOne {
                    break
                }
            }
        notLast:
            h.count--
            // Reset the hash seed to make it more difficult for attackers to
            // repeatedly trigger hash collisions. See issue 25237.
            if h.count == 0 {
                h.hash0 = fastrand()
            }
            break search
        }
    }
    if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }
    h.flags &^= hashWriting
}
```
Iterating a map

Let's start with an example. Beginning at the position marked in red, we first visit e in bucket 3, then f and g in bucket 3's overflow chain; next we reach bucket 0, which has not yet been migrated, so we visit only b and c, the entries that belong to old bucket 0; then h in bucket 1; and finally a and d in bucket 2.

Iteration result: e -> f -> g -> b -> c -> h -> a -> d
Flowchart of the source-code execution:
```go
// hiter is the map iteration struct
type hiter struct {
    key         unsafe.Pointer // Must be in first position. Write nil to indicate iteration end (see cmd/compile/internal/gc/range.go).
    elem        unsafe.Pointer // Must be in second position (see cmd/compile/internal/gc/range.go).
    t           *maptype
    h           *hmap
    buckets     unsafe.Pointer // bucket ptr at hash_iter initialization time
    bptr        *bmap          // current bucket
    overflow    *[]*bmap       // keeps overflow buckets of hmap.buckets alive
    oldoverflow *[]*bmap       // keeps overflow buckets of hmap.oldbuckets alive
    startBucket uintptr        // bucket iteration started at
    offset      uint8          // intra-bucket offset to start from during iteration (should be big enough to hold bucketCnt-1)
    wrapped     bool           // already wrapped around from end of bucket array to beginning
    B           uint8
    i           uint8
    bucket      uintptr
    checkBucket uintptr
}

func mapiterinit(t *maptype, h *hmap, it *hiter) {
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiterinit))
    }
    if h == nil || h.count == 0 {
        return
    }
    if unsafe.Sizeof(hiter{})/sys.PtrSize != 12 {
        throw("hash_iter size incorrect") // see cmd/compile/internal/gc/reflect.go
    }
    it.t = t
    it.h = h
    it.B = h.B
    it.buckets = h.buckets
    if t.bucket.ptrdata == 0 {
        h.createOverflow()
        it.overflow = h.extra.overflow
        it.oldoverflow = h.extra.oldoverflow
    }
    // start at a random position
    r := uintptr(fastrand())
    if h.B > 31-bucketCntBits {
        r += uintptr(fastrand()) << 31
    }
    it.startBucket = r & bucketMask(h.B)
    it.offset = uint8(r >> h.B & (bucketCnt - 1))
    it.bucket = it.startBucket
    if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
        atomic.Or8(&h.flags, iterator|oldIterator)
    }
    mapiternext(it)
}

func mapiternext(it *hiter) {
    h := it.h
    if raceenabled {
        callerpc := getcallerpc()
        racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiternext))
    }
    if h.flags&hashWriting != 0 {
        throw("concurrent map iteration and map write")
    }
    t := it.t
    bucket := it.bucket
    b := it.bptr
    i := it.i
    checkBucket := it.checkBucket
next:
    if b == nil {
        if bucket == it.startBucket && it.wrapped {
            // end of iteration
            it.key = nil
            it.elem = nil
            return
        }
        if h.growing() && it.B == h.B {
            oldbucket := bucket & it.h.oldbucketmask()
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
            if !evacuated(b) {
                checkBucket = bucket
            } else {
                b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
                checkBucket = noCheck
            }
        } else {
            b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
            checkBucket = noCheck
        }
        bucket++
        if bucket == bucketShift(it.B) {
            bucket = 0
            it.wrapped = true
        }
        i = 0
    }
    for ; i < bucketCnt; i++ {
        offi := (i + it.offset) & (bucketCnt - 1)
        if isEmpty(b.tophash[offi]) || b.tophash[offi] == evacuatedEmpty {
            continue
        }
        k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
        if t.indirectkey() {
            k = *((*unsafe.Pointer)(k))
        }
        e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.elemsize))
        if checkBucket != noCheck && !h.sameSizeGrow() {
            if t.reflexivekey() || t.key.equal(k, k) {
                hash := t.hasher(k, uintptr(h.hash0))
                if hash&bucketMask(it.B) != checkBucket {
                    continue
                }
            } else {
                if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
                    continue
                }
            }
        }
        if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
            !(t.reflexivekey() || t.key.equal(k, k)) {
            it.key = k
            if t.indirectelem() {
                e = *((*unsafe.Pointer)(e))
            }
            it.elem = e
        } else {
            rk, re := mapaccessK(t, h, k)
            if rk == nil {
                continue // key has been deleted
            }
            it.key = rk
            it.elem = re
        }
        it.bucket = bucket
        if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
            it.bptr = b
        }
        it.i = i + 1
        it.checkBucket = checkBucket
        return
    }
    b = b.overflow(t)
    i = 0
    goto next
}
```
Channels

The basic philosophy: Do not communicate by sharing memory; instead, share memory by communicating.
```go
chan T   // declare a bidirectional channel
chan<- T // declare a send-only channel
<-chan T // declare a receive-only channel
ch := make(chan int)    // initialize an unbuffered channel of int
ch := make(chan int, 3) // initialize a buffered channel of int with capacity 3
```
Example

```go
func goRoutineA(a <-chan int) {
    val := <-a
    fmt.Println("goRoutineA received the data", val)
}

func main() {
    ch := make(chan int)
    go goRoutineA(ch)
    time.Sleep(time.Second * 1)
}
```
Data structure

```go
// hchan is the channel
type hchan struct {
    qcount   uint           // number of elements
    dataqsiz uint           // size of the buffer
    buf      unsafe.Pointer // pointer to the buffer
    elemsize uint16         // size of each element
    closed   uint32         // channel state
    elemtype *_type         // element type
    sendx    uint           // send index
    recvx    uint           // receive index
    recvq    waitq          // queue of waiting receivers
    sendq    waitq          // queue of waiting senders
    lock     mutex          // mutex
}

// waitq is the wait queue
type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    g    *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    ...
    isSelect bool
    ...
    c *hchan // channel
}

func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <-b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}
```
Creating a channel

```go
const (
    maxAlign  = 8
    hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // ... checks and alignment omitted
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
        // no buffer: allocate only the hchan
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // used for race detection
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // elements contain no pointers: allocate hchan and buffer in one call
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // elements contain pointers: allocate in two calls
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    return c
}
```
Receiving from a channel

```go
ch := make(chan int, 3)
ch <- 1
x := <-ch
fmt.Println(x)
ch <- 2
y, ok := <-ch
if ok {
    fmt.Println(y)
}
```

```go
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// empty reports whether the channel has nothing to receive:
// 1. unbuffered: no goroutine is waiting in the send queue sendq
// 2. buffered: buf holds no elements
func empty(c *hchan) bool {
    if c.dataqsiz == 0 {
        return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    }
    return atomic.Loaduint(&c.qcount) == 0
}

// chanbuf returns a pointer to slot i of the buffer
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

// chanrecv receives on a channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        // if non-blocking, return (false, false) immediately
        if !block {
            return
        }
        // otherwise receiving on a nil channel parks the goroutine forever
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        // never reached
        throw("unreachable")
    }
    // In non-blocking mode, detect failure fast and return without the lock.
    if !block && empty(c) {
        // if block == false, no element is ready, and the channel is not
        // closed, return (false, false)
        if atomic.Load(&c.closed) == 0 {
            return
        }
        // if c is closed, zero ep and return (true, false)
        if empty(c) {
            // ... race checks omitted
            if ep != nil {
                // Receiving from a closed channel without discarding the
                // result yields a zero value of the element type;
                // typedmemclr clears the memory at ep for that type.
                typedmemclr(c.elemtype, ep)
            }
            // receiving from a closed channel returns selected == true
            return true, false
        }
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    lock(&c.lock)
    if c.closed != 0 && c.qcount == 0 {
        // ... race checks omitted
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    // a goroutine is waiting in the send queue: hand off via recv
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    // buffered channel with elements in buf: receive normally
    if c.qcount > 0 {
        // fetch the element directly from the circular buffer
        qp := chanbuf(c, c.recvx)
        // ... race checks omitted
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // clear the slot in the circular buffer
        typedmemclr(c.elemtype, qp)
        // advance the receive cursor
        c.recvx++
        // wrap the receive cursor
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // one fewer element in buf
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    if !block {
        // non-blocking receive: unlock; selected is false because no
        // value was received
        unlock(&c.lock)
        return false, false
    }
    // From here on the goroutine will block: build a sudog.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // record the address the received value should be written to
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // join the channel's receive wait queue
    c.recvq.enqueue(mysg)
    // park the current goroutine
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    // woken up: resume here and do the cleanup
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}
```
chanrecv receives on channel c and writes the received data to ep:

- if ep is nil, the received data is discarded;
- if block == false and no element is ready, it returns (false, false);
- otherwise, if c is closed, it zeroes ep and returns (true, false);
- otherwise it fills ep with the received value and returns (true, true).
```go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // ... race checks omitted
        if ep != nil {
            // copy the value directly, sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Buffered channel, but buf is full.
        // Copy the element at the head of the circular buffer to the
        // receiver, then enqueue the sender's value; at this point recvx
        // and sendx are in fact equal.
        // locate the receive cursor
        qp := chanbuf(c, c.recvx)
        // ... race checks omitted
        // copy the value at the receive cursor to the receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy the sender's value into buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // update the cursors
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // wake the sending goroutine; it still has to wait for the scheduler
    goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}
```
Sending on a channel

```go
// full reports whether the buffer is full. There are two cases:
// 1. the channel is unbuffered and no goroutine waits in the receive queue
// 2. the channel is buffered and the circular buffer is full
func full(c *hchan) bool {
    if c.dataqsiz == 0 {
        return c.recvq.first == nil
    }
    return c.qcount == c.dataqsiz
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // ... race checks omitted
    // For non-blocking sends, detect failure fast:
    // if the channel is not closed and has no spare buffer space, return false.
    if !block && c.closed == 0 && full(c) {
        return false
    }
    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // acquire the lock
    lock(&c.lock)
    // sending on a closed channel panics
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // If a goroutine waits in the receive queue, copy the value straight
    // to the receiving goroutine.
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // buffered channel with spare buffer space
    if c.qcount < c.dataqsiz {
        // qp points at slot sendx of buf
        qp := chanbuf(c, c.sendx)
        // ... race checks omitted
        // copy the value from ep to qp
        typedmemmove(c.elemtype, qp, ep)
        // advance the send cursor
        c.sendx++
        // wrap the send cursor at capacity
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // one more buffered element
        c.qcount++
        // release the lock
        unlock(&c.lock)
        return true
    }
    if !block {
        unlock(&c.lock)
        return false
    }
    // The channel is full, so the sender blocks: build a sudog.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // join the channel's send wait queue
    c.sendq.enqueue(mysg)
    // park the current goroutine
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // keep ep alive
    KeepAlive(ep)
    // woken up from here (the channel may be able to send now)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // woken up because the channel was closed: panic
        panic(plainError("send on closed channel"))
    }
    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ... race checks omitted
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on the current goroutine's stack; dst is on another
    // goroutine's stack. We copy the memory directly, but if the
    // destination stack shrinks after we read sg.elem we must not write
    // to the stale dst location, so a barrier is needed between the
    // read and the write.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}
```
Closing a channel

```go
func closechan(c *hchan) {
    // closing a nil channel panics
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        // closing an already-closed channel panics
        panic(plainError("close of closed channel"))
    }
    // ... race checks omitted
    // mark the channel as closed
    c.closed = 1
    var glist gList
    // release every sudog in the channel's receive wait queue
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        // A non-nil elem means this receiver is not discarding the value;
        // give it a zero value of the element type.
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        // ... race checks omitted
        glist.push(gp)
    }
    // release every sudog in the channel's send wait queue
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        // ... race checks omitted
        glist.push(gp)
    }
    unlock(&c.lock)
    // walk the list and make the goroutines runnable
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
```
References

- 《Go語言實戰》 (Go in Action)
- 《Go語言學習筆記》
- 《Go併發程式設計實戰》
- 《Go語言核心程式設計》
- 《Go程式設計語言》 (The Go Programming Language)
- The Go source code
- 深度解密Go語言之slice
- 深度解密Go語言之map
- 深度解密Go語言之channel
- Go 語言設計與實現 (Go: Design and Implementation)
- Diving Deep Into The Golang Channels
- 併發與並行 · Concurrency in Go 中文筆記 · 看雲

Source: https://zhuanlan.zhihu.com/p/341945051