Skip to content

Commit f49bd2f

Browse files
committed
plan,engine: add partition labels
Add a "PartitionLabelSets" method to the remote engine API. This makes sense because a remote engine can have a large labelset, but we only need a subset of those labels to actually partition all engines. Operating on a smaller labelset helps us push down more expressions: if we lose one of the unimportant labels from the labelset, we can still keep pushing down. Signed-off-by: Michael Hoffmann <[email protected]>
1 parent 149870f commit f49bd2f

File tree

4 files changed

+39
-22
lines changed

4 files changed

+39
-22
lines changed

api/remote.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,16 @@ type RemoteEndpoints interface {
2323
type RemoteEngine interface {
2424
MaxT() int64
2525
MinT() int64
26+
27+
// LabelSets returns the external labels of the remote engine. These are used to limit fanout: the engine uses them to
28+
// not distribute into remote engines that would return empty responses because their labelset does not match.
2629
LabelSets() []labels.Labels
30+
31+
// PartitionLabelSets returns the external labels of the remote engine that form a logical partition. This is expected to be
32+
// a subset of the result of "LabelSets()". The engine uses these to compute how to distribute a query.
33+
// It is important that, for a given set of remote engines, these labels do not overlap meaningfully.
34+
PartitionLabelSets() []labels.Labels
35+
2736
NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error)
2837
}
2938

engine/distributed.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func (l remoteEngine) LabelSets() []labels.Labels {
4545
return l.labelSets
4646
}
4747

48+
func (l remoteEngine) PartitionLabelSets() []labels.Labels {
49+
return l.labelSets
50+
}
51+
4852
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
4953
return l.engine.NewRangeQuery(ctx, l.q, opts, plan.String(), start, end, interval)
5054
}

