Skip to content

Commit 7a180a6

Browse files
committed
Tracing: enable tracing for main query execution steps
Signed-off-by: Pedro Tanaka <[email protected]> fixing go mods and docs Signed-off-by: Pedro Tanaka <[email protected]> move docs to go package Signed-off-by: Pedro Tanaka <[email protected]>
1 parent 4055a11 commit 7a180a6

File tree

5 files changed

+340
-10
lines changed

5 files changed

+340
-10
lines changed

engine/engine.go

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/prometheus/prometheus/storage"
3434
"github.com/prometheus/prometheus/util/annotations"
3535
"github.com/prometheus/prometheus/util/stats"
36+
"github.com/thanos-io/promql-engine/tracing"
3637
)
3738

3839
type QueryType int
@@ -50,6 +51,16 @@ const (
5051
stepsBatch = 10
5152
)
5253

54+
func (q QueryType) String() string {
55+
switch q {
56+
case InstantQuery:
57+
return "InstantQuery"
58+
case RangeQuery:
59+
return "RangeQuery"
60+
}
61+
62+
return "Unknown"
63+
}
5364
func IsUnimplemented(err error) bool {
5465
return errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented)
5566
}
@@ -237,14 +248,21 @@ type Engine struct {
237248
}
238249

239250
func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
251+
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeInstantQuery")
252+
defer span.Finish()
253+
span.SetTag("query", qs)
254+
span.SetTag("timestamp", ts.Unix())
255+
240256
idx, err := e.activeQueryTracker.Insert(ctx, qs)
241257
if err != nil {
258+
tracing.LogError(span, err)
242259
return nil, err
243260
}
244261
defer e.activeQueryTracker.Delete(idx)
245262

246263
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
247264
if err != nil {
265+
tracing.LogError(span, err)
248266
return nil, err
249267
}
250268
// determine sorting order before optimizers run, we do this by looking for "sort"
@@ -254,23 +272,35 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
254272

255273
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
256274
if qOpts.StepsBatch > 64 {
257-
return nil, ErrStepsBatchTooLarge
275+
err := ErrStepsBatchTooLarge
276+
tracing.LogError(span, err)
277+
return nil, err
258278
}
259279

260280
planOpts := logicalplan.PlanOptions{
261281
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
262282
}
283+
284+
optimizeSpan := tracing.ChildSpan(span, "optimize_plan")
263285
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
286+
optimizeSpan.Finish()
287+
288+
ctx = warnings.NewContext(ctx)
289+
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
264290

291+
scannersSpan := tracing.ChildSpan(span, "create_storage_scanners")
265292
scanners, err := e.storageScanners(q, qOpts, lplan)
293+
scannersSpan.Finish()
266294
if err != nil {
295+
tracing.LogError(span, err)
267296
return nil, errors.Wrap(err, "creating storage scanners")
268297
}
269298

270-
ctx = warnings.NewContext(ctx)
271-
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
299+
execSpan := tracing.ChildSpan(span, "create_execution")
272300
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
301+
execSpan.Finish()
273302
if err != nil {
303+
tracing.LogError(span, err)
274304
return nil, err
275305
}
276306
e.metrics.totalQueries.Inc()
@@ -336,39 +366,62 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
336366
}
337367

338368
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
369+
span, ctx := tracing.StartSpanFromContext(ctx, "engine.MakeRangeQuery")
370+
defer span.Finish()
371+
span.SetTag("query", qs)
372+
span.SetTag("start", start.Unix())
373+
span.SetTag("end", end.Unix())
374+
span.SetTag("step", step.String())
375+
339376
idx, err := e.activeQueryTracker.Insert(ctx, qs)
340377
if err != nil {
378+
tracing.LogError(span, err)
341379
return nil, err
342380
}
343381
defer e.activeQueryTracker.Delete(idx)
344382

345383
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
346384
if err != nil {
385+
tracing.LogError(span, err)
347386
return nil, err
348387
}
349388

350389
// Use same check as Prometheus for range queries.
351390
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
352-
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
391+
err := errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
392+
tracing.LogError(span, err)
393+
return nil, err
353394
}
354395
qOpts := e.makeQueryOpts(start, end, step, opts)
355396
if qOpts.StepsBatch > 64 {
356-
return nil, ErrStepsBatchTooLarge
397+
err := ErrStepsBatchTooLarge
398+
tracing.LogError(span, err)
399+
return nil, err
357400
}
358401
planOpts := logicalplan.PlanOptions{
359402
DisableDuplicateLabelCheck: e.disableDuplicateLabelChecks,
360403
}
404+
405+
optimizeSpan := tracing.ChildSpan(span, "optimize_plan")
361406
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.getLogicalOptimizers(opts))
407+
optimizeSpan.Finish()
362408

