Skip to content

Tracing: enable tracing for main query execution steps #544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 81 additions & 8 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/thanos-io/promql-engine/query"
engstorage "github.com/thanos-io/promql-engine/storage"
promstorage "github.com/thanos-io/promql-engine/storage/prometheus"
"github.com/thanos-io/promql-engine/tracing"

"github.com/efficientgo/core/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -50,6 +51,16 @@ const (
stepsBatch = 10
)

func (q QueryType) String() string {
switch q {
case InstantQuery:
return "InstantQuery"
case RangeQuery:
return "RangeQuery"
}

return "Unknown"
}
func IsUnimplemented(err error) bool {
return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented)
}
Expand Down Expand Up @@ -237,14 +248,21 @@ type Engine struct {
}

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeInstantQuery")
defer span.Finish()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect the engine to take a lot of time before query execution phase? Does these spans add value?

span.SetTag("query", qs)
span.SetTag("timestamp", ts.Unix())

idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
tracing.LogError(span, err)
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
if err != nil {
tracing.LogError(span, err)
return nil, err
}
// determine sorting order before optimizers run, we do this by looking for "sort"
Expand All @@ -254,23 +272,29 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
err := ErrStepsBatchTooLarge
tracing.LogError(span, err)
return nil, err
}

planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}

lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

scanners, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
tracing.LogError(span, err)
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
if err != nil {
tracing.LogError(span, err)
return nil, err
}
e.metrics.totalQueries.Inc()
Expand Down Expand Up @@ -336,39 +360,56 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
}

func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeRangeQuery")
defer span.Finish()
span.SetTag("query", qs)
span.SetTag("start", start.Unix())
span.SetTag("end", end.Unix())
span.SetTag("step", step.String())

idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
tracing.LogError(span, err)
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
if err != nil {
tracing.LogError(span, err)
return nil, err
}

// Use same check as Prometheus for range queries.
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
err := errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
tracing.LogError(span, err)
return nil, err
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
err := ErrStepsBatchTooLarge
tracing.LogError(span, err)
return nil, err
}
planOpts := logicalplan.PlanOptions{
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
}

lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()

scnrs, err := e.storageScanners(q, qOpts, lplan)
if err != nil {
tracing.LogError(span, err)
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if err != nil {
tracing.LogError(span, err)
return nil, err
}
e.metrics.totalQueries.Inc()
Expand Down Expand Up @@ -528,8 +569,21 @@ type compatibilityQuery struct {
}

func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
span, ctx := tracing.StartSpanFromContext(ctx, "compatibilityQuery.Exec")
defer span.Finish()
span.SetTag("query_type", q.t.String())
span.SetTag("query_string", q.String())
if q.t == RangeQuery {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we just set start=end and step=0 for instant query?

span.SetTag("start", q.start)
span.SetTag("end", q.end)
span.SetTag("step", q.step)
} else {
span.SetTag("timestamp", q.ts)
}

idx, err := q.engine.activeQueryTracker.Insert(ctx, q.String())
if err != nil {
tracing.LogError(span, err)
return &promql.Result{Err: err}
}
defer q.engine.activeQueryTracker.Delete(idx)
Expand Down Expand Up @@ -557,23 +611,32 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
defer cancel()
q.cancel = cancel

seriesSpan := tracing.ChildSpan(span, "get_series")
resultSeries, err := q.Query.exec.Series(ctx)
seriesSpan.Finish()
if err != nil {
tracing.LogError(span, err)
return newErrResult(ret, err)
}

series := make([]promql.Series, len(resultSeries))
for i, s := range resultSeries {
series[i].Metric = s
}

samplesSpan := tracing.ChildSpan(span, "collect_samples")
loop:
for {
select {
case <-ctx.Done():
tracing.LogError(samplesSpan, ctx.Err())
samplesSpan.Finish()
return newErrResult(ret, ctx.Err())
default:
r, err := q.Query.exec.Next(ctx)
if err != nil {
tracing.LogError(samplesSpan, err)
samplesSpan.Finish()
return newErrResult(ret, err)
}
if r == nil {
Expand Down Expand Up @@ -610,6 +673,10 @@ loop:
q.Query.exec.GetPool().PutVectors(r)
}
}
samplesSpan.Finish()

resultSpan := tracing.ChildSpan(span, "prepare_result")
defer resultSpan.Finish()

// For range Query we expect always a Matrix value type.
if q.t == RangeQuery {
Expand All @@ -623,7 +690,9 @@ loop:
sort.Sort(matrix)
ret.Value = matrix
if matrix.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
err := extlabels.ErrDuplicateLabelSet
tracing.LogError(resultSpan, err)
return newErrResult(ret, err)
}
return ret
}
Expand Down Expand Up @@ -657,7 +726,9 @@ loop:
}
sort.Slice(vector, q.resultSort.comparer(&vector))
if vector.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
err := extlabels.ErrDuplicateLabelSet
tracing.LogError(resultSpan, err)
return newErrResult(ret, err)
}
result = vector
case parser.ValueTypeScalar:
Expand All @@ -667,7 +738,9 @@ loop:
}
result = promql.Scalar{V: v, T: q.ts.UnixMilli()}
default:
panic(errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType()))
err := errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType())
tracing.LogError(resultSpan, err)
panic(err)
}

ret.Value = result
Expand Down
6 changes: 6 additions & 0 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"github.com/thanos-io/promql-engine/storage"
"github.com/thanos-io/promql-engine/tracing"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/promql"
Expand All @@ -45,6 +46,10 @@ import (
// New creates new physical query execution for a given query expression which represents logical plan.
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan.
func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options) (model.VectorOperator, error) {
span, ctx := tracing.StartSpanFromContext(ctx, "execution.New")
defer span.Finish()
span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType()))

hints := promstorage.SelectHints{
Start: opts.Start.UnixMilli(),
End: opts.End.UnixMilli(),
Expand Down Expand Up @@ -90,6 +95,7 @@ func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scann
start, end := getTimeRangesForVectorSelector(e, opts, 0)
hints.Start = start
hints.End = end

return scanners.NewVectorSelector(ctx, opts, hints, *e)
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/efficientgo/core v1.0.0-rc.2
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/google/go-cmp v0.7.0
github.com/opentracing/opentracing-go v1.2.0
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/common v0.61.0
github.com/prometheus/prometheus v0.301.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
Expand Down Expand Up @@ -243,6 +245,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
Expand Down
Loading
Loading