Skip to content

Commit ccd2596

Browse files
committed
plan,execution: late coalesce
1 parent 7e2f0a1 commit ccd2596

File tree

5 files changed

+157
-3
lines changed

5 files changed

+157
-3
lines changed

execution/execution.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O
7979
return newRemoteExecution(e, opts, hints)
8080
case logicalplan.Noop:
8181
return noop.NewOperator(), nil
82+
case logicalplan.Coalesce:
83+
return newCoalesce(e, storage, opts, hints)
8284
case logicalplan.UserDefinedExpr:
8385
return e.MakeExecutionOperator(model.NewVectorPool(opts.StepsBatch), storage, opts, hints)
8486
default:
@@ -107,6 +109,13 @@ func newVectorSelector(expr parser.Expr, storage *engstore.SelectorPool, opts *q
107109
offset = e.Offset
108110
batchsize = e.BatchSize
109111
selector = storage.GetFilteredSelector(start, end, opts.Step.Milliseconds(), e.LabelMatchers, e.Filters, hints)
112+
113+
if e.Shards > 0 {
114+
return exchange.NewConcurrent(
115+
scan.NewVectorSelector(
116+
model.NewVectorPool(opts.StepsBatch), selector, opts, offset, hints, batchsize, e.N, e.Shards),
117+
2), nil
118+
}
110119
default:
111120
return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e)
112121
}
@@ -465,6 +474,18 @@ func newRemoteExecution(e logicalplan.RemoteExecution, opts *query.Options, hint
465474
return exchange.NewConcurrent(remoteExec, 2), nil
466475
}
467476

477+
func newCoalesce(e logicalplan.Coalesce, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
478+
operators := make([]model.VectorOperator, len(e.Shards))
479+
for i, expr := range e.Shards {
480+
operator, err := newOperator(expr, storage, opts, hints)
481+
if err != nil {
482+
return nil, err
483+
}
484+
operators[i] = operator
485+
}
486+
return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, 0, operators...), nil
487+
}
488+
468489
// Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791.
469490
func getTimeRangesForVectorSelector(n *parser.VectorSelector, opts *query.Options, evalRange int64) (int64, int64) {
470491
start := opts.Start.UnixMilli()

logicalplan/filter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88

99
"github.com/prometheus/prometheus/model/labels"
10-
1110
"github.com/prometheus/prometheus/promql/parser"
1211
"github.com/prometheus/prometheus/promql/parser/posrange"
1312
)
@@ -19,6 +18,9 @@ type VectorSelector struct {
1918
*parser.VectorSelector
2019
Filters []*labels.Matcher
2120
BatchSize int64
21+
22+
N int
23+
Shards int
2224
}
2325

2426
func (f VectorSelector) String() string {

logicalplan/plan.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@ import (
1111
"github.com/efficientgo/core/errors"
1212
"github.com/prometheus/prometheus/model/timestamp"
1313
"github.com/prometheus/prometheus/promql"
14-
"github.com/prometheus/prometheus/util/annotations"
15-
1614
"github.com/prometheus/prometheus/promql/parser"
15+
"github.com/prometheus/prometheus/util/annotations"
1716

1817
"github.com/thanos-io/promql-engine/query"
1918
)
@@ -24,6 +23,7 @@ var (
2423
)
2524

2625
var DefaultOptimizers = []Optimizer{
26+
ShardedAggregations{Shards: 8},
2727
SortMatchers{},
2828
MergeSelectsOptimizer{},
2929
}

logicalplan/sharded_aggregations.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package logicalplan
5+
6+
import (
7+
"fmt"
8+
"strings"
9+
10+
"github.com/prometheus/prometheus/promql/parser"
11+
"github.com/prometheus/prometheus/promql/parser/posrange"
12+
"github.com/prometheus/prometheus/util/annotations"
13+
14+
"github.com/thanos-io/promql-engine/query"
15+
)
16+
17+
type Coalesce struct {
18+
Shards []parser.Expr
19+
}
20+
21+
func (r Coalesce) String() string {
22+
parts := make([]string, len(r.Shards))
23+
for i, r := range r.Shards {
24+
parts[i] = r.String()
25+
}
26+
return fmt.Sprintf("coalesce(%s)", strings.Join(parts, ", "))
27+
}
28+
29+
func (r Coalesce) Pretty(level int) string { return r.String() }
30+
31+
func (r Coalesce) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }
32+
33+
func (r Coalesce) Type() parser.ValueType { return r.Shards[0].Type() }
34+
35+
func (r Coalesce) PromQLExpr() {}
36+
37+
type ShardedAggregations struct{ Shards int }
38+
39+
func (m ShardedAggregations) Optimize(plan parser.Expr, _ *query.Options) (parser.Expr, annotations.Annotations) {
40+
TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
41+
if parent == nil {
42+
return false
43+
}
44+
aggr, ok := (*parent).(*parser.AggregateExpr)
45+
if !ok {
46+
return false
47+
}
48+
// TODO: only care about sum now
49+
if aggr.Op != parser.SUM {
50+
return false
51+
}
52+
call, ok := (*current).(*parser.Call)
53+
if !ok {
54+
return false
55+
}
56+
if len(call.Args) != 1 {
57+
return false
58+
}
59+
vs, ok := call.Args[0].(*parser.VectorSelector)
60+
if !ok {
61+
return false
62+
}
63+
64+
coalesce := Coalesce{make([]parser.Expr, m.Shards)}
65+
for i := range coalesce.Shards {
66+
coalesce.Shards[i] = &parser.Call{
67+
Func: call.Func,
68+
PosRange: call.PosRange,
69+
Args: []parser.Expr{vectorSelectorForShard(vs, i, m.Shards)},
70+
}
71+
}
72+
73+
*parent = &parser.AggregateExpr{
74+
Op: aggr.Op,
75+
Expr: coalesce,
76+
Param: aggr.Param,
77+
Grouping: aggr.Grouping,
78+
Without: aggr.Without,
79+
PosRange: aggr.PosRange,
80+
}
81+
return true
82+
})
83+
return plan, nil
84+
}
85+
86+
func vectorSelectorForShard(expr *parser.VectorSelector, n, shards int) parser.Expr {
87+
return &VectorSelector{
88+
VectorSelector: expr,
89+
N: n,
90+
Shards: shards,
91+
}
92+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) The Thanos Community Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package logicalplan
5+
6+
import (
7+
"testing"
8+
9+
"github.com/efficientgo/core/testutil"
10+
"github.com/prometheus/prometheus/promql/parser"
11+
12+
"github.com/thanos-io/promql-engine/query"
13+
)
14+
15+
func TestShardedAggregations(t *testing.T) {
16+
cases := []struct {
17+
name string
18+
expr string
19+
expected string
20+
}{
21+
{
22+
name: "sum exp",
23+
expr: `sum(exp(X))`,
24+
expected: ``,
25+
},
26+
}
27+
28+
optimizers := []Optimizer{ShardedAggregations{Shards: 2}}
29+
for _, tcase := range cases {
30+
t.Run(tcase.expr, func(t *testing.T) {
31+
expr, err := parser.ParseExpr(tcase.expr)
32+
testutil.Ok(t, err)
33+
34+
plan := New(expr, &query.Options{})
35+
optimizedPlan, _ := plan.Optimize(optimizers)
36+
testutil.Equals(t, tcase.expected, optimizedPlan.Expr().String())
37+
})
38+
}
39+
}

0 commit comments

Comments
 (0)