首頁>技術>

本文主要研究一下cortex的Ingester

Ingester

cortex/pkg/api/api.go

// Ingester is defined as an interface to allow for alternative implementations// of ingesters to be passed into the API.RegisterIngester() method.type Ingester interface {    client.IngesterServer    FlushHandler(http.ResponseWriter, *http.Request)    ShutdownHandler(http.ResponseWriter, *http.Request)    Push(context.Context, *client.WriteRequest) (*client.WriteResponse, error)}

Ingester介面內嵌了client.IngesterServer,定義了FlushHandler、ShutdownHandler、Push方法

client.IngesterServer

cortex/pkg/ingester/client/cortex.pb.go

// IngesterServer is the server API for Ingester service.type IngesterServer interface {    Push(context.Context, *WriteRequest) (*WriteResponse, error)    Query(context.Context, *QueryRequest) (*QueryResponse, error)    QueryStream(*QueryRequest, Ingester_QueryStreamServer) error    LabelValues(context.Context, *LabelValuesRequest) (*LabelValuesResponse, error)    LabelNames(context.Context, *LabelNamesRequest) (*LabelNamesResponse, error)    UserStats(context.Context, *UserStatsRequest) (*UserStatsResponse, error)    AllUserStats(context.Context, *UserStatsRequest) (*UsersStatsResponse, error)    MetricsForLabelMatchers(context.Context, *MetricsForLabelMatchersRequest) (*MetricsForLabelMatchersResponse, error)    MetricsMetadata(context.Context, *MetricsMetadataRequest) (*MetricsMetadataResponse, error)    // TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).    TransferChunks(Ingester_TransferChunksServer) error}

client.IngesterServer介面定義了Push、Query、QueryStream、LabelValues、LabelNames、UserStats、AllUserStats、MetricsForLabelMatchers、MetricsMetadata、TransferChunks方法

Push

cortex/pkg/ingester/ingester.go

// Push implements client.IngesterServerfunc (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {    if err := i.checkRunningOrStopping(); err != nil {        return nil, err    }    if i.cfg.BlocksStorageEnabled {        return i.v2Push(ctx, req)    }    // NOTE: because we use `unsafe` in deserialisation, we must not    // retain anything from `req` past the call to ReuseSlice    defer client.ReuseSlice(req.Timeseries)    userID, err := tenant.TenantID(ctx)    if err != nil {        return nil, fmt.Errorf("no user id")    }    // Given metadata is a best-effort approach, and we don't halt on errors    // process it before samples. Otherwise, we risk returning an error before ingestion.    i.pushMetadata(ctx, userID, req.GetMetadata())    var firstPartialErr *validationError    var record *WALRecord    if i.cfg.WALConfig.WALEnabled {        record = recordPool.Get().(*WALRecord)        record.UserID = userID        // Assuming there is not much churn in most cases, there is no use        // keeping the record.Labels slice hanging around.        record.Series = nil        if cap(record.Samples) < len(req.Timeseries) {            record.Samples = make([]tsdb_record.RefSample, 0, len(req.Timeseries))        } else {            record.Samples = record.Samples[:0]        }    }    for _, ts := range req.Timeseries {        seriesSamplesIngested := 0        for _, s := range ts.Samples {            // append() copies the memory in `ts.Labels` except on the error path            err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record)            if err == nil {                seriesSamplesIngested++                continue            }            i.metrics.ingestedSamplesFail.Inc()            if ve, ok := err.(*validationError); ok {                if firstPartialErr == nil {                    firstPartialErr = ve                }                continue            }            // non-validation error: abandon this request            return nil, grpcForwardableError(userID, http.StatusInternalServerError, err)        }        if i.cfg.ActiveSeriesMetricsEnabled && seriesSamplesIngested > 0 {            // updateActiveSeries will copy labels if necessary.            i.updateActiveSeries(userID, time.Now(), ts.Labels)        }    }    if record != nil {        // Log the record only if there was no error in ingestion.        if err := i.wal.Log(record); err != nil {            return nil, err        }        recordPool.Put(record)    }    if firstPartialErr != nil {        // grpcForwardableError turns the error into a string so it no longer references `req`        return &client.WriteResponse{}, grpcForwardableError(userID, firstPartialErr.code, firstPartialErr)    }    return &client.WriteResponse{}, nil}

