Preface
This article takes a look at dapr's Limiter.
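Limiter
dapr/pkg/concurrency/limiter.go

    const (
        // DefaultLimit is the default concurrency limit
        DefaultLimit = 100
    )

    // Limiter object
    type Limiter struct {
        limit         int
        tickets       chan int
        numInProgress int32
    }

Limiter defines three fields: limit is the maximum number of concurrent jobs, tickets is a channel of ticket numbers, and numInProgress counts the jobs currently running.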
NewLimiter
dapr/pkg/concurrency/limiter.go

    // NewLimiter allocates a new ConcurrencyLimiter
    func NewLimiter(limit int) *Limiter {
        if limit <= 0 {
            limit = DefaultLimit
        }

        // allocate a limiter instance
        c := &Limiter{
            limit:   limit,
            tickets: make(chan int, limit),
        }

        // allocate the tickets:
        for i := 0; i < c.limit; i++ {
            c.tickets <- i
        }

        return c
    }

NewLimiter creates a Limiter for the given limit, falling back to DefaultLimit when limit <= 0. It allocates a buffered tickets channel of capacity limit and fills it with the tickets one by one, numbered 0 to limit-1.
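The pre-filled ticket channel can be tried out in isolation. The following is a minimal sketch, not dapr code: `newTickets` is a hypothetical helper that only mirrors the allocation loop in NewLimiter, to show that a buffered channel filled up front behaves like a counting semaphore.

```go
package main

import "fmt"

// newTickets is a hypothetical helper (not part of dapr). It mirrors the
// allocation loop in NewLimiter: a buffered channel of capacity limit,
// pre-filled with the ticket numbers 0..limit-1.
func newTickets(limit int) chan int {
	tickets := make(chan int, limit)
	for i := 0; i < limit; i++ {
		tickets <- i
	}
	return tickets
}

func main() {
	tickets := newTickets(3)
	fmt.Println(len(tickets), cap(tickets)) // 3 3: every ticket is available

	t := <-tickets               // take one ticket; channels are FIFO, so this is ticket 0
	fmt.Println(t, len(tickets)) // 0 2

	tickets <- t              // hand the ticket back
	fmt.Println(len(tickets)) // 3
}
```

Once all tickets are taken, a further receive blocks until someone sends a ticket back, which is what lets the channel cap concurrency.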
Execute
dapr/pkg/concurrency/limiter.go

    // Execute adds a function to the execution queue.
    // if num of go routines allocated by this instance is < limit
    // launch a new go routine to execute job
    // else wait until a go routine becomes available
    func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
        ticket := <-c.tickets
        atomic.AddInt32(&c.numInProgress, 1)
        go func(param interface{}) {
            defer func() {
                c.tickets <- ticket
                atomic.AddInt32(&c.numInProgress, -1)
            }()

            // run the job
            job(param)
        }(param)
        return ticket
    }

Execute first takes a ticket, blocking until one is available, and then increments numInProgress. It runs the job asynchronously in a new goroutine; once the job finishes, the deferred function returns the ticket and decrements numInProgress. Execute itself returns the ticket number right after launching the goroutine.
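To see that taking a ticket before launching the goroutine really caps concurrency, here is a minimal sketch under stated assumptions: `limiter`, `newLimiter`, `execute` and `maxConcurrent` are local, trimmed stand-ins written for this example (not the dapr types), and the job signature is simplified to `func()`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a local, trimmed stand-in for the pattern described above.
type limiter struct {
	tickets chan int
}

func newLimiter(limit int) *limiter {
	l := &limiter{tickets: make(chan int, limit)}
	for i := 0; i < limit; i++ {
		l.tickets <- i
	}
	return l
}

// execute blocks until a ticket is free, then runs job in a new goroutine
// and returns the ticket when the job is done.
func (l *limiter) execute(job func()) {
	ticket := <-l.tickets
	go func() {
		defer func() { l.tickets <- ticket }()
		job()
	}()
}

// maxConcurrent pushes jobs through a limiter of the given size and reports
// the highest number of jobs observed running at the same time.
func maxConcurrent(limit, jobs int) int32 {
	l := newLimiter(limit)
	var running, peak int32
	var wg sync.WaitGroup
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		l.execute(func() {
			defer wg.Done()
			n := atomic.AddInt32(&running, 1)
			// Raise peak to n if n is larger (compare-and-swap loop).
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulate work
			atomic.AddInt32(&running, -1)
		})
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(maxConcurrent(2, 10) <= 2) // true
}
```

Because a job holds its ticket from before its goroutine starts until after it returns, the number of running jobs can never exceed the number of tickets.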
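Wait
dapr/pkg/concurrency/limiter.go

    // Wait will block all the previously Executed jobs completed running.
    //
    // IMPORTANT: calling the Wait function while keep calling Execute leads to
    // un-desired race conditions
    func (c *Limiter) Wait() {
        for i := 0; i < c.limit; i++ {
            <-c.tickets
        }
    }

Wait loops limit times and receives one ticket per iteration, so it returns only after every ticket has been handed back, that is, after all previously executed jobs have finished. Note that Wait does not put the tickets back into the channel.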
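The barrier behaviour of Wait can be sketched the same way. This is an illustration under stated assumptions rather than the dapr implementation: `limiter`, `newLimiter`, `execute`, `wait` and `runAll` are local names made up for this example, and jobs take no parameter.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// limiter is a local, trimmed stand-in for the pattern described above.
type limiter struct {
	limit   int
	tickets chan int
}

func newLimiter(limit int) *limiter {
	l := &limiter{limit: limit, tickets: make(chan int, limit)}
	for i := 0; i < limit; i++ {
		l.tickets <- i
	}
	return l
}

// execute blocks until a ticket is free, then runs job in a new goroutine.
func (l *limiter) execute(job func()) {
	ticket := <-l.tickets
	go func() {
		defer func() { l.tickets <- ticket }()
		job()
	}()
}

// wait collects every ticket; it can only finish once no job holds one.
func (l *limiter) wait() {
	for i := 0; i < l.limit; i++ {
		<-l.tickets
	}
}

// runAll submits jobs, waits, and returns how many jobs had finished when
// wait returned, plus how many tickets are left in the channel.
func runAll(limit, jobs int) (int32, int) {
	l := newLimiter(limit)
	var done int32
	for i := 0; i < jobs; i++ {
		l.execute(func() {
			time.Sleep(2 * time.Millisecond) // simulate work
			atomic.AddInt32(&done, 1)
		})
	}
	l.wait()
	return atomic.LoadInt32(&done), len(l.tickets)
}

func main() {
	done, left := runAll(3, 8)
	fmt.Println(done, left) // 8 0
}
```

All 8 jobs are finished when `wait` returns, and the channel is left empty. In this sketch a later `execute` on the same limiter would therefore block, since no ticket is ever sent back after the drain.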
Summary
dapr's Limiter defines the limit, tickets, and numInProgress fields. It provides the Execute and Wait methods, along with the NewLimiter factory function.
doc
dapr