Skip to content

Commit c2af8e7

Browse files
author
Steve van Loben Sels
authored
Fixed compatibility with the sarama hash partitioner (segmentio#336)
Previously, if the 32-bit hash code had its most significant bit set, kafka-go and sarama would select different partitions, because kafka-go converted the uint32 hash to int rather than to int32 before taking the modulus.
1 parent 433ef07 commit c2af8e7

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

balancer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ type Hash struct {
141141
lock sync.Mutex
142142
}
143143

144-
func (h *Hash) Balance(msg Message, partitions ...int) (partition int) {
144+
func (h *Hash) Balance(msg Message, partitions ...int) int {
145145
if msg.Key == nil {
146146
return h.rr.Balance(msg, partitions...)
147147
}
@@ -161,12 +161,14 @@ func (h *Hash) Balance(msg Message, partitions ...int) (partition int) {
161161
}
162162

163163
// uses the same algorithm as Sarama's hashPartitioner
164-
partition = int(hasher.Sum32()) % len(partitions)
164+
// Note the type conversions here: if the uint32 hash code is not converted
165+
// to an int32 first, we do not get the same result as sarama.
166+
partition := int32(hasher.Sum32()) % int32(len(partitions))
165167
if partition < 0 {
166168
partition = -partition
167169
}
168170

169-
return
171+
return int(partition)
170172
}
171173

172174
type randomBalancer struct {

balancer_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ func TestHashBalancer(t *testing.T) {
4040
Partitions: []int{0, 1, 2},
4141
Partition: 1,
4242
},
43+
// in a previous version, this key was assigned a different partition
44+
// than the one sarama's hash partitioner selects.
45+
"hash code with MSB set": {
46+
Key: []byte("20"),
47+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
48+
Partition: 1,
49+
},
4350
}
4451

4552
for label, test := range testCases {

0 commit comments

Comments
 (0)