Skip to content

Commit 999d0b3

Browse files
nlsuniddqdeika
andauthored
Fix offset when a batch ends with compacted records
Saves the lastOffset and jumps past it when compacted records are detected at the end of a batch. - Adds a test for batches that end with compacted records - Adds a test for batches truncated due to MaxBytes Co-authored-by: iddqdeika <[email protected]>
1 parent 54559ab commit 999d0b3

File tree

4 files changed

+311
-7
lines changed

4 files changed

+311
-7
lines changed

batch.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ type Batch struct {
2828
offset int64
2929
highWaterMark int64
3030
err error
31+
// The last offset in the batch.
32+
//
33+
// We use lastOffset to skip offsets that have been compacted away.
34+
//
35+
// We store lastOffset because we get lastOffset when we read a new message
36+
// but only try to handle compaction when we receive an EOF. However, when
37+
// we get an EOF we do not get the lastOffset. So there is a mismatch
38+
// between when we receive it and need to use it.
39+
lastOffset int64
3140
}
3241

3342
// Throttle gives the throttling duration applied by the kafka server on the
@@ -190,6 +199,8 @@ func (batch *Batch) ReadMessage() (Message, error) {
190199
return
191200
},
192201
)
202+
// A batch may start before the requested offset so skip messages
203+
// until the requested offset is reached.
193204
for batch.conn != nil && offset < batch.conn.offset {
194205
if err != nil {
195206
break
@@ -225,10 +236,12 @@ func (batch *Batch) readMessage(
225236
return
226237
}
227238

228-
offset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
239+
var lastOffset int64
240+
offset, lastOffset, timestamp, headers, err = batch.msgs.readMessage(batch.offset, key, val)
229241
switch err {
230242
case nil:
231243
batch.offset = offset + 1
244+
batch.lastOffset = lastOffset
232245
case errShortRead:
233246
// As an "optimization" kafka truncates the returned response after
234247
// producing MaxBytes, which could then cause the code to return
@@ -252,6 +265,23 @@ func (batch *Batch) readMessage(
252265
// read deadline management.
253266
err = checkTimeoutErr(batch.deadline)
254267
batch.err = err
268+
269+
// Checks the following:
270+
// - `batch.err` for a "success" from the previous timeout check
271+
// - `batch.msgs.lengthRemain` to ensure that this EOF is not due
272+
// to MaxBytes truncation
273+
// - `batch.lastOffset` to ensure that the message format contains
274+
// `lastOffset`
275+
if batch.err == io.EOF && batch.msgs.lengthRemain == 0 && batch.lastOffset != -1 {
276+
// Log compaction can create batches that end with compacted
277+
// records so the normal strategy that increments the "next"
278+
// offset as records are read doesn't work as the compacted
279+
// records are "missing" and never get "read".
280+
//
281+
// In order to reliably reach the next non-compacted offset we
282+
// jump past the saved lastOffset.
283+
batch.offset = batch.lastOffset + 1
284+
}
255285
}
256286
default:
257287
// Since io.EOF is used by the batch to indicate that there is are

message_reader.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ type messageSetReader struct {
1818
*readerStack // used for decompressing compressed messages and record batches
1919
empty bool // if true, short circuits messageSetReader methods
2020
debug bool // enable debug log messages
21+
// How many bytes are expected to remain in the response.
22+
//
23+
// This is used to detect truncation of the response.
24+
lengthRemain int
2125
}
2226

2327
type readerStack struct {
@@ -114,7 +118,7 @@ func (r *messageSetReader) discard() (err error) {
114118
}
115119

116120
func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
117-
offset int64, timestamp int64, headers []Header, err error) {
121+
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
118122

119123
if r.empty {
120124
err = RequestTimedOut
@@ -126,8 +130,10 @@ func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readByt
126130
switch r.header.magic {
127131
case 0, 1:
128132
offset, timestamp, headers, err = r.readMessageV1(min, key, val)
133+
// Set an invalid value so that it can be ignored
134+
lastOffset = -1
129135
case 2:
130-
offset, timestamp, headers, err = r.readMessageV2(min, key, val)
136+
offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
131137
default:
132138
err = r.header.badMagic()
133139
}
@@ -239,7 +245,7 @@ func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readB
239245
}
240246

241247
func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
242-
offset int64, timestamp int64, headers []Header, err error) {
248+
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
243249
if err = r.readHeader(); err != nil {
244250
return
245251
}
@@ -282,10 +288,12 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
282288
r.readerStack.parent.count = 0
283289
}
284290
}
291+
remainBefore := r.remain
285292
var length int64
286293
if err = r.readVarInt(&length); err != nil {
287294
return
288295
}
296+
lengthOfLength := remainBefore - r.remain
289297
var attrs int8
290298
if err = r.readInt8(&attrs); err != nil {
291299
return
@@ -316,6 +324,8 @@ func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readByt
316324
return
317325
}
318326
}
327+
lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
328+
r.lengthRemain -= int(length) + lengthOfLength
319329
r.markRead()
320330
return
321331
}
@@ -407,6 +417,9 @@ func (r *messageSetReader) readHeader() (err error) {
407417
return
408418
}
409419
r.count = 1
420+
// Set arbitrary non-zero length so that we always assume the
421+
// message is truncated since bytes remain.
422+
r.lengthRemain = 1
410423
r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
411424
case 1:
412425
r.header.crc = crcOrLeaderEpoch
@@ -417,6 +430,9 @@ func (r *messageSetReader) readHeader() (err error) {
417430
return
418431
}
419432
r.count = 1
433+
// Set arbitrary non-zero length so that we always assume the
434+
// message is truncated since bytes remain.
435+
r.lengthRemain = 1
420436
r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
421437
case 2:
422438
r.header.v2.leaderEpoch = crcOrLeaderEpoch
@@ -448,6 +464,8 @@ func (r *messageSetReader) readHeader() (err error) {
448464
return
449465
}
450466
r.count = int(r.header.v2.count)
467+
// Subtracts the header bytes from the length
468+
r.lengthRemain = int(r.header.length) - 49
451469
r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
452470
default:
453471
err = r.header.badMagic()

message_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ func TestMessageSetReaderEmpty(t *testing.T) {
541541
return 0, nil
542542
}
543543

544-
offset, timestamp, headers, err := m.readMessage(0, noop, noop)
544+
offset, _, timestamp, headers, err := m.readMessage(0, noop, noop)
545545
if offset != 0 {
546546
t.Errorf("expected offset of 0, get %d", offset)
547547
}
@@ -737,12 +737,12 @@ func (r *readerHelper) readMessageErr() (msg Message, err error) {
737737
}
738738
var timestamp int64
739739
var headers []Header
740-
r.offset, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
740+
r.offset, _, timestamp, headers, err = r.messageSetReader.readMessage(r.offset, keyFunc, valueFunc)
741741
if err != nil {
742742
return
743743
}
744744
msg.Offset = r.offset
745-
msg.Time = time.Unix(timestamp / 1000, (timestamp % 1000) * 1000000)
745+
msg.Time = time.Unix(timestamp/1000, (timestamp%1000)*1000000)
746746
msg.Headers = headers
747747
return
748748
}

0 commit comments

Comments
 (0)