logicalplan/distribute.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
168168
labelRanges := make(labelSetRanges)
169169
engineLabels := make(map[string]struct{})
170170
for _, e := range engines {
171-
for _, lset := range e.LabelSets() {
171+
for _, lset := range e.PartitionLabelSets() {
172172
lsetKey := lset.String()
173173
labelRanges.addRange(lsetKey, timeRange{
174174
start: time.UnixMilli(e.MinT()),
@@ -285,7 +285,7 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi
285285
}
286286

287287
for _, engine := range engines {
288-
for _, lbls := range engine.LabelSets() {
288+
for _, lbls := range engine.PartitionLabelSets() {
289289
lbls.Range(func(lbl labels.Label) {
290290
if rootAggregation.Without {
291291
delete(groupingSet, lbl.Name)

logicalplan/distribute_test.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func TestDistributedExecution(t *testing.T) {
5757
expr: `sum by (pod) (rate(http_requests_total[5m]))`,
5858
expected: `
5959
sum by (pod) (dedup(
60-
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
60+
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
6161
remote(sum by (pod, region) (rate(http_requests_total[5m])))))`,
6262
},
6363
{
@@ -158,7 +158,7 @@ sum by (pod) (
158158
expr: `max by (pod) (sum by (pod) (http_requests_total))`,
159159
expected: `
160160
max by (pod) (
161-
sum by (pod) (
161+
sum by (pod) (
162162
dedup(
163163
remote(sum by (pod, region) (http_requests_total)),
164164
remote(sum by (pod, region) (http_requests_total))
@@ -194,7 +194,7 @@ max by (pod) (quantile(0.9,
194194
expr: `label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")`,
195195
expected: `
196196
dedup(
197-
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
197+
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
198198
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))
199199
)`,
200200
},
@@ -204,7 +204,7 @@ dedup(
204204
expected: `
205205
max by (instance) (
206206
dedup(
207-
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
207+
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
208208
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")))
209209
)
210210
)`,
@@ -214,7 +214,7 @@ max by (instance) (
214214
expr: `max by (location) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))`,
215215
expected: `
216216
max by (location) (dedup(
217-
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
217+
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
218218
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))
219219
))`,
220220
},
@@ -237,20 +237,20 @@ max by (location) (dedup(
237237
sum by (location) (
238238
dedup(
239239
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
240-
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
240+
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
241241
/ on (location)
242242
sum by (location) (
243243
dedup(
244244
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
245-
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
245+
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
246246
`,
247247
},
248248
{
249249
name: "label replace after an aggregation",
250250
expr: `label_replace(max by (location) (http_requests_total), "region", "$1", "location", "(.*)")`,
251251
expected: `
252252
label_replace(max by (location) (dedup(
253-
remote(max by (location, region) (http_requests_total)),
253+
remote(max by (location, region) (http_requests_total)),
254254
remote(max by (location, region) (http_requests_total))
255255
)), "region", "$1", "location", "(.*)")`,
256256
expectWarn: true,
@@ -272,12 +272,12 @@ max by (pod) (
272272
expr: `sum by (pod) (metric_a) / sum by (pod) (metric_b)`,
273273
expected: `
274274
sum by (pod) (dedup(
275-
remote(sum by (pod, region) (metric_a)),
275+
remote(sum by (pod, region) (metric_a)),
276276
remote(sum by (pod, region) (metric_a)))
277277
)
278-
/
278+
/
279279
sum by (pod) (dedup(
280-
remote(sum by (pod, region) (metric_b)),
280+
remote(sum by (pod, region) (metric_b)),
281281
remote(sum by (pod, region) (metric_b))
282282
))`,
283283
},
@@ -286,7 +286,7 @@ sum by (pod) (dedup(
286286
expr: `rate(http_requests_total[2m])`,
287287
expected: `
288288
dedup(
289-
remote(rate(http_requests_total[2m])),
289+
remote(rate(http_requests_total[2m])),
290290
remote(rate(http_requests_total[2m]))
291291
)`,
292292
},
@@ -305,7 +305,7 @@ dedup(
305305
expr: `histogram_quantile(0.5, sum by (le) (rate(coredns_dns_request_duration_seconds_bucket[5m])))`,
306306
expected: `
307307
histogram_quantile(0.5, sum by (le) (dedup(
308-
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
308+
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
309309
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m])))
310310
)))`,
311311
},
@@ -338,7 +338,7 @@ histogram_quantile(0.5, sum by (le) (dedup(
338338
name: "binary expression with constant",
339339
expr: `sum by (pod) (rate(http_requests_total[2m]) * 60)`,
340340
expected: `sum by (pod) (dedup(
341-
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
341+
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
342342
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
343343
},
344344
{
@@ -361,7 +361,7 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
361361
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
362362
expected: `
363363
sum_over_time(max(dedup(
364-
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
364+
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
365365
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
366366
)[5m:1m])`,
367367
},
@@ -415,7 +415,7 @@ count by (cluster) (
415415
)`,
416416
expected: `
417417
sum by (cluster) (dedup(
418-
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
418+
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
419419
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))))
420420
)`,
421421
},
@@ -548,7 +548,7 @@ dedup(
548548
expr: `sum_over_time(sum_over_time(metric[1h])[1h:30m])`,
549549
expected: `
550550
dedup(
551-
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
551+
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
552552
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
553553
)`,
554554
},
@@ -606,7 +606,7 @@ dedup(
606606
expr: `sum(metric @ 25200)`,
607607
expected: `
608608
sum(dedup(
609-
remote(sum by (region) (metric @ 25200.000)),
609+
remote(sum by (region) (metric @ 25200.000)),
610610
remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
611611
))`,
612612
},
@@ -685,7 +685,7 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
685685
expected: `
686686
sum(
687687
dedup(
688-
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
688+
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
689689
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
690690
)
691691
)`,
@@ -720,7 +720,7 @@ func TestDistributedExecutionClonesNodes(t *testing.T) {
720720
step = time.Second
721721
expected = `
722722
sum(dedup(
723-
remote(sum by (region) (metric{region="east"})),
723+
remote(sum by (region) (metric{region="east"})),
724724
remote(sum by (region) (metric{region="east"}))
725725
))`
726726
)
@@ -776,6 +776,10 @@ func (e engineMock) LabelSets() []labels.Labels {
776776
return e.labelSets
777777
}
778778

779+
func (e engineMock) PartitionLabelSets() []labels.Labels {
780+
return e.labelSets
781+
}
782+
779783
func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock {
780784
return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets}
781785
}

0 commit comments

Comments
 (0)