Skip to content

plan,engine: add partition labels #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ type RemoteEndpoints interface {
type RemoteEngine interface {
	// MaxT returns the maximum timestamp, in milliseconds, of the data
	// available in the remote engine.
	MaxT() int64
	// MinT returns the minimum timestamp, in milliseconds, of the data
	// available in the remote engine.
	MinT() int64

	// LabelSets returns the external labels of the remote engine. These are
	// used to limit fanout: the engine uses them to avoid distributing into
	// remote engines that would return empty responses because their label
	// set does not match the query.
	LabelSets() []labels.Labels

	// PartitionLabelSets returns the external labels of the remote engine
	// that form a logical partition. This is expected to be a subset of the
	// result of LabelSets(). The engine uses these to compute how to
	// distribute a query. It is important that, for a given set of remote
	// engines, these labels do not overlap meaningfully.
	PartitionLabelSets() []labels.Labels

	// NewRangeQuery creates a range query against the remote engine for the
	// given plan and time parameters.
	NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error)
}

Expand Down
4 changes: 4 additions & 0 deletions engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (l remoteEngine) LabelSets() []labels.Labels {
return l.labelSets
}

func (l remoteEngine) PartitionLabelSets() []labels.Labels {
return l.labelSets
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a safe default. Engines who want to specify different partition labels can implement RemoteEngine differently.

}