Push方法首先執行checkRunningOrStopping,若i.cfg.BlocksStorageEnabled則執行i.v2Push(ctx, req);否則遍歷req.Timeseries執行i.append

FlushHandler

cortex/pkg/ingester/flush.go

// FlushHandler triggers a flush of all in memory chunks.  Mainly used for// local testing.func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {    if i.cfg.BlocksStorageEnabled {        i.v2FlushHandler(w, r)        return    }    level.Info(util.Logger).Log("msg", "starting to flush all the chunks")    i.sweepUsers(true)    level.Info(util.Logger).Log("msg", "chunks queued for flushing")    w.WriteHeader(http.StatusNoContent)}

FlushHandler方法在i.cfg.BlocksStorageEnabled為true時執行i.v2FlushHandler(w, r)

ShutdownHandler

cortex/pkg/ingester/ingester.go

// ShutdownHandler triggers the following set of operations in order://     * Change the state of ring to stop accepting writes.//     * Flush all the chunks.func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {    originalFlush := i.lifecycler.FlushOnShutdown()    // We want to flush the chunks if transfer fails irrespective of original flag.    i.lifecycler.SetFlushOnShutdown(true)    // In the case of an HTTP shutdown, we want to unregister no matter what.    originalUnregister := i.lifecycler.ShouldUnregisterOnShutdown()    i.lifecycler.SetUnregisterOnShutdown(true)    _ = services.StopAndAwaitTerminated(context.Background(), i)    // Set state back to original.    i.lifecycler.SetFlushOnShutdown(originalFlush)    i.lifecycler.SetUnregisterOnShutdown(originalUnregister)    w.WriteHeader(http.StatusNoContent)}

ShutdownHandler方法執行i.lifecycler.FlushOnShutdown()、i.lifecycler.ShouldUnregisterOnShutdown()以及services.StopAndAwaitTerminated

Query

cortex/pkg/ingester/ingester.go

// Query implements service.IngesterServerfunc (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {    if err := i.checkRunningOrStopping(); err != nil {        return nil, err    }    if i.cfg.BlocksStorageEnabled {        return i.v2Query(ctx, req)    }    userID, err := tenant.TenantID(ctx)    if err != nil {        return nil, err    }    from, through, matchers, err := client.FromQueryRequest(req)    if err != nil {        return nil, err    }    i.metrics.queries.Inc()    i.userStatesMtx.RLock()    state, ok, err := i.userStates.getViaContext(ctx)    i.userStatesMtx.RUnlock()    if err != nil {        return nil, err    } else if !ok {        return &client.QueryResponse{}, nil    }    result := &client.QueryResponse{}    numSeries, numSamples := 0, 0    maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID)    err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error {        values, err := series.samplesForRange(from, through)        if err != nil {            return err        }        if len(values) == 0 {            return nil        }        numSeries++        numSamples += len(values)        if numSamples > maxSamplesPerQuery {            return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of samples in a query (%d)", maxSamplesPerQuery)        }        ts := client.TimeSeries{            Labels:  client.FromLabelsToLabelAdapters(series.metric),            Samples: make([]client.Sample, 0, len(values)),        }        for _, s := range values {            ts.Samples = append(ts.Samples, client.Sample{                Value:       float64(s.Value),                TimestampMs: int64(s.Timestamp),            })        }        result.Timeseries = append(result.Timeseries, ts)        return nil    }, nil, 0)    i.metrics.queriedSeries.Observe(float64(numSeries))    i.metrics.queriedSamples.Observe(float64(numSamples))    return result, err}

Query方法先判斷i.checkRunningOrStopping();若i.cfg.BlocksStorageEnabled則執行i.v2Query(ctx, req);否則透過series.samplesForRange(from, through)獲取資料

小結

cortex的Ingester介面內嵌了client.IngesterServer,定義了FlushHandler、ShutdownHandler、Push方法;Ingester實現了Ingester介面。

doccortex

15
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 中介軟體:ES元件RestHighLevel用法詳解