Skip to content

Commit 7a5c77d

Browse files
committed
add projection pushdown optimizer
Signed-off-by: yeya24 <[email protected]>
1 parent 4055a11 commit 7a5c77d

File tree

9 files changed

+931
-6
lines changed

9 files changed

+931
-6
lines changed

engine/pushdown_test.go

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
package engine_test
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"math/rand"
8+
"strconv"
9+
"testing"
10+
"time"
11+
12+
"github.com/cortexproject/promqlsmith"
13+
"github.com/efficientgo/core/testutil"
14+
"github.com/google/go-cmp/cmp"
15+
"github.com/prometheus/prometheus/model/labels"
16+
"github.com/prometheus/prometheus/promql"
17+
"github.com/prometheus/prometheus/promql/parser"
18+
"github.com/prometheus/prometheus/promql/promqltest"
19+
"github.com/prometheus/prometheus/storage"
20+
"github.com/prometheus/prometheus/tsdb/chunkenc"
21+
"github.com/prometheus/prometheus/util/annotations"
22+
"github.com/thanos-io/promql-engine/engine"
23+
"github.com/thanos-io/promql-engine/logicalplan"
24+
)
25+
26+
type projectionQuerier struct {
27+
storage.Querier
28+
}
29+
30+
type projectionSeriesSet struct {
31+
storage.SeriesSet
32+
hints *storage.SelectHints
33+
}
34+
35+
// Next delegates to the wrapped series set.
func (m projectionSeriesSet) Next() bool {
	return m.SeriesSet.Next()
}
36+
func (m projectionSeriesSet) At() storage.Series {
37+
// Get the original series
38+
originalSeries := m.SeriesSet.At()
39+
if originalSeries == nil {
40+
return nil
41+
}
42+
// If no projection hints, return the original series
43+
if m.hints == nil {
44+
return originalSeries
45+
}
46+
if !m.hints.By && len(m.hints.Grouping) == 0 {
47+
return originalSeries
48+
}
49+
50+
// Apply projection based on hints
51+
originalLabels := originalSeries.Labels()
52+
var projectedLabels labels.Labels
53+
54+
if m.hints.By {
55+
// Include mode: only keep the labels in the grouping
56+
builder := labels.NewBuilder(labels.EmptyLabels())
57+
for _, l := range originalLabels {
58+
for _, groupLabel := range m.hints.Grouping {
59+
if l.Name == groupLabel {
60+
builder.Set(l.Name, l.Value)
61+
break
62+
}
63+
}
64+
}
65+
builder.Set("__series_hash__", strconv.FormatUint(originalLabels.Hash(), 10))
66+
projectedLabels = builder.Labels()
67+
} else {
68+
// Exclude mode: keep all labels except those in the grouping
69+
excludeMap := make(map[string]struct{})
70+
for _, groupLabel := range m.hints.Grouping {
71+
excludeMap[groupLabel] = struct{}{}
72+
}
73+
74+
builder := labels.NewBuilder(labels.EmptyLabels())
75+
for _, l := range originalLabels {
76+
if _, excluded := excludeMap[l.Name]; !excluded {
77+
builder.Set(l.Name, l.Value)
78+
}
79+
}
80+
builder.Set("__series_hash__", strconv.FormatUint(originalLabels.Hash(), 10))
81+
projectedLabels = builder.Labels()
82+
}
83+
84+
// Return a projected series that wraps the original but with filtered labels
85+
return &projectedSeries{
86+
Series: originalSeries,
87+
lset: projectedLabels,
88+
}
89+
}
90+
91+
// projectedSeries wraps a storage.Series but returns projected labels
92+
type projectedSeries struct {
93+
storage.Series
94+
lset labels.Labels
95+
}
96+
97+
func (s *projectedSeries) Labels() labels.Labels {
98+
return s.lset
99+
}
100+
101+
func (s *projectedSeries) Iterator(iter chunkenc.Iterator) chunkenc.Iterator {
102+
return s.Series.Iterator(iter)
103+
}
104+
105+
// Err delegates to the wrapped series set.
func (m projectionSeriesSet) Err() error {
	return m.SeriesSet.Err()
}
106+
// Warnings delegates to the wrapped series set.
func (m projectionSeriesSet) Warnings() annotations.Annotations {
	return m.SeriesSet.Warnings()
}
107+
108+
// Implement the Querier interface methods
109+
func (m *projectionQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
110+
return projectionSeriesSet{
111+
SeriesSet: m.Querier.Select(ctx, sortSeries, hints, matchers...),
112+
hints: hints,
113+
}
114+
}
115+
func (m *projectionQuerier) LabelValues(ctx context.Context, name string, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
116+
return nil, nil, nil
117+
}
118+
func (m *projectionQuerier) LabelNames(ctx context.Context, _ *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
119+
return nil, nil, nil
120+
}
121+
// Close closes the wrapped querier and returns its error. The wrapper owns
// the querier obtained in projectionQueryable.Querier, so swallowing Close
// here would leak every querier opened through it.
func (m *projectionQuerier) Close() error { return m.Querier.Close() }
122+
123+
// projectionQueryable is a storage.Queryable that applies projection to the querier
124+
type projectionQueryable struct {
125+
storage.Queryable
126+
}
127+
128+
func (q *projectionQueryable) Querier(mint, maxt int64) (storage.Querier, error) {
129+
querier, err := q.Queryable.Querier(mint, maxt)
130+
if err != nil {
131+
return nil, err
132+
}
133+
return &projectionQuerier{
134+
Querier: querier,
135+
}, nil
136+
}
137+
138+
func TestProjectionPushdownWithFuzz(t *testing.T) {
139+
t.Parallel()
140+
141+
// Define test parameters
142+
seed := time.Now().UnixNano()
143+
rnd := rand.New(rand.NewSource(seed))
144+
testRuns := 1000
145+
146+
// Create test data
147+
load := `load 30s
148+
http_requests_total{pod="nginx-1", job="app", env="prod", instance="1"} 1+1x40
149+
http_requests_total{pod="nginx-2", job="app", env="dev", instance="2"} 2+2x40
150+
http_requests_total{pod="nginx-3", job="api", env="prod", instance="3"} 3+3x40
151+
http_requests_total{pod="nginx-4", job="api", env="dev", instance="4"} 4+4x40
152+
errors_total{pod="nginx-1", job="app", env="prod", instance="1", cluster="us-west-2"} 0.5+0.5x40
153+
errors_total{pod="nginx-2", job="app", env="dev", instance="2", cluster="us-west-2"} 1+1x40
154+
errors_total{pod="nginx-3", job="api", env="prod", instance="3", cluster="us-east-2"} 1.5+1.5x40
155+
errors_total{pod="nginx-4", job="api", env="dev", instance="4", cluster="us-east-1"} 2+2x40`
156+
157+
storage := promqltest.LoadedStorage(t, load)
158+
defer storage.Close()
159+
160+
// Get series for PromQLSmith
161+
seriesSet, err := getSeries(context.Background(), storage)
162+
testutil.Ok(t, err)
163+
164+
// Configure PromQLSmith
165+
psOpts := []promqlsmith.Option{
166+
promqlsmith.WithEnableOffset(false),
167+
promqlsmith.WithEnableAtModifier(false),
168+
// Focus on aggregations that benefit from projection pushdown
169+
promqlsmith.WithEnabledAggrs([]parser.ItemType{
170+
parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.COUNT, parser.TOPK, parser.BOTTOMK,
171+
}),
172+
promqlsmith.WithEnableVectorMatching(true),
173+
}
174+
ps := promqlsmith.New(rnd, seriesSet, psOpts...)
175+
176+
// Engine options
177+
engineOpts := promql.EngineOpts{
178+
Timeout: 1 * time.Hour,
179+
MaxSamples: 1e10,
180+
EnableNegativeOffset: true,
181+
EnableAtModifier: true,
182+
}
183+
184+
normalEngine := engine.New(engine.Opts{
185+
EngineOpts: engineOpts,
186+
LogicalOptimizers: logicalplan.AllOptimizers,
187+
DisableDuplicateLabelChecks: false,
188+
})
189+
190+
pushdownEngine := engine.New(engine.Opts{
191+
EngineOpts: engineOpts,
192+
// ProjectionPushdown optimizer doesn't support merge selects optimizer
193+
// so disable it for now.
194+
LogicalOptimizers: []logicalplan.Optimizer{
195+
logicalplan.SortMatchers{},
196+
logicalplan.ProjectionPushdown{SeriesHashLabel: "__series_hash__"},
197+
logicalplan.DetectHistogramStatsOptimizer{},
198+
// logicalplan.MergeSelectsOptimizer{},
199+
},
200+
DisableDuplicateLabelChecks: false,
201+
})
202+
203+
ctx := context.Background()
204+
queryTime := time.Unix(600, 0)
205+
206+
t.Logf("Running %d fuzzy tests with seed %d", testRuns, seed)
207+
for i := 0; i < testRuns; i++ {
208+
var expr parser.Expr
209+
var query string
210+
211+
// Generate a query that can be executed by the engine
212+
for {
213+
expr = ps.WalkInstantQuery()
214+
query = expr.Pretty(0)
215+
216+
// Skip queries that don't benefit from projection pushdown
217+
if !containsAggregationOrBinaryOperation(expr) {
218+
continue
219+
}
220+
221+
// Try to parse the query and see if it is valid.
222+
_, err := normalEngine.NewInstantQuery(ctx, storage, nil, query, queryTime)
223+
if err != nil {
224+
continue
225+
}
226+
break
227+
}
228+
229+
t.Run(fmt.Sprintf("Query_%d", i), func(t *testing.T) {
230+
// Create projection querier that wraps the original querier
231+
projectionStorage := &projectionQueryable{
232+
Queryable: storage,
233+
}
234+
235+
normalQuery, err := normalEngine.NewInstantQuery(ctx, storage, &engine.QueryOpts{}, query, queryTime)
236+
testutil.Ok(t, err)
237+
defer normalQuery.Close()
238+
normalResult := normalQuery.Exec(ctx)
239+
if normalResult.Err != nil {
240+
// Something wrong with the generated query so it even failed without projection pushdown, skipping.
241+
return
242+
}
243+
testutil.Ok(t, normalResult.Err, "query: %s", query)
244+
245+
pushdownQuery, err := pushdownEngine.MakeInstantQuery(ctx, projectionStorage, &engine.QueryOpts{}, query, queryTime)
246+
testutil.Ok(t, err)
247+
248+
defer pushdownQuery.Close()
249+
pushdownResult := pushdownQuery.Exec(ctx)
250+
testutil.Ok(t, pushdownResult.Err, "query: %s", query)
251+
252+
if diff := cmp.Diff(normalResult, pushdownResult, comparer); diff != "" {
253+
t.Errorf("Results differ for query %s: %s", query, diff)
254+
}
255+
})
256+
}
257+
}
258+
259+
// containsAggregationOrBinaryOperation checks if the expression contains any aggregation or binary operations
260+
func containsAggregationOrBinaryOperation(expr parser.Expr) bool {
261+
found := false
262+
parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
263+
switch node.(type) {
264+
case *parser.AggregateExpr:
265+
found = true
266+
return errors.New("found")
267+
case *parser.BinaryExpr:
268+
found = true
269+
return errors.New("found")
270+
}
271+
return nil
272+
})
273+
return found
274+
}

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
module github.com/thanos-io/promql-engine
22

