Skip to content

Commit ea83b29

Browse files
authored
Allow Writer to automatically create topics. (#775)
* set AutoCreateTopic when requesting metadata in Writer if a metadata request is made with auto create topic set, we now make a metadata request to trigger the topic creation if the topic is not already in the cache and update the cache after the request is made.
1 parent 40933d2 commit ea83b29

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

transport.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,14 +344,32 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
344344

345345
switch m := req.(type) {
346346
case *meta.Request:
347-
// We serve metadata requests directly from the transport cache.
347+
// We serve metadata requests directly from the transport cache unless
348+
// we would like to auto create a topic that isn't in our cache.
348349
//
349350
// This reduces the number of round trips to kafka brokers while keeping
350351
// the logic simple when applying partitioning strategies.
351352
if state.err != nil {
352353
return nil, state.err
353354
}
354-
return filterMetadataResponse(m, state.metadata), nil
355+
356+
cachedMeta := filterMetadataResponse(m, state.metadata)
357+
// requestNeeded indicates if we need to send this metadata request to the server.
358+
// It's true when we want to auto-create topics and we don't have the topic in our
359+
// cache.
360+
var requestNeeded bool
361+
if m.AllowAutoTopicCreation {
362+
for _, topic := range cachedMeta.Topics {
363+
if topic.ErrorCode == int16(UnknownTopicOrPartition) {
364+
requestNeeded = true
365+
break
366+
}
367+
}
368+
}
369+
370+
if !requestNeeded {
371+
return cachedMeta, nil
372+
}
355373

356374
case protocol.Splitter:
357375
// Messages that implement the Splitter interface trigger the creation of
@@ -392,6 +410,14 @@ func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error)
392410
}
393411

394412
p.refreshMetadata(ctx, topicsToRefresh)
413+
case *meta.Response:
414+
m := req.(*meta.Request)
415+
// If we get here with allow auto topic creation then
416+
// we didn't have that topic in our cache so we should update
417+
// the cache.
418+
if m.AllowAutoTopicCreation {
419+
p.refreshMetadata(ctx, m.TopicNames)
420+
}
395421
}
396422

397423
return r, nil

writer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,8 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
702702
// It is expected that the transport will optimize this request by
703703
// caching recent results (the kafka.Transport types does).
704704
r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
705-
TopicNames: []string{topic},
705+
TopicNames: []string{topic},
706+
AllowAutoTopicCreation: true,
706707
})
707708
if err != nil {
708709
return 0, err

writer_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kafka
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"math"
@@ -155,6 +156,10 @@ func TestWriter(t *testing.T) {
155156
scenario: "writing a message to an invalid partition",
156157
function: testWriterInvalidPartition,
157158
},
159+
{
160+
scenario: "writing a message to a non-existant topic creates the topic",
161+
function: testWriterAutoCreateTopic,
162+
},
158163
}
159164

160165
for _, test := range tests {
@@ -698,6 +703,40 @@ func testWriterUnexpectedMessageTopic(t *testing.T) {
698703
}
699704
}
700705

706+
func testWriterAutoCreateTopic(t *testing.T) {
707+
topic := makeTopic()
708+
// Assume it's going to get created.
709+
defer deleteTopic(t, topic)
710+
711+
w := newTestWriter(WriterConfig{
712+
Topic: topic,
713+
Balancer: &RoundRobin{},
714+
})
715+
defer w.Close()
716+
717+
msg := Message{Key: []byte("key"), Value: []byte("Hello World")}
718+
719+
var err error
720+
const retries = 5
721+
for i := 0; i < retries; i++ {
722+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
723+
defer cancel()
724+
err = w.WriteMessages(ctx, msg)
725+
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
726+
time.Sleep(time.Millisecond * 250)
727+
continue
728+
}
729+
730+
if err != nil {
731+
t.Errorf("unexpected error %v", err)
732+
return
733+
}
734+
}
735+
if err != nil {
736+
t.Errorf("unable to create topic %v", err)
737+
}
738+
}
739+
701740
type staticBalancer struct {
702741
partition int
703742
}

0 commit comments

Comments
 (0)