Skip to content

Commit b56f750

Browse files
plan,engine: add partition labels (#569)
Add a "PartitionLabelSets" method to the remote engine API. This makes sense because a remote engine can have a large labelset, but we only need a subset of those labels to actually partition all engines. Operating on the smaller labelset helps us push down more expressions: if we lose one of the unimportant labels from the labelset, we know we can still keep pushing down. Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 732ddbd commit b56f750

File tree

4 files changed

+108
-26
lines changed

4 files changed

+108
-26
lines changed

api/remote.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,16 @@ type RemoteEndpoints interface {
2323
type RemoteEngine interface {
2424
MaxT() int64
2525
MinT() int64
26+
27+
// The external labels of the remote engine. These are used to limit fanout. The engine uses these to
28+
// avoid distributing to remote engines that would return empty responses because their labelsets do not match.
2629
LabelSets() []labels.Labels
30+
31+
// The external labels of the remote engine that form a logical partition. This is expected to be
32+
// a subset of the result of "LabelSets()". The engine uses these to compute how to distribute a query.
33+
// It is important that, for a given set of remote engines, these labels do not overlap meaningfully.
34+
PartitionLabelSets() []labels.Labels
35+
2736
NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error)
2837
}
2938

engine/distributed.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func (l remoteEngine) LabelSets() []labels.Labels {
4545
return l.labelSets
4646
}
4747

48+
func (l remoteEngine) PartitionLabelSets() []labels.Labels {
49+
return l.labelSets
50+
}
51+
4852
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
4953
return l.engine.NewRangeQuery(ctx, l.q, opts, plan.String(), start, end, interval)
5054
}

