Preface
This article takes a look at tempo's ExclusiveQueues.
ExclusiveQueues
tempo/pkg/flushqueues/exclusivequeues.go

    type ExclusiveQueues struct {
        queues     []*util.PriorityQueue
        index      *atomic.Int32
        activeKeys sync.Map
    }
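ExclusiveQueues defines three fields: queues (a slice of priority queues), index (an atomic counter), and activeKeys (a sync.Map of the keys currently in flight).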
New
tempo/pkg/flushqueues/exclusivequeues.go

    // New creates a new set of flush queues with a prom gauge to track current depth
    func New(queues int, metric prometheus.Gauge) *ExclusiveQueues {
        f := &ExclusiveQueues{
            queues: make([]*util.PriorityQueue, queues),
            index:  atomic.NewInt32(0),
        }

        for j := 0; j < queues; j++ {
            f.queues[j] = util.NewPriorityQueue(metric)
        }

        return f
    }
New first creates the ExclusiveQueues value, then fills it with the requested number of PriorityQueue instances via util.NewPriorityQueue(metric). Every queue is given the same gauge.
Enqueue
tempo/pkg/flushqueues/exclusivequeues.go

    // Enqueue adds the op to the next queue and prevents any other items to be added with this key
    func (f *ExclusiveQueues) Enqueue(op util.Op) {
        _, ok := f.activeKeys.Load(op.Key())
        if ok {
            return
        }

        f.activeKeys.Store(op.Key(), struct{}{})
        f.Requeue(op)
    }
Enqueue first looks up the op's key in activeKeys. If the key is already present, it returns early; otherwise it stores the key in activeKeys and then calls f.Requeue(op).
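The key-deduplication step can be tried in isolation. The following is a minimal sketch, not tempo's code: keySet, tryAdd, and clear are hypothetical names, and it uses sync.Map's LoadOrStore instead of the separate Load and Store calls shown above, so the check and the insert happen in a single step.

```go
package main

import (
	"fmt"
	"sync"
)

// keySet is a hypothetical stand-in for the activeKeys field.
type keySet struct {
	active sync.Map
}

// tryAdd reports whether key was newly marked active.
// LoadOrStore performs the lookup and the store as one atomic operation.
func (s *keySet) tryAdd(key string) bool {
	_, loaded := s.active.LoadOrStore(key, struct{}{})
	return !loaded
}

// clear unblocks the key so that it can be added again.
func (s *keySet) clear(key string) {
	s.active.Delete(key)
}

func main() {
	var s keySet
	fmt.Println(s.tryAdd("trace-1")) // true: first time the key is seen
	fmt.Println(s.tryAdd("trace-1")) // false: key is still active
	s.clear("trace-1")
	fmt.Println(s.tryAdd("trace-1")) // true: key was cleared
}
```

Whether the difference between Load followed by Store and a single LoadOrStore matters depends on how callers use Enqueue concurrently; the sketch only illustrates the idea of rejecting a key that is already active.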
Requeue
tempo/pkg/flushqueues/exclusivequeues.go

    // Requeue adds an op that is presumed to already be covered by activeKeys
    func (f *ExclusiveQueues) Requeue(op util.Op) {
        flushQueueIndex := int(f.index.Inc()) % len(f.queues)
        f.queues[flushQueueIndex].Enqueue(op)
    }
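Requeue first computes flushQueueIndex as int(f.index.Inc()) % len(f.queues), then calls Enqueue on the queue at that index. Because the counter is incremented on every call, ops are spread across the queues in round-robin order.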
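The index arithmetic can be illustrated with a small standalone sketch. It assumes the atomic package in the original is go.uber.org/atomic and substitutes the standard library's sync/atomic Int32 (Go 1.19+), whose Add(1) plays the role of Inc(); roundRobin and next are hypothetical names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical helper that mirrors the index arithmetic in Requeue.
type roundRobin struct {
	index atomic.Int32 // standard library counter standing in for the original index field
	n     int          // number of queues
}

// next increments the counter and maps the new value onto a queue index.
func (r *roundRobin) next() int {
	return int(r.index.Add(1)) % r.n
}

func main() {
	r := &roundRobin{n: 3}
	for i := 0; i < 6; i++ {
		fmt.Print(r.next(), " ")
	}
	fmt.Println() // prints: 1 2 0 1 2 0
}
```

Since the counter is incremented before the remainder is taken, the first op lands on queue 1 rather than queue 0. In Go the remainder of a negative value is negative, so a counter that wrapped past the int32 maximum would produce a negative index; the sketch does not guard against that.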
Dequeue
tempo/pkg/flushqueues/exclusivequeues.go

    // Dequeue removes the next op from the requested queue. After dequeueing the calling
    // process either needs to call ClearKey or Requeue
    func (f *ExclusiveQueues) Dequeue(q int) util.Op {
        return f.queues[q].Dequeue()
    }
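Dequeue calls Dequeue on the queue at index q, that is, f.queues[q]. The comment mentions ClearKey, but the method defined in this file is named Clear.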
Clear
tempo/pkg/flushqueues/exclusivequeues.go

    // Clear unblocks the requested op. This should be called only after a flush has been successful
    func (f *ExclusiveQueues) Clear(op util.Op) {
        f.activeKeys.Delete(op.Key())
    }
Clear removes the op's key from activeKeys, so an op with the same key can be enqueued again.
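The comment on Dequeue says that the caller must follow up with either Clear or Requeue. The following is a simplified sketch of that lifecycle, not tempo's implementation: miniQueues, drain, and errTransient are hypothetical, a single FIFO slice replaces the priority queues, and this Dequeue never blocks, which may differ from the real queue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTransient is a hypothetical error used to simulate a failed flush.
var errTransient = errors.New("transient failure")

// miniQueues is a simplified, hypothetical stand-in for ExclusiveQueues.
type miniQueues struct {
	mu         sync.Mutex
	items      []string
	activeKeys sync.Map
}

// Enqueue adds key unless it is already active.
func (q *miniQueues) Enqueue(key string) {
	if _, loaded := q.activeKeys.LoadOrStore(key, struct{}{}); loaded {
		return // key already in flight
	}
	q.Requeue(key)
}

// Requeue appends a key that is presumed to be active already.
func (q *miniQueues) Requeue(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, key)
}

// Dequeue returns the next key, or false when the queue is empty.
func (q *miniQueues) Dequeue() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	key := q.items[0]
	q.items = q.items[1:]
	return key, true
}

// Clear unblocks key so that it can be enqueued again.
func (q *miniQueues) Clear(key string) { q.activeKeys.Delete(key) }

// drain processes every queued key and returns the number of flush attempts.
func drain(q *miniQueues, flush func(string) error) int {
	attempts := 0
	for {
		key, ok := q.Dequeue()
		if !ok {
			return attempts
		}
		attempts++
		if err := flush(key); err != nil {
			q.Requeue(key) // failure: keep the key active and retry later
			continue
		}
		q.Clear(key) // success: allow the key to be enqueued again
	}
}

func main() {
	q := &miniQueues{}
	q.Enqueue("block-a")
	q.Enqueue("block-a") // ignored: block-a is still active
	q.Enqueue("block-b")

	failedOnce := false
	attempts := drain(q, func(key string) error {
		if key == "block-a" && !failedOnce {
			failedOnce = true
			return errTransient
		}
		return nil
	})
	fmt.Println(attempts) // 3: block-a fails, block-b succeeds, block-a succeeds on retry
}
```

Calling Requeue rather than Enqueue on failure matters here: the key is still in activeKeys, so Enqueue would silently drop the retry.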
Stop
tempo/pkg/flushqueues/exclusivequeues.go

    // Stop closes all queues
    func (f *ExclusiveQueues) Stop() {
        for _, q := range f.queues {
            q.Close()
        }
    }
Stop iterates over f.queues and calls q.Close() on each queue in turn.
Summary
tempo's ExclusiveQueues defines the queues, index, and activeKeys fields, and provides the Enqueue, Requeue, Dequeue, Clear, and Stop methods.
doc
tempo