3-
go 1.24
3+
go 1.24.0
44

55
require (
66
github.com/cespare/xxhash/v2 v2.3.0
7-
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
7+
github.com/cortexproject/promqlsmith v0.0.0-20250407233056-90db95b1a4e4
88
github.com/efficientgo/core v1.0.0-rc.2
99
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
1010
github.com/google/go-cmp v0.7.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
3838
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3939
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8ETbOasdwEV+avkR75ZzsVV9WI=
4040
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
41-
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e h1:nOWmgQD3L/Z0bmm29iDxB7nlqjMnh7yD/PNOx9rnZmA=
42-
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e/go.mod h1:+bSqRETXJ1uk2S93m//htzTVqu8DJPvlGEb3bSE9PzI=
41+
github.com/cortexproject/promqlsmith v0.0.0-20250407233056-90db95b1a4e4 h1:dpo7kQ24uFSV6Zgm9/kB34TIUWjGmadlbKrM6fNfQko=
42+
github.com/cortexproject/promqlsmith v0.0.0-20250407233056-90db95b1a4e4/go.mod h1:jh6POgN18lXU133HBMfwr/1TjvBp8e5kL4ZtRsAPvGY=
4343
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4444
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4545
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=

logicalplan/logical_nodes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ func (f *VectorSelector) Clone() Node {
8888
clone.Filters = shallowCloneSlice(f.Filters)
8989
clone.LabelMatchers = shallowCloneSlice(f.LabelMatchers)
9090
clone.Projection.Labels = shallowCloneSlice(f.Projection.Labels)
91+
clone.Projection.Include = f.Projection.Include
9192

9293
if f.VectorSelector.Timestamp != nil {
9394
ts := *f.VectorSelector.Timestamp

logicalplan/plan_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package logicalplan
66
import (
77
"fmt"
88
"regexp"
9+
"sort"
910
"strings"
1011
"testing"
1112
"time"
@@ -39,6 +40,14 @@ func renderExprTree(expr Node) string {
3940
if t.BatchSize > 0 {
4041
base += fmt.Sprintf("[batch=%d]", t.BatchSize)
4142
}
43+
if t.Projection.Labels != nil {
44+
sort.Strings(t.Projection.Labels)
45+
if t.Projection.Include {
46+
base += fmt.Sprintf("[projection=include(%s)]", strings.Join(t.Projection.Labels, ","))
47+
} else {
48+
base += fmt.Sprintf("[projection=exclude(%s)]", strings.Join(t.Projection.Labels, ","))
49+
}
50+
}
4251
if len(t.Filters) > 0 {
4352
b.WriteString("filter(")
4453
b.WriteString(fmt.Sprintf("%s", t.Filters))
@@ -49,7 +58,10 @@ func renderExprTree(expr Node) string {
4958
}
5059
return base
5160
case *MatrixSelector:
52-
return t.String()
61+
// Render the inner vector selector first
62+
vsStr := renderExprTree(t.VectorSelector)
63+
// Then add the range
64+
return fmt.Sprintf("%s[%s]", vsStr, t.Range.String())
5365
case *Binary:
5466
var b strings.Builder
5567
b.WriteString(renderExprTree(t.LHS))
@@ -108,6 +120,16 @@ func renderExprTree(expr Node) string {
108120
return renderExprTree(t.Expr)
109121
case *CheckDuplicateLabels:
110122
return renderExprTree(t.Expr)
123+
case *Subquery:
124+
var b strings.Builder
125+
126+
// Render the inner expression
127+
innerExpr := renderExprTree(t.Expr)
128+
b.WriteString(innerExpr)
129+
130+
// Add the subquery range and step
131+
b.WriteString(fmt.Sprintf("[%s:%s]", t.Range.String(), t.Step.String()))
132+
return b.String()
111133
default:
112134
return t.String()
113135
}

0 commit comments

Comments
 (0)