Skip to content

Commit b9faa50

Browse files
committed
Tracing: enable tracing for main query execution steps
Signed-off-by: Pedro Tanaka <[email protected]> fixing go mods and docs Signed-off-by: Pedro Tanaka <[email protected]> move docs to go package Signed-off-by: Pedro Tanaka <[email protected]>
1 parent 1c74864 commit b9faa50

File tree

5 files changed

+247
-10
lines changed

5 files changed

+247
-10
lines changed

engine/engine.go

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/thanos-io/promql-engine/query"
3434
engstorage "github.com/thanos-io/promql-engine/storage"
3535
promstorage "github.com/thanos-io/promql-engine/storage/prometheus"
36+
"github.com/thanos-io/promql-engine/tracing"
3637
)
3738

3839
type QueryType int
@@ -237,14 +238,21 @@ type Engine struct {
237238
}
238239

239240
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
241+
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeInstantQuery")
242+
defer span.Finish()
243+
span.SetTag("query", qs)
244+
span.SetTag("timestamp", ts.Unix())
245+
240246
idx, err := e.activeQueryTracker.Insert(ctx, qs)
241247
if err != nil {
248+
tracing.LogError(span, err)
242249
return nil, err
243250
}
244251
defer e.activeQueryTracker.Delete(idx)
245252

246253
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
247254
if err != nil {
255+
tracing.LogError(span, err)
248256
return nil, err
249257
}
250258
// determine sorting order before optimizers run, we do this by looking for "sort"
@@ -254,23 +262,35 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
254262

255263
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
256264
if qOpts.StepsBatch > 64 {
257-
return nil, ErrStepsBatchTooLarge
265+
err := ErrStepsBatchTooLarge
266+
tracing.LogError(span, err)
267+
return nil, err
258268
}
259269

260270
planOpts := logicalplan.PlanOptions{
261271
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
262272
}
273+
274+
optimizeSpan := tracing.ChildSpan(span, "optimize_plan")
263275
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
276+
optimizeSpan.Finish()
264277

278+
ctx = warnings.NewContext(ctx)
279+
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
280+
281+
scannersSpan := tracing.ChildSpan(span, "create_storage_scanners")
265282
scanners, err := e.storageScanners(q, qOpts, lplan)
283+
scannersSpan.Finish()
266284
if err != nil {
285+
tracing.LogError(span, err)
267286
return nil, errors.Wrap(err, "creating storage scanners")
268287
}
269288

270-
ctx = warnings.NewContext(ctx)
271-
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
289+
execSpan := tracing.ChildSpan(span, "create_execution")
272290
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
291+
execSpan.Finish()
273292
if err != nil {
293+
tracing.LogError(span, err)
274294
return nil, err
275295
}
276296
e.metrics.totalQueries.Inc()
@@ -336,39 +356,62 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
336356
}
337357

338358
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
359+
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeRangeQuery")
360+
defer span.Finish()
361+
span.SetTag("query", qs)
362+
span.SetTag("start", start.Unix())
363+
span.SetTag("end", end.Unix())
364+
span.SetTag("step", step.String())
365+
339366
idx, err := e.activeQueryTracker.Insert(ctx, qs)
340367
if err != nil {
368+
tracing.LogError(span, err)
341369
return nil, err
342370
}
343371
defer e.activeQueryTracker.Delete(idx)
344372

345373
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
346374
if err != nil {
375+
tracing.LogError(span, err)
347376
return nil, err
348377
}
349378

350379
// Use same check as Prometheus for range queries.
351380
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
352-
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
381+
err := errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
382+
tracing.LogError(span, err)
383+
return nil, err
353384
}
354385
qOpts := e.makeQueryOpts(start, end, step, opts)
355386
if qOpts.StepsBatch > 64 {
356-
return nil, ErrStepsBatchTooLarge
387+
err := ErrStepsBatchTooLarge
388+
tracing.LogError(span, err)
389+
return nil, err
357390
}
358391
planOpts := logicalplan.PlanOptions{
359392
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
360393
}
394+
395+
optimizeSpan := tracing.ChildSpan(span, "optimize_plan")
361396
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
397+
optimizeSpan.Finish()
362398

363399
ctx = warnings.NewContext(ctx)
364400
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
401+
402+
scannersSpan := tracing.ChildSpan(span, "create_storage_scanners")
365403
scnrs, err := e.storageScanners(q, qOpts, lplan)
404+
scannersSpan.Finish()
366405
if err != nil {
406+
tracing.LogError(span, err)
367407
return nil, errors.Wrap(err, "creating storage scanners")
368408
}
369409

