Skip to content

Commit f568774

Browse files
authored
Add Chunk Size to RR Balancer (Increased Batching Ability) (#1232)
* Add Chunk Size to RR Balancer
1 parent 6481322 commit f568774

File tree

2 files changed

+80
-6
lines changed

2 files changed

+80
-6
lines changed

balancer.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
3636
}
3737

3838
// RoundRobin is an Balancer implementation that equally distributes messages
39-
// across all available partitions.
39+
// across all available partitions. It can take an optional chunk size to send
40+
// ChunkSize messages to the same partition before moving to the next partition.
41+
// This can be used to improve batch sizes.
4042
type RoundRobin struct {
43+
ChunkSize int
4144
// Use a 32 bits integer so RoundRobin values don't need to be aligned to
4245
// apply atomic increments.
43-
offset uint32
46+
counter uint32
4447
}
4548

4649
// Balance satisfies the Balancer interface.
@@ -49,8 +52,14 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int {
4952
}
5053

5154
func (rr *RoundRobin) balance(partitions []int) int {
52-
length := uint32(len(partitions))
53-
offset := atomic.AddUint32(&rr.offset, 1) - 1
55+
if rr.ChunkSize < 1 {
56+
rr.ChunkSize = 1
57+
}
58+
59+
length := len(partitions)
60+
counterNow := atomic.LoadUint32(&rr.counter)
61+
offset := int(counterNow / uint32(rr.ChunkSize))
62+
atomic.AddUint32(&rr.counter, 1)
5463
return partitions[offset%length]
5564
}
5665

@@ -122,7 +131,7 @@ var (
122131
//
123132
// The logic to calculate the partition is:
124133
//
125-
// hasher.Sum32() % len(partitions) => partition
134+
// hasher.Sum32() % len(partitions) => partition
126135
//
127136
// By default, Hash uses the FNV-1a algorithm. This is the same algorithm used
128137
// by the Sarama Producer and ensures that messages produced by kafka-go will
@@ -173,7 +182,7 @@ func (h *Hash) Balance(msg Message, partitions ...int) int {
173182
//
174183
// The logic to calculate the partition is:
175184
//
176-
// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition
185+
// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition
177186
//
178187
// By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as
179188
// the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will

balancer_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,3 +411,68 @@ func TestLeastBytes(t *testing.T) {
411411
})
412412
}
413413
}
414+
415+
func TestRoundRobin(t *testing.T) {
416+
testCases := map[string]struct {
417+
Partitions []int
418+
ChunkSize int
419+
}{
420+
"default - odd partition count": {
421+
Partitions: []int{0, 1, 2, 3, 4, 5, 6},
422+
},
423+
"negative chunk size - odd partition count": {
424+
Partitions: []int{0, 1, 2, 3, 4, 5, 6},
425+
ChunkSize: -1,
426+
},
427+
"0 chunk size - odd partition count": {
428+
Partitions: []int{0, 1, 2, 3, 4, 5, 6},
429+
ChunkSize: 0,
430+
},
431+
"5 chunk size - odd partition count": {
432+
Partitions: []int{0, 1, 2, 3, 4, 5, 6},
433+
ChunkSize: 5,
434+
},
435+
"12 chunk size - odd partition count": {
436+
Partitions: []int{0, 1, 2, 3, 4, 5, 6},
437+
ChunkSize: 12,
438+
},
439+
"default - even partition count": {
440+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7},
441+
},
442+
"negative chunk size - even partition count": {
443+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7},
444+
ChunkSize: -1,
445+
},
446+
"0 chunk size - even partition count": {
447+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7},
448+
ChunkSize: 0,
449+
},
450+
"5 chunk size - even partition count": {
451+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7},
452+
ChunkSize: 5,
453+
},
454+
"12 chunk size - even partition count": {
455+
Partitions: []int{0, 1, 2, 3, 4, 5, 6, 7},
456+
ChunkSize: 12,
457+
},
458+
}
459+
for label, test := range testCases {
460+
t.Run(label, func(t *testing.T) {
461+
lb := &RoundRobin{ChunkSize: test.ChunkSize}
462+
msg := Message{}
463+
var partition int
464+
var i int
465+
expectedChunkSize := test.ChunkSize
466+
if expectedChunkSize < 1 {
467+
expectedChunkSize = 1
468+
}
469+
partitions := test.Partitions
470+
for i = 0; i < 50; i++ {
471+
partition = lb.Balance(msg, partitions...)
472+
if partition != i/expectedChunkSize%len(partitions) {
473+
t.Error("Returned partition", partition, "expecting", i/expectedChunkSize%len(partitions))
474+
}
475+
}
476+
})
477+
}
478+
}

0 commit comments

Comments
 (0)