Skip to content

Commit cf40a01

Browse files
halkarAchille
and
Achille
authored
Add WriteBackoffMin/Max config to Writer (#1015)
* Added Writer config to configure min and max delay between reties. No changes in the logic or in default values. * Empty commit * Update writer.go Co-authored-by: Achille <[email protected]>
1 parent 029f34c commit cf40a01

File tree

2 files changed

+75
-28
lines changed

2 files changed

+75
-28
lines changed

writer.go

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,18 @@ type Writer struct {
113113
// The default is to try at most 10 times.
114114
MaxAttempts int
115115

116+
// WriteBackoffMin optionally sets the smallest amount of time the writer waits before
117+
// it attempts to write a batch of messages
118+
//
119+
// Default: 100ms
120+
WriteBackoffMin time.Duration
121+
122+
// WriteBackoffMax optionally sets the maximum amount of time the writer waits before
123+
// it attempts to write a batch of messages
124+
//
125+
// Default: 1s
126+
WriteBackoffMax time.Duration
127+
116128
// Limit on how many messages will be buffered before being sent to a
117129
// partition.
118130
//
@@ -360,13 +372,15 @@ type WriterStats struct {
360372
BatchSize SummaryStats `metric:"kafka.writer.batch.size"`
361373
BatchBytes SummaryStats `metric:"kafka.writer.batch.bytes"`
362374

363-
MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
364-
MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
365-
BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
366-
ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
367-
WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
368-
RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
369-
Async bool `metric:"kafka.writer.async" type:"gauge"`
375+
MaxAttempts int64 `metric:"kafka.writer.attempts.max" type:"gauge"`
376+
WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
377+
WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"`
378+
MaxBatchSize int64 `metric:"kafka.writer.batch.max" type:"gauge"`
379+
BatchTimeout time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
380+
ReadTimeout time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
381+
WriteTimeout time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
382+
RequiredAcks int64 `metric:"kafka.writer.acks.required" type:"gauge"`
383+
Async bool `metric:"kafka.writer.async" type:"gauge"`
370384

371385
Topic string `tag:"topic"`
372386

@@ -759,6 +773,20 @@ func (w *Writer) maxAttempts() int {
759773
return 10
760774
}
761775

776+
func (w *Writer) writeBackoffMin() time.Duration {
777+
if w.WriteBackoffMin > 0 {
778+
return w.WriteBackoffMin
779+
}
780+
return 100 * time.Millisecond
781+
}
782+
783+
func (w *Writer) writeBackoffMax() time.Duration {
784+
if w.WriteBackoffMax > 0 {
785+
return w.WriteBackoffMax
786+
}
787+
return 1 * time.Second
788+
}
789+
762790
func (w *Writer) batchSize() int {
763791
if w.BatchSize > 0 {
764792
return w.BatchSize
@@ -829,26 +857,28 @@ func (w *Writer) stats() *writerStats {
829857
func (w *Writer) Stats() WriterStats {
830858
stats := w.stats()
831859
return WriterStats{
832-
Dials: stats.dials.snapshot(),
833-
Writes: stats.writes.snapshot(),
834-
Messages: stats.messages.snapshot(),
835-
Bytes: stats.bytes.snapshot(),
836-
Errors: stats.errors.snapshot(),
837-
DialTime: stats.dialTime.snapshotDuration(),
838-
BatchTime: stats.batchTime.snapshotDuration(),
839-
WriteTime: stats.writeTime.snapshotDuration(),
840-
WaitTime: stats.waitTime.snapshotDuration(),
841-
Retries: stats.retries.snapshot(),
842-
BatchSize: stats.batchSize.snapshot(),
843-
BatchBytes: stats.batchSizeBytes.snapshot(),
844-
MaxAttempts: int64(w.MaxAttempts),
845-
MaxBatchSize: int64(w.BatchSize),
846-
BatchTimeout: w.BatchTimeout,
847-
ReadTimeout: w.ReadTimeout,
848-
WriteTimeout: w.WriteTimeout,
849-
RequiredAcks: int64(w.RequiredAcks),
850-
Async: w.Async,
851-
Topic: w.Topic,
860+
Dials: stats.dials.snapshot(),
861+
Writes: stats.writes.snapshot(),
862+
Messages: stats.messages.snapshot(),
863+
Bytes: stats.bytes.snapshot(),
864+
Errors: stats.errors.snapshot(),
865+
DialTime: stats.dialTime.snapshotDuration(),
866+
BatchTime: stats.batchTime.snapshotDuration(),
867+
WriteTime: stats.writeTime.snapshotDuration(),
868+
WaitTime: stats.waitTime.snapshotDuration(),
869+
Retries: stats.retries.snapshot(),
870+
BatchSize: stats.batchSize.snapshot(),
871+
BatchBytes: stats.batchSizeBytes.snapshot(),
872+
MaxAttempts: int64(w.MaxAttempts),
873+
WriteBackoffMin: w.WriteBackoffMin,
874+
WriteBackoffMax: w.WriteBackoffMax,
875+
MaxBatchSize: int64(w.BatchSize),
876+
BatchTimeout: w.BatchTimeout,
877+
ReadTimeout: w.ReadTimeout,
878+
WriteTimeout: w.WriteTimeout,
879+
RequiredAcks: int64(w.RequiredAcks),
880+
Async: w.Async,
881+
Topic: w.Topic,
852882
}
853883
}
854884

@@ -1066,7 +1096,7 @@ func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
10661096
// guarantees to abort, but may be better to avoid long wait times
10671097
// on close.
10681098
//
1069-
delay := backoff(attempt, 100*time.Millisecond, 1*time.Second)
1099+
delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
10701100
ptw.w.withLogger(func(log Logger) {
10711101
log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
10721102
})

writer_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ func TestWriter(t *testing.T) {
170170
scenario: "writing a message with SASL Plain authentication",
171171
function: testWriterSasl,
172172
},
173+
{
174+
scenario: "test default configuration values",
175+
function: testWriterDefaults,
176+
},
173177
}
174178

175179
for _, test := range tests {
@@ -818,6 +822,19 @@ func testWriterSasl(t *testing.T) {
818822
}
819823
}
820824

825+
func testWriterDefaults(t *testing.T) {
826+
w := &Writer{}
827+
defer w.Close()
828+
829+
if w.writeBackoffMin() != 100*time.Millisecond {
830+
t.Error("Incorrect default min write backoff delay")
831+
}
832+
833+
if w.writeBackoffMax() != 1*time.Second {
834+
t.Error("Incorrect default max write backoff delay")
835+
}
836+
}
837+
821838
type staticBalancer struct {
822839
partition int
823840
}

0 commit comments

Comments
 (0)