首頁>技術>

本文主要研究一下tempo的ExclusiveQueues

ExclusiveQueues

tempo/pkg/flushqueues/exclusivequeues.go

type ExclusiveQueues struct {    queues     []*util.PriorityQueue    index      *atomic.Int32    activeKeys sync.Map}

ExclusiveQueues定義了queues、index、activeKeys屬性

New

tempo/pkg/flushqueues/exclusivequeues.go

// New creates a new set of flush queues with a prom gauge to track current depthfunc New(queues int, metric prometheus.Gauge) *ExclusiveQueues {    f := &ExclusiveQueues{        queues: make([]*util.PriorityQueue, queues),        index:  atomic.NewInt32(0),    }    for j := 0; j < queues; j++ {        f.queues[j] = util.NewPriorityQueue(metric)    }    return f}

New方法先建立ExclusiveQueues,然後根據指定的queue個數透過util.NewPriorityQueue(metric)建立PriorityQueue

Enqueue

tempo/pkg/flushqueues/exclusivequeues.go

// Enqueue adds the op to the next queue and prevents any other items to be added with this keyfunc (f *ExclusiveQueues) Enqueue(op util.Op) {    _, ok := f.activeKeys.Load(op.Key())    if ok {        return    }    f.activeKeys.Store(op.Key(), struct{}{})    f.Requeue(op)}

Enqueue方法先從activeKeys查詢指定的key,若已經存在則提前返回,不存在則放入activeKeys中,然後執行f.Requeue(op)

Requeue

tempo/pkg/flushqueues/exclusivequeues.go

// Requeue adds an op that is presumed to already be covered by activeKeysfunc (f *ExclusiveQueues) Requeue(op util.Op) {    flushQueueIndex := int(f.index.Inc()) % len(f.queues)    f.queues[flushQueueIndex].Enqueue(op)}

Requeue方法首先透過int(f.index.Inc()) % len(f.queues)計算flushQueueIndex,然後找到對應的queue,執行Enqueue方法

Dequeue

tempo/pkg/flushqueues/exclusivequeues.go

// Dequeue removes the next op from the requested queue.  After dequeueing the calling//  process either needs to call ClearKey or Requeuefunc (f *ExclusiveQueues) Dequeue(q int) util.Op {    return f.queues[q].Dequeue()}

Dequeue方法執行f.queues[q]對應queue的Dequeue

Clear

tempo/pkg/flushqueues/exclusivequeues.go

// Clear unblocks the requested op.  This should be called only after a flush has been successfulfunc (f *ExclusiveQueues) Clear(op util.Op) {    f.activeKeys.Delete(op.Key())}

Clear方法將指定key從activeKeys中移除

Stop

tempo/pkg/flushqueues/exclusivequeues.go

// Stop closes all queuesfunc (f *ExclusiveQueues) Stop() {    for _, q := range f.queues {        q.Close()    }}

Stop方法遍歷f.queues,挨個執行q.Close()

小結

tempo的ExclusiveQueues定義了queues、index、activeKeys屬性;它提供了Enqueue、Requeue、Dequeue、Clear、Stop方法。

doctempo

8
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Elasticsearch從0到千萬級資料查詢實踐