diff --git a/engine/engine.go b/engine/engine.go index c2cc0b50..654c7d48 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -23,6 +23,7 @@ import ( "github.com/thanos-io/promql-engine/query" engstorage "github.com/thanos-io/promql-engine/storage" promstorage "github.com/thanos-io/promql-engine/storage/prometheus" + "github.com/thanos-io/promql-engine/tracing" "github.com/efficientgo/core/errors" "github.com/prometheus/client_golang/prometheus" @@ -50,6 +51,16 @@ const ( stepsBatch = 10 ) +func (q QueryType) String() string { + switch q { + case InstantQuery: + return "InstantQuery" + case RangeQuery: + return "RangeQuery" + } + + return "Unknown" +} func IsUnimplemented(err error) bool { return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) } @@ -237,14 +248,21 @@ type Engine struct { } func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) { + span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeInstantQuery") + defer span.Finish() + span.SetTag("query", qs) + span.SetTag("timestamp", ts.Unix()) + idx, err := e.activeQueryTracker.Insert(ctx, qs) if err != nil { + tracing.LogError(span, err) return nil, err } defer e.activeQueryTracker.Delete(idx) expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { + tracing.LogError(span, err) return nil, err } // determine sorting order before optimizers run, we do this by looking for "sort" @@ -254,23 +272,29 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts qOpts := e.makeQueryOpts(ts, ts, 0, opts) if qOpts.StepsBatch > 64 { - return nil, ErrStepsBatchTooLarge + err := ErrStepsBatchTooLarge + tracing.LogError(span, err) + return nil, err } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } + lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) + ctx = warnings.NewContext(ctx) + defer func() { warns.Merge(warnings.FromContext(ctx)) }() + scanners, err := e.storageScanners(q, qOpts, lplan) if err != nil { + tracing.LogError(span, err) return nil, errors.Wrap(err, "creating storage scanners") } - ctx = warnings.NewContext(ctx) - defer func() { warns.Merge(warnings.FromContext(ctx)) }() exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts) if err != nil { + tracing.LogError(span, err) return nil, err } e.metrics.totalQueries.Inc() @@ -336,39 +360,56 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab } func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { + span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeRangeQuery") + defer span.Finish() + span.SetTag("query", qs) + span.SetTag("start", start.Unix()) + span.SetTag("end", end.Unix()) + span.SetTag("step", step.String()) + idx, err := e.activeQueryTracker.Insert(ctx, qs) if err != nil { + tracing.LogError(span, err) return nil, err } defer e.activeQueryTracker.Delete(idx) expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { + tracing.LogError(span, err) return nil, err } // Use same check as Prometheus for range queries. if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { - return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) + err := errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) + tracing.LogError(span, err) + return nil, err } qOpts := e.makeQueryOpts(start, end, step, opts) if qOpts.StepsBatch > 64 { - return nil, ErrStepsBatchTooLarge + err := ErrStepsBatchTooLarge + tracing.LogError(span, err) + return nil, err } planOpts := logicalplan.PlanOptions{ DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks, } + lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts)) ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() + scnrs, err := e.storageScanners(q, qOpts, lplan) if err != nil { + tracing.LogError(span, err) return nil, errors.Wrap(err, "creating storage scanners") } exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) if err != nil { + tracing.LogError(span, err) return nil, err } e.metrics.totalQueries.Inc() @@ -528,8 +569,21 @@ type compatibilityQuery struct { } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { + span, ctx := tracing.StartSpanFromContext(ctx, "compatibilityQuery.Exec") + defer span.Finish() + span.SetTag("query_type", q.t.String()) + span.SetTag("query_string", q.String()) + if q.t == RangeQuery { + span.SetTag("start", q.start) + span.SetTag("end", q.end) + span.SetTag("step", q.step) + } else { + span.SetTag("timestamp", q.ts) + } + idx, err := q.engine.activeQueryTracker.Insert(ctx, q.String()) if err != nil { + tracing.LogError(span, err) return &promql.Result{Err: err} } defer q.engine.activeQueryTracker.Delete(idx) @@ -557,8 +611,11 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { defer cancel() q.cancel = cancel + seriesSpan := tracing.ChildSpan(span, "get_series") resultSeries, err := q.Query.exec.Series(ctx) + seriesSpan.Finish() if err != nil { + tracing.LogError(span, err) return newErrResult(ret, err) } @@ -566,14 +623,20 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { for i, s := range resultSeries { series[i].Metric = s } + + samplesSpan := tracing.ChildSpan(span, "collect_samples") loop: for { select { case <-ctx.Done(): + tracing.LogError(samplesSpan, ctx.Err()) + samplesSpan.Finish() return newErrResult(ret, ctx.Err()) default: r, err := q.Query.exec.Next(ctx) if err != nil { + tracing.LogError(samplesSpan, err) + samplesSpan.Finish() return newErrResult(ret, err) } if r == nil { @@ -610,6 +673,10 @@ loop: q.Query.exec.GetPool().PutVectors(r) } } + samplesSpan.Finish() + + resultSpan := tracing.ChildSpan(span, "prepare_result") + defer resultSpan.Finish() // For range Query we expect always a Matrix value type. if q.t == RangeQuery { @@ -623,7 +690,9 @@ loop: sort.Sort(matrix) ret.Value = matrix if matrix.ContainsSameLabelset() { - return newErrResult(ret, extlabels.ErrDuplicateLabelSet) + err := extlabels.ErrDuplicateLabelSet + tracing.LogError(resultSpan, err) + return newErrResult(ret, err) } return ret } @@ -657,7 +726,9 @@ loop: } sort.Slice(vector, q.resultSort.comparer(&vector)) if vector.ContainsSameLabelset() { - return newErrResult(ret, extlabels.ErrDuplicateLabelSet) + err := extlabels.ErrDuplicateLabelSet + tracing.LogError(resultSpan, err) + return newErrResult(ret, err) } result = vector case parser.ValueTypeScalar: @@ -667,7 +738,9 @@ loop: } result = promql.Scalar{V: v, T: q.ts.UnixMilli()} default: - panic(errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType())) + err := errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType()) + tracing.LogError(resultSpan, err) + panic(err) } ret.Value = result diff --git a/execution/execution.go b/execution/execution.go index 496b2940..fc5e83ff 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -35,6 +35,7 @@ import ( "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/storage" + "github.com/thanos-io/promql-engine/tracing" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/promql" @@ -45,6 +46,10 @@ import ( // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options) (model.VectorOperator, error) { + span, ctx := tracing.StartSpanFromContext(ctx, "execution.New") + defer span.Finish() + span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType())) + hints := promstorage.SelectHints{ Start: opts.Start.UnixMilli(), End: opts.End.UnixMilli(), @@ -90,6 +95,7 @@ func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scann start, end := getTimeRangesForVectorSelector(e, opts, 0) hints.Start = start hints.End = end + return scanners.NewVectorSelector(ctx, opts, hints, *e) } diff --git a/go.mod b/go.mod index 6eacdc7f..02e2264b 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/efficientgo/core v1.0.0-rc.2 github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/google/go-cmp v0.7.0 + github.com/opentracing/opentracing-go v1.2.0 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/common v0.61.0 github.com/prometheus/prometheus v0.301.0 diff --git a/go.sum b/go.sum index 69665a00..342c03e6 100644 --- a/go.sum +++ b/go.sum @@ -207,6 +207,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI= github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= @@ -243,6 +245,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= diff --git a/tracing/tracing.go b/tracing/tracing.go new file mode 100644 index 00000000..094b07fd --- /dev/null +++ b/tracing/tracing.go @@ -0,0 +1,118 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +// Package tracing adds OpenTracing support to the promql-engine library. +// It allows you to trace the execution of PromQL queries to understand performance bottlenecks and debug query issues. +// For now, we are using the OpenTracing API, but we plan to move to OpenTelemetry in the future (once Thanos also does the move). +// See https://github.com/thanos-io/thanos/issues/1972 for more details. +// +// # Setup +// +// To use tracing with promql-engine, you need to: +// +// 1. Initialize an OpenTracing compatible tracer (such as Jaeger, Zipkin, etc.) +// 2. Set the tracer using the SetTracer function +// 3. Use the engine normally - all query operations will automatically be traced +// +// import ( +// "github.com/opentracing/opentracing-go" +// "github.com/thanos-io/promql-engine/engine" +// "github.com/thanos-io/promql-engine/tracing" +// // Your tracer implementation, e.g., jaeger +// ) +// +// func main() { +// // Initialize your tracer +// tracer := initTracer() +// +// // Set the tracer to be used by promql-engine +// tracing.SetTracer(tracer) +// +// // Initialize the engine as usual +// eng := engine.New(engine.Opts{ +// // Your engine options here +// }) +// +// // Use the engine as normal - all operations will now be traced +// // ... +// } +// +// # Traced Operations +// +// The following operations are automatically traced: +// +// 1. Main query creation (MakeInstantQuery and MakeRangeQuery) +// 2. Query execution (Exec) +// 3. Plan optimization +// 4. Operator creation +// 5. Storage operations + +package tracing + +import ( + "context" + "sync" + + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" +) + +var ( + // tracer is the global tracer used by the promql-engine. + tracer = opentracing.GlobalTracer() + mu sync.RWMutex +) + +// SetTracer sets the global tracer to be used by promql-engine. +// If nil is passed, it reverts to using the opentracing.GlobalTracer(). +func SetTracer(t opentracing.Tracer) { + mu.Lock() + defer mu.Unlock() + if t == nil { + tracer = opentracing.GlobalTracer() + return + } + tracer = t +} + +// GetTracer returns the currently active tracer. +func GetTracer() opentracing.Tracer { + mu.RLock() + defer mu.RUnlock() + return tracer +} + +// StartSpan starts a new span with the given operation name. +// It uses the global tracer set by SetTracer or opentracing.GlobalTracer() if none was set. +func StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { + return GetTracer().StartSpan(operationName, opts...) +} + +// StartSpanFromContext starts a new span from the given context. +// It returns the span and a context that includes the span. +func StartSpanFromContext(ctx context.Context, operationName string, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) { + return opentracing.StartSpanFromContextWithTracer(ctx, GetTracer(), operationName, opts...) +} + +// InjectSpan injects a span into a carrier for transport across process boundaries. +func InjectSpan(span opentracing.Span, format interface{}, carrier interface{}) error { + return GetTracer().Inject(span.Context(), format, carrier) +} + +// ExtractSpan extracts a span from a carrier. +func ExtractSpan(format interface{}, carrier interface{}) (opentracing.SpanContext, error) { + return GetTracer().Extract(format, carrier) +} + +// LogError logs an error to the span and sets the span to error state. +func LogError(span opentracing.Span, err error) { + ext.Error.Set(span, true) + span.LogFields(log.Error(err)) +} + +// ChildSpan creates a child span with the given operation name from a parent span. +func ChildSpan(parent opentracing.Span, operationName string, opts ...opentracing.StartSpanOption) opentracing.Span { + childOpts := append([]opentracing.StartSpanOption{opentracing.ChildOf(parent.Context())}, opts...) + return StartSpan(operationName, childOpts...) +}