Skip to content

Commit 8492075

Browse files
authored
Merge pull request #1132 from segmentio/issue-1050-message-batching-headers
Writer: ensure batching logic factors in message headers
2 parents df7e711 + cb2a487 commit 8492075

File tree

3 files changed

+84
-7
lines changed

3 files changed

+84
-7
lines changed

message.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ func (msg *Message) size() int32 {
4949
return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
5050
}
5151

52+
func (msg *Message) headerSize() int {
53+
return varArrayLen(len(msg.Headers), func(i int) int {
54+
h := &msg.Headers[i]
55+
return varStringLen(h.Key) + varBytesLen(h.Value)
56+
})
57+
}
58+
59+
func (msg *Message) totalSize() int32 {
60+
return int32(msg.headerSize()) + msg.size()
61+
}
62+
5263
type message struct {
5364
CRC int32
5465
MagicByte int8

writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
624624
batchBytes := w.batchBytes()
625625

626626
for i := range msgs {
627-
n := int64(msgs[i].size())
627+
n := int64(msgs[i].totalSize())
628628
if n > batchBytes {
629629
// This error is left for backward compatibility with historical
630630
// behavior, but it can yield O(N^2) behaviors. The expectations
@@ -1219,7 +1219,7 @@ func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
12191219
}
12201220

12211221
func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
1222-
bytes := int64(msg.size())
1222+
bytes := int64(msg.totalSize())
12231223

12241224
if b.size > 0 && (b.bytes+bytes) > maxBytes {
12251225
return false

writer_test.go

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"math"
99
"strconv"
10+
"strings"
1011
"sync"
1112
"testing"
1213
"time"
@@ -134,6 +135,10 @@ func TestWriter(t *testing.T) {
134135
scenario: "writing messages with a small batch byte size",
135136
function: testWriterSmallBatchBytes,
136137
},
138+
{
139+
scenario: "writing messages with headers",
140+
function: testWriterBatchBytesHeaders,
141+
},
137142
{
138143
scenario: "setting a non default balancer on the writer",
139144
function: testWriterSetsRightBalancer,
@@ -449,7 +454,7 @@ func testWriterBatchBytes(t *testing.T) {
449454

450455
w := newTestWriter(WriterConfig{
451456
Topic: topic,
452-
BatchBytes: 48,
457+
BatchBytes: 50,
453458
BatchTimeout: math.MaxInt32 * time.Second,
454459
Balancer: &RoundRobin{},
455460
})
@@ -458,10 +463,10 @@ func testWriterBatchBytes(t *testing.T) {
458463
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
459464
defer cancel()
460465
if err := w.WriteMessages(ctx, []Message{
461-
{Value: []byte("M0")}, // 24 Bytes
462-
{Value: []byte("M1")}, // 24 Bytes
463-
{Value: []byte("M2")}, // 24 Bytes
464-
{Value: []byte("M3")}, // 24 Bytes
466+
{Value: []byte("M0")}, // 25 Bytes
467+
{Value: []byte("M1")}, // 25 Bytes
468+
{Value: []byte("M2")}, // 25 Bytes
469+
{Value: []byte("M3")}, // 25 Bytes
465470
}...); err != nil {
466471
t.Error(err)
467472
return
@@ -592,6 +597,67 @@ func testWriterSmallBatchBytes(t *testing.T) {
592597
}
593598
}
594599

600+
func testWriterBatchBytesHeaders(t *testing.T) {
601+
topic := makeTopic()
602+
createTopic(t, topic, 1)
603+
defer deleteTopic(t, topic)
604+
605+
offset, err := readOffset(topic, 0)
606+
if err != nil {
607+
t.Fatal(err)
608+
}
609+
610+
w := newTestWriter(WriterConfig{
611+
Topic: topic,
612+
BatchBytes: 100,
613+
BatchTimeout: 50 * time.Millisecond,
614+
Balancer: &RoundRobin{},
615+
})
616+
defer w.Close()
617+
618+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
619+
defer cancel()
620+
if err := w.WriteMessages(ctx, []Message{
621+
{
622+
Value: []byte("Hello World 1"),
623+
Headers: []Header{
624+
{Key: "User-Agent", Value: []byte("abc/xyz")},
625+
},
626+
},
627+
{
628+
Value: []byte("Hello World 2"),
629+
Headers: []Header{
630+
{Key: "User-Agent", Value: []byte("abc/xyz")},
631+
},
632+
},
633+
}...); err != nil {
634+
t.Error(err)
635+
return
636+
}
637+
ws := w.Stats()
638+
if ws.Writes != 2 {
639+
t.Error("didn't batch messages; Writes: ", ws.Writes)
640+
return
641+
}
642+
msgs, err := readPartition(topic, 0, offset)
643+
if err != nil {
644+
t.Error("error reading partition", err)
645+
return
646+
}
647+
648+
if len(msgs) != 2 {
649+
t.Error("bad messages in partition", msgs)
650+
return
651+
}
652+
653+
for _, m := range msgs {
654+
if strings.HasPrefix(string(m.Value), "Hello World") {
655+
continue
656+
}
657+
t.Error("bad messages in partition", msgs)
658+
}
659+
}
660+
595661
func testWriterMultipleTopics(t *testing.T) {
596662
topic1 := makeTopic()
597663
createTopic(t, topic1, 1)

0 commit comments

Comments
 (0)