Skip to content

Commit 18548e9

Browse files
author
Steve van Loben Sels
authored
Add CRC32 and Murmur2 balancers (segmentio#334)
1 parent b4c376a commit 18548e9

File tree

3 files changed

+376
-4
lines changed

3 files changed

+376
-4
lines changed

README.md

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,20 +231,49 @@ w.Close()
231231
**Note:** Even though kafka.Message contain ```Topic``` and ```Partition``` fields, they **MUST NOT** be
232232
set when writing messages. They are intended for read use only.
233233

234-
### Compatibility with Sarama
234+
### Compatibility with other clients
235+
236+
#### Sarama
235237

236238
If you're switching from Sarama and need/want to use the same algorithm for message
237239
partitioning, you can use the ```kafka.Hash``` balancer. ```kafka.Hash``` routes
238-
messages to the same partitions that sarama's default partitioner would route to.
240+
messages to the same partitions that Sarama's default partitioner would route to.
239241

240242
```go
241243
w := kafka.NewWriter(kafka.WriterConfig{
242-
Brokers: []string{"localhost:9092"},
243-
Topic: "topic-A",
244+
Brokers: []string{"localhost:9092"},
245+
Topic: "topic-A",
244246
Balancer: &kafka.Hash{},
245247
})
246248
```
247249

250+
#### librdkafka and confluent-kafka-go
251+
252+
Use the ```kafka.CRC32Balancer``` balancer to get the same behaviour as librdkafka's
253+
default ```consistent_random``` partition strategy.
254+
255+
```go
256+
w := kafka.NewWriter(kafka.WriterConfig{
257+
Brokers: []string{"localhost:9092"},
258+
Topic: "topic-A",
259+
Balancer: kafka.CRC32Balancer{},
260+
})
261+
```
262+
263+
#### Java
264+
265+
Use the ```kafka.Murmur2Balancer``` balancer to get the same behaviour as the canonical
266+
Java client's default partitioner. Note: the Java class allows you to directly specify
267+
the partition which is not permitted.
268+
269+
```go
270+
w := kafka.NewWriter(kafka.WriterConfig{
271+
Brokers: []string{"localhost:9092"},
272+
Topic: "topic-A",
273+
Balancer: kafka.Murmur2Balancer{},
274+
})
275+
```
276+
248277
### Compression
249278

250279
Compression can be enabled on the `Writer` by configuring the `CompressionCodec`:

balancer.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package kafka
22

33
import (
44
"hash"
5+
"hash/crc32"
56
"hash/fnv"
7+
"math/rand"
68
"sort"
79
"sync"
810
)
@@ -158,3 +160,128 @@ func (h *Hash) Balance(msg Message, partitions ...int) (partition int) {
158160

159161
return
160162
}
163+
164+
type randomBalancer struct {
165+
mock int // mocked return value, used for testing
166+
}
167+
168+
func (b randomBalancer) Balance(msg Message, partitions ...int) (partition int) {
169+
if b.mock != 0 {
170+
return b.mock
171+
}
172+
return partitions[rand.Int()%len(partitions)]
173+
}
174+
175+
// CRC32Balancer is a Balancer that uses the CRC32 hash function to determine
176+
// which partition to route messages to. This ensures that messages with the
177+
// same key are routed to the same partition. This balancer is compatible with
178+
// the built-in hash partitioners in librdkafka and the language bindings that
179+
// are built on top of it, including the
180+
// github.com/confluentinc/confluent-kafka-go Go package.
181+
//
182+
// With the Consistent field false (default), this partitioner is equivalent to
183+
// the "consistent_random" setting in librdkafka. When Consistent is true, this
184+
// partitioner is equivalent to the "consistent" setting. The latter will hash
185+
// empty or nil keys into the same partition.
186+
//
187+
// Unless you are absolutely certain that all your messages will have keys, it's
188+
// best to leave the Consistent flag off. Otherwise, you run the risk of
189+
// creating a very hot partition.
190+
type CRC32Balancer struct {
191+
Consistent bool
192+
random randomBalancer
193+
}
194+
195+
func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int) {
196+
// NOTE: the crc32 balancers in librdkafka don't differentiate between nil
197+
// and empty keys. both cases are treated as unset.
198+
if len(msg.Key) == 0 && !b.Consistent {
199+
return b.random.Balance(msg, partitions...)
200+
}
201+
202+
idx := crc32.ChecksumIEEE(msg.Key) % uint32(len(partitions))
203+
return partitions[idx]
204+
}
205+
206+
// Murmur2Balancer is a Balancer that uses the Murmur2 hash function to
207+
// determine which partition to route messages to. This ensures that messages
208+
// with the same key are routed to the same partition. This balancer is
209+
// compatible with the partitioner used by the Java library and by librdkafka's
210+
// "murmur2" and "murmur2_random" partitioners. /
211+
//
212+
// With the Consistent field false (default), this partitioner is equivalent to
213+
// the "murmur2_random" setting in librdkafka. When Consistent is true, this
214+
// partitioner is equivalent to the "murmur2" setting. The latter will hash
215+
// nil keys into the same partition. Empty, non-nil keys are always hashed to
216+
// the same partition regardless of configuration.
217+
//
218+
// Unless you are absolutely certain that all your messages will have keys, it's
219+
// best to leave the Consistent flag off. Otherwise, you run the risk of
220+
// creating a very hot partition.
221+
//
222+
// Note that the librdkafka documentation states that the "murmur2_random" is
223+
// functionally equivalent to the default Java partitioner. That's because the
224+
// Java partitioner will use a round robin balancer instead of random on nil
225+
// keys. We choose librdkafka's implementation because it arguably has a larger
226+
// install base.
227+
type Murmur2Balancer struct {
228+
Consistent bool
229+
random randomBalancer
230+
}
231+
232+
func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int) {
233+
// NOTE: the murmur2 balancers in java and librdkafka treat a nil key as
234+
// non-existent while treating an empty slice as a defined value.
235+
if msg.Key == nil && !b.Consistent {
236+
return b.random.Balance(msg, partitions...)
237+
}
238+
239+
idx := (murmur2(msg.Key) & 0x7fffffff) % uint32(len(partitions))
240+
return partitions[idx]
241+
}
242+
243+
// Go port of the Java library's murmur2 function.
244+
// https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L353
245+
func murmur2(data []byte) uint32 {
246+
length := len(data)
247+
const (
248+
seed uint32 = 0x9747b28c
249+
// 'm' and 'r' are mixing constants generated offline.
250+
// They're not really 'magic', they just happen to work well.
251+
m = 0x5bd1e995
252+
r = 24
253+
)
254+
255+
// Initialize the hash to a random value
256+
h := seed ^ uint32(length)
257+
length4 := length / 4
258+
259+
for i := 0; i < length4; i++ {
260+
i4 := i * 4
261+
k := (uint32(data[i4+0]) & 0xff) + ((uint32(data[i4+1]) & 0xff) << 8) + ((uint32(data[i4+2]) & 0xff) << 16) + ((uint32(data[i4+3]) & 0xff) << 24)
262+
k *= m
263+
k ^= k >> r
264+
k *= m
265+
h *= m
266+
h ^= k
267+
}
268+
269+
// Handle the last few bytes of the input array
270+
extra := length % 4
271+
if extra >= 3 {
272+
h ^= (uint32(data[(length & ^3)+2]) & 0xff) << 16
273+
}
274+
if extra >= 2 {
275+
h ^= (uint32(data[(length & ^3)+1]) & 0xff) << 8
276+
}
277+
if extra >= 1 {
278+
h ^= uint32(data[length & ^3]) & 0xff
279+
h *= m
280+
}
281+
282+
h ^= h >> 13
283+
h *= m
284+
h ^= h >> 15
285+
286+
return h
287+
}

0 commit comments

Comments
 (0)