首頁>技術>

本文主要研究一下dapr的Limiter

Limiter

dapr/pkg/concurrency/limiter.go

const (    // DefaultLimit is the default concurrency limit    DefaultLimit = 100)// Limiter objecttype Limiter struct {    limit         int    tickets       chan int    numInProgress int32}

Limiter定義了limit、tickets、numInProgress屬性

NewLimiter

dapr/pkg/concurrency/limiter.go

// NewLimiter allocates a new ConcurrencyLimiterfunc NewLimiter(limit int) *Limiter {    if limit <= 0 {        limit = DefaultLimit    }    // allocate a limiter instance    c := &Limiter{        limit:   limit,        tickets: make(chan int, limit),    }    // allocate the tickets:    for i := 0; i < c.limit; i++ {        c.tickets <- i    }    return c}

NewLimiter方法根據limit來建立Limiter,並挨個分配ticket

Execute

dapr/pkg/concurrency/limiter.go

// Execute adds a function to the execution queue.// if num of go routines allocated by this instance is < limit// launch a new go routine to execute job// else wait until a go routine becomes availablefunc (c *Limiter) Execute(job func(param interface{}), param interface{}) int {    ticket := <-c.tickets    atomic.AddInt32(&c.numInProgress, 1)    go func(param interface{}) {        defer func() {            c.tickets <- ticket            atomic.AddInt32(&c.numInProgress, -1)        }()        // run the job        job(param)    }(param)    return ticket}

Execute方法首先獲取ticket,然後遞增numInProgress,之後非同步執行job,執行完後歸還ticket

Wait

dapr/pkg/concurrency/limiter.go

// Wait will block all the previously Executed jobs completed running.//// IMPORTANT: calling the Wait function while keep calling Execute leads to//            un-desired race conditionsfunc (c *Limiter) Wait() {    for i := 0; i < c.limit; i++ {        <-c.tickets    }}

Wait方法遍歷limit,挨個等待tickets返回

小結

dapr的Limiter定義了limit、tickets、numInProgress屬性;它定義了Execute、Wait方法,同時提供NewLimiter的工廠方法。

docdapr

10
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 資料庫面試題什麼是事務以及四個特性,你能說出來嗎?