Skip to content

Commit f0653e4

Browse files
committed
Fix failure
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 472224a commit f0653e4

File tree

5 files changed

+33
-2
lines changed

5 files changed

+33
-2
lines changed

engine/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
246246
query: `predict_linear(http_requests_total{route="/"}[1h:1m] offset 1m, 60)`,
247247
},
248248
{
249-
name: "predict_linear correctness",
249+
name: "predict_linear with step invariant",
250250
load: `load 30s
251251
http_requests_total{pod="nginx-1"} 1+1.1x10
252252
http_requests_total{pod="nginx-2"} 2+2.3x50`,

ringbuffer/generic.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) {
6363
f(&r.items[len(r.items)-1])
6464
}
6565

66+
func (r *GenericRingBuffer) SetStep(step int64) {
67+
r.currentStep = step
68+
}
69+
6670
// Push adds a new sample to the buffer.
6771
func (r *GenericRingBuffer) Push(t int64, v Value) {
6872
n := len(r.items)

ringbuffer/rate.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type Buffer interface {
1818
MaxT() int64
1919
Push(t int64, v Value)
2020
Reset(mint int64, evalt int64)
21+
SetStep(int64)
2122
Eval(ctx context.Context, _, _ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error)
2223
ReadIntoLast(f func(*Sample))
2324
}
@@ -96,6 +97,8 @@ func (r *RateBuffer) Len() int { return r.stepRanges[0].numSamples }
9697

9798
func (r *RateBuffer) MaxT() int64 { return r.last.T }
9899

100+
func (r *RateBuffer) SetStep(step int64) { r.step = step }
101+
99102
func (r *RateBuffer) Push(t int64, v Value) {
100103
// Detect resets and store the current and previous sample so that
101104
// the rate is properly adjusted.

storage/prometheus/matrix_selector.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type matrixScanner struct {
3636
iterator chunkenc.Iterator
3737
lastSample ringbuffer.Sample
3838
metricAppearedTs *int64
39+
timestamp *int64
40+
once sync.Once
3941
}
4042

4143
type matrixSelector struct {
@@ -57,6 +59,7 @@ type matrixSelector struct {
5759
numSteps int
5860
mint int64
5961
maxt int64
62+
timestamp *int64
6063
step int64
6164
selectRange int64
6265
offset int64
@@ -85,6 +88,7 @@ func NewMatrixSelector(
8588
functionName string,
8689
arg float64,
8790
arg2 float64,
91+
timestamp *int64,
8892
opts *query.Options,
8993
selectRange, offset time.Duration,
9094
batchSize int64,
@@ -108,6 +112,7 @@ func NewMatrixSelector(
108112
mint: opts.Start.UnixMilli(),
109113
maxt: opts.End.UnixMilli(),
110114
step: opts.Step.Milliseconds(),
115+
timestamp: timestamp,
111116
isExtFunction: parse.IsExtFunction(functionName),
112117

113118
selectRange: selectRange.Milliseconds(),
@@ -250,6 +255,7 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error {
250255
iterator: s.Iterator(nil),
251256
lastSample: ringbuffer.Sample{T: math.MinInt64},
252257
buffer: o.newBuffer(ctx),
258+
timestamp: o.timestamp,
253259
}
254260
o.series[i] = lbls
255261
}
@@ -301,6 +307,23 @@ func (o *matrixSelector) String() string {
301307
return fmt.Sprintf("[matrixSelector] {%v}[%s] %v mod %v", o.storage.Matchers(), r, o.shard, o.numShards)
302308
}
303309

310+
func (m *matrixScanner) selectPoints(
311+
mint, maxt, evalt int64,
312+
fh *histogram.FloatHistogram,
313+
isExtFunction bool,
314+
) error {
315+
if m.timestamp != nil {
316+
// If timestamp is set, then the points should be selected only once.
317+
var err error
318+
m.once.Do(func() {
319+
err = m.selectPointsInternal(mint, maxt, *m.timestamp, fh, isExtFunction)
320+
})
321+
m.buffer.SetStep(evalt)
322+
return err
323+
}
324+
return m.selectPointsInternal(mint, maxt, evalt, fh, isExtFunction)
325+
}
326+
304327
// matrixIterSlice populates a matrix vector covering the requested range for a
305328
// single time series, with points retrieved from an iterator.
306329
//
@@ -310,7 +333,7 @@ func (o *matrixSelector) String() string {
310333
// into the [mint, maxt] range are retained; only points with later timestamps
311334
// are populated from the iterator.
312335
// TODO(fpetkovski): Add max samples limit.
313-
func (m *matrixScanner) selectPoints(
336+
func (m *matrixScanner) selectPointsInternal(
314337
mint, maxt, evalt int64,
315338
fh *histogram.FloatHistogram,
316339
isExtFunction bool,

storage/prometheus/scanners.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func (p Scanners) NewMatrixSelector(
134134
call.Func.Name,
135135
arg,
136136
arg2,
137+
logicalNode.VectorSelector.Timestamp,
137138
opts,
138139
logicalNode.Range,
139140
vs.Offset,

0 commit comments

Comments
 (0)