410+
execSpan := tracing.ChildSpan(span, "create_execution")
370411
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
412+
execSpan.Finish()
371413
if err != nil {
414+
tracing.LogError(span, err)
372415
return nil, err
373416
}
374417
e.metrics.totalQueries.Inc()
@@ -528,8 +571,21 @@ type compatibilityQuery struct {
528571
}
529572

530573
func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
574+
span, ctx := tracing.StartSpanFromContext(ctx, "compatibilityQuery.Exec")
575+
defer span.Finish()
576+
span.SetTag("query_type", q.t)
577+
span.SetTag("query_string", q.String())
578+
if q.t == RangeQuery {
579+
span.SetTag("start", q.start.Unix())
580+
span.SetTag("end", q.end.Unix())
581+
span.SetTag("step", q.step.String())
582+
} else {
583+
span.SetTag("timestamp", q.ts.Unix())
584+
}
585+
531586
idx, err := q.engine.activeQueryTracker.Insert(ctx, q.String())
532587
if err != nil {
588+
tracing.LogError(span, err)
533589
return &promql.Result{Err: err}
534590
}
535591
defer q.engine.activeQueryTracker.Delete(idx)
@@ -557,23 +613,32 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
557613
defer cancel()
558614
q.cancel = cancel
559615

616+
seriesSpan := tracing.ChildSpan(span, "get_series")
560617
resultSeries, err := q.Query.exec.Series(ctx)
618+
seriesSpan.Finish()
561619
if err != nil {
620+
tracing.LogError(span, err)
562621
return newErrResult(ret, err)
563622
}
564623

565624
series := make([]promql.Series, len(resultSeries))
566625
for i, s := range resultSeries {
567626
series[i].Metric = s
568627
}
628+
629+
samplesSpan := tracing.ChildSpan(span, "collect_samples")
569630
loop:
570631
for {
571632
select {
572633
case <-ctx.Done():
634+
tracing.LogError(samplesSpan, ctx.Err())
635+
samplesSpan.Finish()
573636
return newErrResult(ret, ctx.Err())
574637
default:
575638
r, err := q.Query.exec.Next(ctx)
576639
if err != nil {
640+
tracing.LogError(samplesSpan, err)
641+
samplesSpan.Finish()
577642
return newErrResult(ret, err)
578643
}
579644
if r == nil {
@@ -610,6 +675,10 @@ loop:
610675
q.Query.exec.GetPool().PutVectors(r)
611676
}
612677
}
678+
samplesSpan.Finish()
679+
680+
resultSpan := tracing.ChildSpan(span, "prepare_result")
681+
defer resultSpan.Finish()
613682

614683
// For range Query we expect always a Matrix value type.
615684
if q.t == RangeQuery {
@@ -623,7 +692,9 @@ loop:
623692
sort.Sort(matrix)
624693
ret.Value = matrix
625694
if matrix.ContainsSameLabelset() {
626-
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
695+
err := extlabels.ErrDuplicateLabelSet
696+
tracing.LogError(resultSpan, err)
697+
return newErrResult(ret, err)
627698
}
628699
return ret
629700
}
@@ -657,7 +728,9 @@ loop:
657728
}
658729
sort.Slice(vector, q.resultSort.comparer(&vector))
659730
if vector.ContainsSameLabelset() {
660-
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
731+
err := extlabels.ErrDuplicateLabelSet
732+
tracing.LogError(resultSpan, err)
733+
return newErrResult(ret, err)
661734
}
662735
result = vector
663736
case parser.ValueTypeScalar:
@@ -667,7 +740,9 @@ loop:
667740
}
668741
result = promql.Scalar{V: v, T: q.ts.UnixMilli()}
669742
default:
670-
panic(errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType()))
743+
err := errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType())
744+
tracing.LogError(resultSpan, err)
745+
panic(err)
671746
}
672747

673748
ret.Value = result

execution/execution.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,16 @@ import (
4040
"github.com/thanos-io/promql-engine/logicalplan"
4141
"github.com/thanos-io/promql-engine/query"
4242
"github.com/thanos-io/promql-engine/storage"
43+
"github.com/thanos-io/promql-engine/tracing"
4344
)
4445