logicalplan/distribute.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
168168
labelRanges := make(labelSetRanges)
169169
engineLabels := make(map[string]struct{})
170170
for _, e := range engines {
171-
for _, lset := range e.LabelSets() {
171+
for _, lset := range e.PartitionLabelSets() {
172172
lsetKey := lset.String()
173173
labelRanges.addRange(lsetKey, timeRange{
174174
start: time.UnixMilli(e.MinT()),
@@ -285,7 +285,7 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi
285285
}
286286

287287
for _, engine := range engines {
288-
for _, lbls := range engine.LabelSets() {
288+
for _, lbls := range engine.PartitionLabelSets() {
289289
lbls.Range(func(lbl labels.Label) {
290290
if rootAggregation.Without {
291291
delete(groupingSet, lbl.Name)

logicalplan/distribute_test.go

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestDistributedExecution(t *testing.T) {
5757
expr: `sum by (pod) (rate(http_requests_total[5m]))`,
5858
expected: `
5959
sum by (pod) (dedup(
60-
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
60+
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
6161
remote(sum by (pod, region) (rate(http_requests_total[5m])))))`,
6262
},
6363
{
@@ -158,7 +158,7 @@ sum by (pod) (
158158
expr: `max by (pod) (sum by (pod) (http_requests_total))`,
159159
expected: `
160160
max by (pod) (
161-
sum by (pod) (
161+
sum by (pod) (
162162
dedup(
163163
remote(sum by (pod, region) (http_requests_total)),
164164
remote(sum by (pod, region) (http_requests_total))
@@ -194,7 +194,7 @@ max by (pod) (quantile(0.9,
194194
expr: `label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")`,
195195
expected: `
196196
dedup(
197-
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
197+
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
198198
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))
199199
)`,
200200
},
@@ -204,7 +204,7 @@ dedup(
204204
expected: `
205205
max by (instance) (
206206
dedup(
207-
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
207+
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
208208
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")))
209209
)
210210
)`,
@@ -214,7 +214,7 @@ max by (instance) (
214214
expr: `max by (location) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))`,
215215
expected: `
216216
max by (location) (dedup(
217-
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
217+
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
218218
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))
219219
))`,
220220
},
@@ -237,20 +237,20 @@ max by (location) (dedup(
237237
sum by (location) (
238238
dedup(
239239
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
240-
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
240+
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
241241
/ on (location)
242242
sum by (location) (
243243
dedup(
244244
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
245-
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
245+
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
246246
`,
247247
},
248248
{
249249
name: "label replace after an aggregation",
250250
expr: `label_replace(max by (location) (http_requests_total), "region", "$1", "location", "(.*)")`,
251251
expected: `
252252
label_replace(max by (location) (dedup(
253-
remote(max by (location, region) (http_requests_total)),
253+
remote(max by (location, region) (http_requests_total)),
254254
remote(max by (location, region) (http_requests_total))
255255
)), "region", "$1", "location", "(.*)")`,
256256
expectWarn: true,
@@ -272,12 +272,12 @@ max by (pod) (
272272
expr: `sum by (pod) (metric_a) / sum by (pod) (metric_b)`,
273273
expected: `
274274
sum by (pod) (dedup(
275-
remote(sum by (pod, region) (metric_a)),
275+
remote(sum by (pod, region) (metric_a)),
276276
remote(sum by (pod, region) (metric_a)))
277277
)
278-
/
278+
/
279279
sum by (pod) (dedup(
280-
remote(sum by (pod, region) (metric_b)),
280+
remote(sum by (pod, region) (metric_b)),
281281
remote(sum by (pod, region) (metric_b))
282282
))`,
283283
},
@@ -286,7 +286,7 @@ sum by (pod) (dedup(
286286
expr: `rate(http_requests_total[2m])`,
287287
expected: `
288288
dedup(
289-
remote(rate(http_requests_total[2m])),
289+
remote(rate(http_requests_total[2m])),
290290
remote(rate(http_requests_total[2m]))
291291
)`,
292292
},
@@ -305,7 +305,7 @@ dedup(
305305
expr: `histogram_quantile(0.5, sum by (le) (rate(coredns_dns_request_duration_seconds_bucket[5m])))`,
306306
expected: `
307307
histogram_quantile(0.5, sum by (le) (dedup(
308-
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
308+
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
309309
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m])))
310310
)))`,
311311
},
@@ -338,7 +338,7 @@ histogram_quantile(0.5, sum by (le) (dedup(
338338
name: "binary expression with constant",
339339
expr: `sum by (pod) (rate(http_requests_total[2m]) * 60)`,
340340
expected: `sum by (pod) (dedup(
341-
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
341+
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
342342
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
343343
},
344344
{
@@ -361,7 +361,7 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
361361
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
362362
expected: `
363363
sum_over_time(max(dedup(
364-
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
364+
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
365365
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
366366
)[5m:1m])`,
367367
},
@@ -415,7 +415,7 @@ count by (cluster) (
415415
)`,
416416
expected: `
417417
sum by (cluster) (dedup(
418-
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
418+
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
419419
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))))
420420
)`,
421421
},
@@ -548,7 +548,7 @@ dedup(
548548
expr: `sum_over_time(sum_over_time(metric[1h])[1h:30m])`,
549549
expected: `
550550
dedup(
551-
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
551+
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
552552
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
553553
)`,
554554
},
@@ -606,7 +606,7 @@ dedup(
606606
expr: `sum(metric @ 25200)`,
607607
expected: `
608608
sum(dedup(
609-
remote(sum by (region) (metric @ 25200.000)),
609+
remote(sum by (region) (metric @ 25200.000)),
610610
remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
611611
))`,
612612
},
@@ -685,7 +685,7 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
685685
expected: `
686686
sum(
687687
dedup(
688-
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
688+
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
689689
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
690690
)
691691
)`,
@@ -713,14 +713,74 @@ sum(
713713
}
714714
}
715715

716+
func TestDistributedExecutionPruningByLabelset(t *testing.T) {
717+
cases := []struct {
718+
name string
719+
expr string
720+
expected string
721+
}{
722+
{
723+
name: "querying by labelsets restricts to partition that matches that labelset",
724+
expr: `sum by (pod) (rate(http_requests_total{region="west"}[2m]))`,
725+
expected: `sum by (pod) (dedup(remote(sum by (datacenter, pod) (rate(http_requests_total{region="west"}[2m])))))`,
726+
},
727+
{
728+
name: "querying by labelsets restricts to partition that matches that labelset",
729+
expr: `sum by (pod) (rate(http_requests_total{region="east"}[2m]))`,
730+
expected: `
731+
sum by (pod) (dedup(
732+
remote(sum by (datacenter, pod) (rate(http_requests_total{region="east"}[2m]))),
733+
remote(sum by (datacenter, pod) (rate(http_requests_total{region="east"}[2m])))
734+
))`,
735+
},
736+
}
737+
738+
for _, tcase := range cases {
739+
t.Run(tcase.name, func(t *testing.T) {
740+
// We are partitioned by datacenter but also have a "region" label that we can target
741+
engines := []api.RemoteEngine{
742+
newEngineMockWithExplicitPartition(
743+
math.MinInt64,
744+
math.MaxInt64,
745+
[]labels.Labels{labels.FromStrings("region", "east", "datacenter", "east-1")},
746+
[]labels.Labels{labels.FromStrings("datacenter", "east-1")},
747+
),
748+
newEngineMockWithExplicitPartition(
749+
math.MinInt64,
750+
math.MaxInt64,
751+
[]labels.Labels{labels.FromStrings("region", "east", "datacenter", "east-2")},
752+
[]labels.Labels{labels.FromStrings("datacenter", "east-2")},
753+
),
754+
newEngineMockWithExplicitPartition(
755+
math.MinInt64,
756+
math.MaxInt64,
757+
[]labels.Labels{labels.FromStrings("region", "west", "datacenter", "west-1")},
758+
[]labels.Labels{labels.FromStrings("datacenter", "west-1")},
759+
),
760+
}
761+
optimizers := []Optimizer{
762+
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
763+
}
764+
765+
expr, err := parser.ParseExpr(tcase.expr)
766+
testutil.Ok(t, err)
767+
768+
plan := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{})
769+
optimizedPlan, _ := plan.Optimize(optimizers)
770+
expectedPlan := cleanUp(replacements, tcase.expected)
771+
testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root()))
772+
})
773+
}
774+
}
775+
716776
func TestDistributedExecutionClonesNodes(t *testing.T) {
717777
var (
718778
start = time.Unix(0, 0)
719779
end = time.Unix(0, 0).Add(6 * time.Hour)
720780
step = time.Second
721781
expected = `
722782
sum(dedup(
723-
remote(sum by (region) (metric{region="east"})),
783+
remote(sum by (region) (metric{region="east"})),
724784
remote(sum by (region) (metric{region="east"}))
725785
))`
726786
)
@@ -759,9 +819,10 @@ sum(dedup(
759819

760820
type engineMock struct {
761821
api.RemoteEngine
762-
minT int64
763-
maxT int64
764-
labelSets []labels.Labels
822+
minT int64
823+
maxT int64
824+
labelSets []labels.Labels
825+
partitionLabelSets []labels.Labels
765826
}
766827

767828
func (e engineMock) MaxT() int64 {
@@ -776,6 +837,14 @@ func (e engineMock) LabelSets() []labels.Labels {
776837
return e.labelSets
777838
}
778839

840+
func (e engineMock) PartitionLabelSets() []labels.Labels {
841+
return e.partitionLabelSets
842+
}
843+
779844
func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock {
780-
return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets}
845+
return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets, partitionLabelSets: labelSets}
846+
}
847+
848+
func newEngineMockWithExplicitPartition(mint, maxt int64, labelSets, partitionLabelSets []labels.Labels) *engineMock {
849+
return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets, partitionLabelSets: partitionLabelSets}
781850
}

0 commit comments

Comments
 (0)