363409
ctx = warnings.NewContext(ctx)
364410
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
411+
412+
scannersSpan := tracing.ChildSpan(span, "create_storage_scanners")
365413
scnrs, err := e.storageScanners(q, qOpts, lplan)
414+
scannersSpan.Finish()
366415
if err != nil {
416+
tracing.LogError(span, err)
367417
return nil, errors.Wrap(err, "creating storage scanners")
368418
}
369419

420+
execSpan := tracing.ChildSpan(span, "create_execution")
370421
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
422+
execSpan.Finish()
371423
if err != nil {
424+
tracing.LogError(span, err)
372425
return nil, err
373426
}
374427
e.metrics.totalQueries.Inc()
@@ -528,8 +581,21 @@ type compatibilityQuery struct {
528581
}
529582

530583
func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
584+
span, ctx := tracing.StartSpanFromContext(ctx, "compatibilityQuery.Exec")
585+
defer span.Finish()
586+
span.SetTag("query_type", q.t.String())
587+
span.SetTag("query_string", q.String())
588+
if q.t == RangeQuery {
589+
span.SetTag("start", q.start)
590+
span.SetTag("end", q.end)
591+
span.SetTag("step", q.step)
592+
} else {
593+
span.SetTag("timestamp", q.ts)
594+
}
595+
531596
idx, err := q.engine.activeQueryTracker.Insert(ctx, q.String())
532597
if err != nil {
598+
tracing.LogError(span, err)
533599
return &promql.Result{Err: err}
534600
}
535601
defer q.engine.activeQueryTracker.Delete(idx)
@@ -557,23 +623,32 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
557623
defer cancel()
558624
q.cancel = cancel
559625

626+
seriesSpan := tracing.ChildSpan(span, "get_series")
560627
resultSeries, err := q.Query.exec.Series(ctx)
628+
seriesSpan.Finish()
561629
if err != nil {
630+
tracing.LogError(span, err)
562631
return newErrResult(ret, err)
563632
}
564633

565634
series := make([]promql.Series, len(resultSeries))
566635
for i, s := range resultSeries {
567636
series[i].Metric = s
568637
}
638+
639+
samplesSpan := tracing.ChildSpan(span, "collect_samples")
569640
loop:
570641
for {
571642
select {
572643
case <-ctx.Done():
644+
tracing.LogError(samplesSpan, ctx.Err())
645+
samplesSpan.Finish()
573646
return newErrResult(ret, ctx.Err())
574647
default:
575648
r, err := q.Query.exec.Next(ctx)
576649
if err != nil {
650+
tracing.LogError(samplesSpan, err)
651+
samplesSpan.Finish()
577652
return newErrResult(ret, err)
578653
}
579654
if r == nil {
@@ -610,6 +685,10 @@ loop:
610685
q.Query.exec.GetPool().PutVectors(r)
611686
}
612687
}
688+
samplesSpan.Finish()
689+
690+
resultSpan := tracing.ChildSpan(span, "prepare_result")
691+
defer resultSpan.Finish()
613692

614693
// For range Query we expect always a Matrix value type.
615694
if q.t == RangeQuery {
@@ -623,7 +702,9 @@ loop:
623702
sort.Sort(matrix)
624703
ret.Value = matrix
625704
if matrix.ContainsSameLabelset() {
626-
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
705+
err := extlabels.ErrDuplicateLabelSet
706+
tracing.LogError(resultSpan, err)
707+
return newErrResult(ret, err)
627708
}
628709
return ret
629710
}
@@ -657,7 +738,9 @@ loop:
657738
}
658739
sort.Slice(vector, q.resultSort.comparer(&vector))
659740
if vector.ContainsSameLabelset() {
660-
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
741+
err := extlabels.ErrDuplicateLabelSet
742+
tracing.LogError(resultSpan, err)
743+
return newErrResult(ret, err)
661744
}
662745
result = vector
663746
case parser.ValueTypeScalar:
@@ -667,7 +750,9 @@ loop:
667750
}
668751
result = promql.Scalar{V: v, T: q.ts.UnixMilli()}
669752
default:
670-
panic(errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType()))
753+
err := errors.Newf("new.Engine.exec: unexpected expression type %q", q.plan.Root().ReturnType())
754+
tracing.LogError(resultSpan, err)
755+
panic(err)
671756
}
672757

673758
ret.Value = result

execution/execution.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,16 @@ import (
4040
"github.com/prometheus/prometheus/promql"
4141
"github.com/prometheus/prometheus/promql/parser"
4242
promstorage "github.com/prometheus/prometheus/storage"
43+
"github.com/thanos-io/promql-engine/tracing"
4344
)
4445

