Skip to content

Commit 9066af4

Browse files
savakiachille-roussel
authored andcommitted
Internally, kafka-go offset is consistent with itself. Unfortunately, not with the rest of the world. The commit offset should be the offset of the next message to read and NOT the last message read. Verified by running sarama and kafka-go sequentially to verify they picked up each others offsets. (#62)
Seeing as you were working in commits, I thought I would hop on your commit.
1 parent 7619f5f commit 9066af4

File tree

4 files changed

+29
-10
lines changed

4 files changed

+29
-10
lines changed

commit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ func makeCommit(msg Message) commit {
1414
return commit{
1515
topic: msg.Topic,
1616
partition: msg.Partition,
17-
offset: msg.Offset,
17+
offset: msg.Offset + 1,
1818
}
1919
}
2020

commit_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package kafka
2+
3+
import "testing"
4+
5+
func TestMakeCommit(t *testing.T) {
6+
msg := Message{
7+
Topic: "blah",
8+
Partition: 1,
9+
Offset: 2,
10+
}
11+
12+
commit := makeCommit(msg)
13+
if commit.topic != msg.Topic {
14+
t.Errorf("bad topic: expected %v; got %v", msg.Topic, commit.topic)
15+
}
16+
if commit.partition != msg.Partition {
17+
t.Errorf("bad partition: expected %v; got %v", msg.Partition, commit.partition)
18+
}
19+
if commit.offset != msg.Offset+1 {
20+
t.Errorf("expected committed offset to be 1 greater than msg offset")
21+
}
22+
}

reader.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -483,9 +483,6 @@ func (r *Reader) fetchOffsets(subs map[string][]int32) (map[int]int64, error) {
483483
for _, partition := range partitions {
484484
if partition == pr.Partition {
485485
offset := pr.Offset
486-
if offset >= 0 {
487-
offset++ // advance to next offset
488-
}
489486
offsetsByPartition[int(partition)] = offset
490487
}
491488
}

reader_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,16 +1081,16 @@ func TestOffsetStash(t *testing.T) {
10811081
Given: offsetStash{},
10821082
Messages: []Message{newMessage(0, 0)},
10831083
Expected: offsetStash{
1084-
topic: {0: 0},
1084+
topic: {0: 1},
10851085
},
10861086
},
10871087
"ignores earlier offsets": {
10881088
Given: offsetStash{
1089-
topic: {0: 1},
1089+
topic: {0: 2},
10901090
},
10911091
Messages: []Message{newMessage(0, 0)},
10921092
Expected: offsetStash{
1093-
topic: {0: 1},
1093+
topic: {0: 2},
10941094
},
10951095
},
10961096
"uses latest offset": {
@@ -1101,7 +1101,7 @@ func TestOffsetStash(t *testing.T) {
11011101
newMessage(0, 1),
11021102
},
11031103
Expected: offsetStash{
1104-
topic: {0: 3},
1104+
topic: {0: 4},
11051105
},
11061106
},
11071107
"uses latest offset, across multiple topics": {
@@ -1115,8 +1115,8 @@ func TestOffsetStash(t *testing.T) {
11151115
},
11161116
Expected: offsetStash{
11171117
topic: {
1118-
0: 3,
1119-
1: 6,
1118+
0: 4,
1119+
1: 7,
11201120
},
11211121
},
11221122
},

0 commit comments

Comments
 (0)