// NewRangeQuery creates a range query by delegating to the wrapped engine,
// serializing the remote plan into its query-string form.
func (l remoteEngine) NewRangeQuery(ctx context.Context, opts promql.QueryOpts, plan api.RemoteQuery, start, end time.Time, interval time.Duration) (promql.Query, error) {
	queryString := plan.String()
	return l.engine.NewRangeQuery(ctx, l.q, opts, queryString, start, end, interval)
}
Expand Down
4 changes: 2 additions & 2 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
labelRanges := make(labelSetRanges)
engineLabels := make(map[string]struct{})
for _, e := range engines {
for _, lset := range e.LabelSets() {
for _, lset := range e.PartitionLabelSets() {
lsetKey := lset.String()
labelRanges.addRange(lsetKey, timeRange{
start: time.UnixMilli(e.MinT()),
Expand Down Expand Up @@ -285,7 +285,7 @@ func newRemoteAggregation(rootAggregation *Aggregation, engines []api.RemoteEngi
}

for _, engine := range engines {
for _, lbls := range engine.LabelSets() {
for _, lbls := range engine.PartitionLabelSets() {
lbls.Range(func(lbl labels.Label) {
if rootAggregation.Without {
delete(groupingSet, lbl.Name)
Expand Down
117 changes: 93 additions & 24 deletions logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestDistributedExecution(t *testing.T) {
expr: `sum by (pod) (rate(http_requests_total[5m]))`,
expected: `
sum by (pod) (dedup(
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
remote(sum by (pod, region) (rate(http_requests_total[5m]))),
remote(sum by (pod, region) (rate(http_requests_total[5m])))))`,
},
{
Expand Down Expand Up @@ -158,7 +158,7 @@ sum by (pod) (
expr: `max by (pod) (sum by (pod) (http_requests_total))`,
expected: `
max by (pod) (
sum by (pod) (
sum by (pod) (
dedup(
remote(sum by (pod, region) (http_requests_total)),
remote(sum by (pod, region) (http_requests_total))
Expand Down Expand Up @@ -194,7 +194,7 @@ max by (pod) (quantile(0.9,
expr: `label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")`,
expected: `
dedup(
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")),
remote(label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))
)`,
},
Expand All @@ -204,7 +204,7 @@ dedup(
expected: `
max by (instance) (
dedup(
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)"))),
remote(max by (instance, region) (label_replace(http_requests_total, "pod", "$1", "instance", "(.*)")))
)
)`,
Expand All @@ -214,7 +214,7 @@ max by (instance) (
expr: `max by (location) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))`,
expected: `
max by (location) (dedup(
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(max by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))
))`,
},
Expand All @@ -237,20 +237,20 @@ max by (location) (dedup(
sum by (location) (
dedup(
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
/ on (location)
sum by (location) (
dedup(
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
`,
},
{
name: "label replace after an aggregation",
expr: `label_replace(max by (location) (http_requests_total), "region", "$1", "location", "(.*)")`,
expected: `
label_replace(max by (location) (dedup(
remote(max by (location, region) (http_requests_total)),
remote(max by (location, region) (http_requests_total)),
remote(max by (location, region) (http_requests_total))
)), "region", "$1", "location", "(.*)")`,
expectWarn: true,
Expand All @@ -272,12 +272,12 @@ max by (pod) (
expr: `sum by (pod) (metric_a) / sum by (pod) (metric_b)`,
expected: `
sum by (pod) (dedup(
remote(sum by (pod, region) (metric_a)),
remote(sum by (pod, region) (metric_a)),
remote(sum by (pod, region) (metric_a)))
)
/
/
sum by (pod) (dedup(
remote(sum by (pod, region) (metric_b)),
remote(sum by (pod, region) (metric_b)),
remote(sum by (pod, region) (metric_b))
))`,
},
Expand All @@ -286,7 +286,7 @@ sum by (pod) (dedup(
expr: `rate(http_requests_total[2m])`,
expected: `
dedup(
remote(rate(http_requests_total[2m])),
remote(rate(http_requests_total[2m])),
remote(rate(http_requests_total[2m]))
)`,
},
Expand All @@ -305,7 +305,7 @@ dedup(
expr: `histogram_quantile(0.5, sum by (le) (rate(coredns_dns_request_duration_seconds_bucket[5m])))`,
expected: `
histogram_quantile(0.5, sum by (le) (dedup(
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m]))),
remote(sum by (le, region) (rate(coredns_dns_request_duration_seconds_bucket[5m])))
)))`,
},
Expand Down Expand Up @@ -338,7 +338,7 @@ histogram_quantile(0.5, sum by (le) (dedup(
name: "binary expression with constant",
expr: `sum by (pod) (rate(http_requests_total[2m]) * 60)`,
expected: `sum by (pod) (dedup(
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60)),
remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
},
{
Expand All @@ -361,7 +361,7 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
expr: `sum_over_time(max(http_requests_total)[5m:1m])`,
expected: `
sum_over_time(max(dedup(
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC],
remote(max by (region) (http_requests_total)) [1969-12-31 23:55:00 +0000 UTC, 1970-01-01 00:00:00 +0000 UTC])
)[5m:1m])`,
},
Expand Down Expand Up @@ -415,7 +415,7 @@ count by (cluster) (
)`,
expected: `
sum by (cluster) (dedup(
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))),
remote(count by (cluster, region) (label_replace(up, "ns", "$0", "namespace", ".*") * on (region) group_left (project) label_replace(k8s_cluster_info, "k8s_cluster", "$0", "cluster", ".*"))))
)`,
},
Expand Down Expand Up @@ -548,7 +548,7 @@ dedup(
expr: `sum_over_time(sum_over_time(metric[1h])[1h:30m])`,
expected: `
dedup(
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])),
remote(sum_over_time(sum_over_time(metric[1h])[1h:30m])) [1970-01-01 08:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
)`,
},
Expand Down Expand Up @@ -606,7 +606,7 @@ dedup(
expr: `sum(metric @ 25200)`,
expected: `
sum(dedup(
remote(sum by (region) (metric @ 25200.000)),
remote(sum by (region) (metric @ 25200.000)),
remote(sum by (region) (metric @ 25200.000)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 12:00:00 +0000 UTC]
))`,
},
Expand Down Expand Up @@ -685,7 +685,7 @@ func TestDistributedExecutionPruningByTime(t *testing.T) {
expected: `
sum(
dedup(
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC],
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC, 1970-01-01 06:00:00 +0000 UTC]
)
)`,
Expand Down Expand Up @@ -713,14 +713,74 @@ sum(
}
}

// TestDistributedExecutionPruningByLabelset verifies that the distributed
// optimizer prunes remote engines whose external label sets cannot match the
// query's matchers, while still distributing by the (possibly narrower)
// partition label sets.
func TestDistributedExecutionPruningByLabelset(t *testing.T) {
	cases := []struct {
		name     string
		expr     string
		expected string
	}{
		{
			// Fixed: both cases previously shared the same name, making the
			// subtests distinguishable only by Go's automatic #01 suffix.
			name:     "matcher selecting a single partition prunes non-matching engines",
			expr:     `sum by (pod) (rate(http_requests_total{region="west"}[2m]))`,
			expected: `sum by (pod) (dedup(remote(sum by (datacenter, pod) (rate(http_requests_total{region="west"}[2m])))))`,
		},
		{
			name: "matcher selecting multiple partitions fans out to all of them",
			expr: `sum by (pod) (rate(http_requests_total{region="east"}[2m]))`,
			expected: `
sum by (pod) (dedup(
  remote(sum by (datacenter, pod) (rate(http_requests_total{region="east"}[2m]))),
  remote(sum by (datacenter, pod) (rate(http_requests_total{region="east"}[2m])))
))`,
		},
	}

	for _, tcase := range cases {
		t.Run(tcase.name, func(t *testing.T) {
			// We are partitioned by datacenter but also have a "region" label
			// that queries can target for pruning.
			engines := []api.RemoteEngine{
				newEngineMockWithExplicitPartition(
					math.MinInt64,
					math.MaxInt64,
					[]labels.Labels{labels.FromStrings("region", "east", "datacenter", "east-1")},
					[]labels.Labels{labels.FromStrings("datacenter", "east-1")},
				),
				newEngineMockWithExplicitPartition(
					math.MinInt64,
					math.MaxInt64,
					[]labels.Labels{labels.FromStrings("region", "east", "datacenter", "east-2")},
					[]labels.Labels{labels.FromStrings("datacenter", "east-2")},
				),
				newEngineMockWithExplicitPartition(
					math.MinInt64,
					math.MaxInt64,
					[]labels.Labels{labels.FromStrings("region", "west", "datacenter", "west-1")},
					[]labels.Labels{labels.FromStrings("datacenter", "west-1")},
				),
			}
			optimizers := []Optimizer{
				DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
			}

			expr, err := parser.ParseExpr(tcase.expr)
			testutil.Ok(t, err)

			plan := NewFromAST(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}, PlanOptions{})
			optimizedPlan, _ := plan.Optimize(optimizers)
			expectedPlan := cleanUp(replacements, tcase.expected)
			testutil.Equals(t, expectedPlan, renderExprTree(optimizedPlan.Root()))
		})
	}
}

func TestDistributedExecutionClonesNodes(t *testing.T) {
var (
start = time.Unix(0, 0)
end = time.Unix(0, 0).Add(6 * time.Hour)
step = time.Second
expected = `
sum(dedup(
remote(sum by (region) (metric{region="east"})),
remote(sum by (region) (metric{region="east"})),
remote(sum by (region) (metric{region="east"}))
))`
)
Expand Down Expand Up @@ -759,9 +819,10 @@ sum(dedup(

// engineMock is a test double for api.RemoteEngine with fixed time bounds,
// external label sets, and partition label sets.
// (Fixed: the pasted diff retained both the pre-change and post-change field
// lists, leaving duplicate field declarations that cannot compile.)
type engineMock struct {
	api.RemoteEngine
	minT               int64
	maxT               int64
	labelSets          []labels.Labels
	partitionLabelSets []labels.Labels
}

func (e engineMock) MaxT() int64 {
Expand All @@ -776,6 +837,14 @@ func (e engineMock) LabelSets() []labels.Labels {
return e.labelSets
}

// PartitionLabelSets implements api.RemoteEngine, returning the explicitly
// configured partition label sets.
func (e engineMock) PartitionLabelSets() []labels.Labels {
	return e.partitionLabelSets
}

// newEngineMock returns an engineMock whose partition label sets default to
// its external label sets, mirroring the safe default of the production
// remoteEngine implementation.
// (Fixed: the pasted diff retained both the old and new return statements,
// making the second one unreachable.)
func newEngineMock(mint, maxt int64, labelSets []labels.Labels) *engineMock {
	return &engineMock{minT: mint, maxT: maxt, labelSets: labelSets, partitionLabelSets: labelSets}
}

// newEngineMockWithExplicitPartition returns an engineMock whose partition
// label sets are configured independently of its external label sets.
func newEngineMockWithExplicitPartition(mint, maxt int64, labelSets, partitionLabelSets []labels.Labels) *engineMock {
	mock := &engineMock{
		minT:               mint,
		maxT:               maxt,
		labelSets:          labelSets,
		partitionLabelSets: partitionLabelSets,
	}
	return mock
}
Loading