4546
// New creates new physical query execution for a given query expression which represents logical plan.
4647
// TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan.
4748
func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options) (model.VectorOperator, error) {
49+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.New")
50+
defer span.Finish()
51+
span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType()))
52+
4853
hints := promstorage.SelectHints{
4954
Start: opts.Start.UnixMilli(),
5055
End: opts.End.UnixMilli(),
@@ -54,43 +59,79 @@ func New(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, o
5459
}
5560

5661
func newOperator(ctx context.Context, expr logicalplan.Node, storage storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
62+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.newOperator")
63+
defer span.Finish()
64+
span.SetTag("expr_type", parser.DocumentedType(expr.ReturnType()))
65+
span.SetTag("expr", expr.String())
66+
5767
switch e := expr.(type) {
5868
case *logicalplan.NumberLiteral:
69+
span.SetTag("operator_type", "number_literal")
5970
return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil
6071
case *logicalplan.VectorSelector:
72+
span.SetTag("operator_type", "vector_selector")
6173
return newVectorSelector(ctx, e, storage, opts, hints)
6274
case *logicalplan.FunctionCall:
75+
span.SetTag("operator_type", "function_call")
76+
span.SetTag("function_name", e.Func.Name)
6377
return newCall(ctx, e, storage, opts, hints)
6478
case *logicalplan.Aggregation:
79+
span.SetTag("operator_type", "aggregation")
80+
span.SetTag("aggregation_op", e.Op.String())
6581
return newAggregateExpression(ctx, e, storage, opts, hints)
6682
case *logicalplan.Binary:
83+
span.SetTag("operator_type", "binary")
84+
span.SetTag("binary_op", e.Op.String())
6785
return newBinaryExpression(ctx, e, storage, opts, hints)
6886
case *logicalplan.Parens:
87+
span.SetTag("operator_type", "parens")
6988
return newOperator(ctx, e.Expr, storage, opts, hints)
7089
case *logicalplan.Unary:
90+
span.SetTag("operator_type", "unary")
91+
span.SetTag("unary_op", e.Op.String())
7192
return newUnaryExpression(ctx, e, storage, opts, hints)
7293
case *logicalplan.StepInvariantExpr:
94+
span.SetTag("operator_type", "step_invariant")
7395
return newStepInvariantExpression(ctx, e, storage, opts, hints)
7496
case logicalplan.Deduplicate:
97+
span.SetTag("operator_type", "deduplicate")
7598
return newDeduplication(ctx, e, storage, opts, hints)
7699
case logicalplan.RemoteExecution:
100+
span.SetTag("operator_type", "remote_execution")
77101
return newRemoteExecution(ctx, e, opts, hints)
78102
case *logicalplan.CheckDuplicateLabels:
103+
span.SetTag("operator_type", "check_duplicate_labels")
79104
return newDuplicateLabelCheck(ctx, e, storage, opts, hints)
80105
case logicalplan.Noop:
106+
span.SetTag("operator_type", "noop")
81107
return noop.NewOperator(opts), nil
82108
case logicalplan.UserDefinedExpr:
109+
span.SetTag("operator_type", "user_defined")
83110
return e.MakeExecutionOperator(ctx, model.NewVectorPool(opts.StepsBatch), opts, hints)
84111
default:
85-
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s (%T)", e, e)
112+
err := errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s (%T)", e, e)
113+
tracing.LogError(span, err)
114+
return nil, err
86115
}
87116
}
88117

89118
func newVectorSelector(ctx context.Context, e *logicalplan.VectorSelector, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {
119+
span, ctx := tracing.StartSpanFromContext(ctx, "execution.newVectorSelector")
120+
defer span.Finish()
121+
span.SetTag("vector_selector", e.String())
122+
90123
start, end := getTimeRangesForVectorSelector(e, opts, 0)
91124
hints.Start = start
92125
hints.End = end
93-
return scanners.NewVectorSelector(ctx, opts, hints, *e)
126+
127+
span.SetTag("start_time", start)
128+
span.SetTag("end_time", end)
129+
130+
op, err := scanners.NewVectorSelector(ctx, opts, hints, *e)
131+
if err != nil {
132+
tracing.LogError(span, err)
133+
}
134+
return op, err
94135
}
95136

96137
func newCall(ctx context.Context, e *logicalplan.FunctionCall, scanners storage.Scanners, opts *query.Options, hints promstorage.SelectHints) (model.VectorOperator, error) {

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/efficientgo/core v1.0.0-rc.2
99
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
1010
github.com/google/go-cmp v0.7.0
11+
github.com/opentracing/opentracing-go v1.2.0
1112
github.com/prometheus/client_golang v1.20.5
1213
github.com/prometheus/common v0.61.0
1314
github.com/prometheus/prometheus v0.301.0

0 commit comments

Comments
 (0)