Skip to content

Fix correctness issue in predict_linear with step invariant #527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
http_requests_total{pod="nginx-2", route="/"} -108+173.00x40`,
query: `predict_linear(http_requests_total{route="/"}[1h:1m] offset 1m, 60)`,
},
{
name: "predict_linear with step invariant",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1.1x10
http_requests_total{pod="nginx-2"} 2+2.3x50`,
query: `predict_linear({__name__="http_requests_total",pod!~"nginx-1"}[5m] @ start(), -0.37690610678629094)`,
end: time.Unix(600, 0),
start: time.Unix(300, 0),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an issue for predict_linear only, or can it impact any function that takes a matrix selector wrapped in a step invariant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only impacts functions that are unsafe for the @ modifier and take a matrix selector as an argument.

https://github.com/prometheus/prometheus/blob/308c8c48c15c74a929c430447df3d3c3a3d4001f/promql/functions.go#L1914

So far this only applies to predict_linear.

{
name: "duplicate label fuzz",
load: `load 30s
Expand Down
1 change: 1 addition & 0 deletions logicalplan/logical_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func (f *VectorSelector) ReturnType() parser.ValueType { return parser.ValueType
type MatrixSelector struct {
VectorSelector *VectorSelector `json:"-"`
Range time.Duration
Timestamp *int64

// Needed because this operator is used in the distributed mode
OriginalString string
Expand Down
8 changes: 7 additions & 1 deletion logicalplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ func replacePrometheusNodes(plan parser.Expr) Node {
// parents of matrixselector nodes are always expected to be functions, not step invariant
// operators.
if m, ok := t.Expr.(*parser.MatrixSelector); ok {
return replacePrometheusNodes(m)
n := replacePrometheusNodes(m)
if ms, ok := n.(*MatrixSelector); ok {
// For cases such as predict_linear(metric[5m] @start() , 0.3), we mark the
// matrix selector to be invariant. The scanner will only scan points once in such cases.
ms.Timestamp = ms.VectorSelector.Timestamp
}
return n
}
return &StepInvariantExpr{Expr: replacePrometheusNodes(t.Expr)}
case *parser.MatrixSelector:
Expand Down
4 changes: 4 additions & 0 deletions ringbuffer/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) {
f(&r.items[len(r.items)-1])
}

// SetStep overwrites the buffer's currentStep with step. It does not touch
// the samples already held in the buffer.
func (r *GenericRingBuffer) SetStep(step int64) {
r.currentStep = step
}

// Push adds a new sample to the buffer.
func (r *GenericRingBuffer) Push(t int64, v Value) {
n := len(r.items)
Expand Down
3 changes: 3 additions & 0 deletions ringbuffer/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Buffer interface {
MaxT() int64
Push(t int64, v Value)
Reset(mint int64, evalt int64)
SetStep(int64)
Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error)
ReadIntoLast(f func(*Sample))
}
Expand Down Expand Up @@ -96,6 +97,8 @@ func (r *RateBuffer) Len() int { return r.stepRanges[0].numSamples }

// MaxT returns r.last.T, the timestamp of the sample held in the buffer's
// last field.
func (r *RateBuffer) MaxT() int64 { return r.last.T }

// SetStep overwrites the buffer's step field with step.
func (r *RateBuffer) SetStep(step int64) { r.step = step }

func (r *RateBuffer) Push(t int64, v Value) {
// Detect resets and store the current and previous sample so that
// the rate is properly adjusted.
Expand Down
25 changes: 24 additions & 1 deletion storage/prometheus/matrix_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type matrixScanner struct {
iterator chunkenc.Iterator
lastSample ringbuffer.Sample
metricAppearedTs *int64
timestamp *int64
once sync.Once
}

type matrixSelector struct {
Expand All @@ -57,6 +59,7 @@ type matrixSelector struct {
numSteps int
mint int64
maxt int64
timestamp *int64
step int64
selectRange int64
offset int64
Expand Down Expand Up @@ -85,6 +88,7 @@ func NewMatrixSelector(
functionName string,
arg float64,
arg2 float64,
timestamp *int64,
opts *query.Options,
selectRange, offset time.Duration,
batchSize int64,
Expand All @@ -108,6 +112,7 @@ func NewMatrixSelector(
mint: opts.Start.UnixMilli(),
maxt: opts.End.UnixMilli(),
step: opts.Step.Milliseconds(),
timestamp: timestamp,
isExtFunction: parse.IsExtFunction(functionName),

selectRange: selectRange.Milliseconds(),
Expand Down Expand Up @@ -250,6 +255,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error {
iterator: s.Iterator(nil),
lastSample: ringbuffer.Sample{T: math.MinInt64},
buffer: o.newBuffer(ctx),
timestamp: o.timestamp,
}
o.series[i] = lbls
}
Expand Down Expand Up @@ -301,6 +307,23 @@ func (o *matrixSelector) String() string {
return fmt.Sprintf("[matrixSelector] {%v}[%s] %v mod %v", o.storage.Matchers(), r, o.shard, o.numShards)
}

// selectPoints fills the scanner's buffer for one evaluation step.
//
// When the scanner has no fixed timestamp, the call is forwarded unchanged to
// selectPointsInternal. When a fixed timestamp is set, selectPointsInternal
// is invoked at most once per scanner, with *m.timestamp in place of evalt;
// every call then moves the buffer to the requested step via SetStep(evalt).
//
// NOTE: an error from the once-only scan is reported only by the call that
// actually executed it. Later calls skip the scan and return nil.
func (m *matrixScanner) selectPoints(
	mint, maxt, evalt int64,
	fh *histogram.FloatHistogram,
	isExtFunction bool,
) error {
	// Common path: no fixed timestamp, scan relative to the current step.
	if m.timestamp == nil {
		return m.selectPointsInternal(mint, maxt, evalt, fh, isExtFunction)
	}

	// Fixed timestamp: the selected samples are the same for every step, so
	// the underlying scan only needs to happen on the first call.
	var scanErr error
	m.once.Do(func() {
		scanErr = m.selectPointsInternal(mint, maxt, *m.timestamp, fh, isExtFunction)
	})

	// The buffer still needs to know which step is currently being evaluated.
	m.buffer.SetStep(evalt)
	return scanErr
}

// matrixIterSlice populates a matrix vector covering the requested range for a
// single time series, with points retrieved from an iterator.
//
Expand All @@ -310,7 +333,7 @@ func (o *matrixSelector) String() string {
// into the [mint, maxt] range are retained; only points with later timestamps
// are populated from the iterator.
// TODO(fpetkovski): Add max samples limit.
func (m *matrixScanner) selectPoints(
func (m *matrixScanner) selectPointsInternal(
mint, maxt, evalt int64,
fh *histogram.FloatHistogram,
isExtFunction bool,
Expand Down
1 change: 1 addition & 0 deletions storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (p Scanners) NewMatrixSelector(
call.Func.Name,
arg,
arg2,
logicalNode.Timestamp,
opts,
logicalNode.Range,
vs.Offset,
Expand Down
Loading