4546
// New creates new physical query execution for a given query expression which represents logical plan.
4647
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan.
4748
func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options) (model.VectorOperator, error) {
49+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.New")
50+
defer span.Finish()
51+
span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType()))
52+
4853
hints := promstorage.SelectHints{
4954
Start: opts.Start.UnixMilli(),
5055
End: opts.End.UnixMilli(),
@@ -54,43 +59,79 @@ func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, o
5459
}
5560

5661
func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
62+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.newOperator")
63+
defer span.Finish()
64+
span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType()))
65+
span.SetTag("expr", expr.String())
66+
5767
switch e := expr.(type) {
5868
case *logicalplan.NumberLiteral:
69+
span.SetTag("operator_type", "number_literal")
5970
return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil
6071
case *logicalplan.VectorSelector:
72+
span.SetTag("operator_type", "vector_selector")
6173
return newVectorSelector(ctx, e, storage, opts, hints)
6274
case *logicalplan.FunctionCall:
75+
span.SetTag("operator_type", "function_call")
76+
span.SetTag("function_name", e.Func.Name)
6377
return newCall(ctx, e, storage, opts, hints)
6478
case *logicalplan.Aggregation:
79+
span.SetTag("operator_type", "aggregation")
80+
span.SetTag("aggregation_op", e.Op.String())
6581
return newAggregateExpression(ctx, e, storage, opts, hints)
6682
case *logicalplan.Binary:
83+
span.SetTag("operator_type", "binary")
84+
span.SetTag("binary_op", e.Op.String())
6785
return newBinaryExpression(ctx, e, storage, opts, hints)
6886
case *logicalplan.Parens:
87+
span.SetTag("operator_type", "parens")
6988
return newOperator(ctx, e.Expr, storage, opts, hints)
7089
case *logicalplan.Unary:
90+
span.SetTag("operator_type", "unary")
91+
span.SetTag("unary_op", e.Op.String())
7192
return newUnaryExpression(ctx, e, storage, opts, hints)
7293
case *logicalplan.StepInvariantExpr:
94+
span.SetTag("operator_type", "step_invariant")
7395
return newStepInvariantExpression(ctx, e, storage, opts, hints)
7496
case logicalplan.Deduplicate:
97+
span.SetTag("operator_type", "deduplicate")
7598
return newDeduplication(ctx, e, storage, opts, hints)
7699
case logicalplan.RemoteExecution:
100+
span.SetTag("operator_type", "remote_execution")
77101
return newRemoteExecution(ctx, e, opts, hints)
78102
case *logicalplan.CheckDuplicateLabels:
103+
span.SetTag("operator_type", "check_duplicate_labels")
79104
return newDuplicateLabelCheck(ctx, e, storage, opts, hints)
80105
case logicalplan.Noop:
106+
span.SetTag("operator_type", "noop")
81107
return noop.NewOperator(opts), nil
82108
case logicalplan.UserDefinedExpr:
109+
span.SetTag("operator_type", "user_defined")
83110
return e.MakeExecutionOperator(ctx, model.NewVectorPool(opts.StepsBatch), opts, hints)
84111
default:
85-
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s (%T)", e, e)
112+
err := errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s (%T)", e, e)
113+
tracing.LogError(span, err)
114+
return nil, err
86115
}
87116
}
88117

89118
func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
119+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.newVectorSelector")
120+
defer span.Finish()
121+
span.SetTag("vector_selector", e.String())
122+
90123
start, end := getTimeRangesForVectorSelector(e, opts, 0)
91124
hints.Start = start
92125
hints.End = end
93-
return scanners.NewVectorSelector(ctx, opts, hints, *e)
126+
127+
span.SetTag("start_time", start)
128+
span.SetTag("end_time", end)
129+
130+
op, err := scanners.NewVectorSelector(ctx, opts, hints, *e)
131+
if err != nil {
132+
tracing.LogError(span, err)
133+
}
134+
return op, err
94135
}
95136

96137
func newCall(ctx context.Context, e *logicalplan.FunctionCall, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/efficientgo/core v1.0.0-rc.2
99
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
1010
github.com/google/go-cmp v0.7.0
11+
github.com/opentracing/opentracing-go v1.2.0
1112
github.com/prometheus/client_golang v1.20.5
1213
github.com/prometheus/common v0.61.0
1314
github.com/prometheus/prometheus v0.301.0

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
207207
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
208208
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
209209
github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
210+
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
211+
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
210212
github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
211213
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
212214
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
@@ -243,6 +245,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
243245
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
244246
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
245247
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
248+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
246249
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
247250
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
248251
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=

0 commit comments

Comments
 (0)