Skip to content

Commit eb07fa4

Browse files
authored
Support metadata request v6 (#1013)
* Support metadata request v6 * Empty commit to trigger circleci * Another empty commit * Fix TestDialer, add OfflineReplicas to expected response. * Fix tests I'm assuming it's ok to send AllowAutoTopicCreation: true in metadata v6 request, since server auto.create.topics.enable controls this anyway. * Set OfflineReplicas to empty array when it doesn't exist in the v1 response. Maybe fixing TestDialer for old kafkas * Fix returning errors from readTopicMetadataV1/6.
1 parent c1240f0 commit eb07fa4

File tree

6 files changed

+220
-50
lines changed

6 files changed

+220
-50
lines changed

conn.go

Lines changed: 87 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -943,48 +943,101 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
943943
topics = nil
944944
}
945945
}
946+
metadataVersion, err := c.negotiateVersion(metadata, v1, v6)
947+
if err != nil {
948+
return nil, err
949+
}
946950

947951
err = c.readOperation(
948952
func(deadline time.Time, id int32) error {
949-
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
953+
switch metadataVersion {
954+
case v6:
955+
return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true})
956+
default:
957+
return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
958+
}
950959
},
951960
func(deadline time.Time, size int) error {
952-
var res metadataResponseV1
961+
partitions, err = c.readPartitionsResponse(metadataVersion, size)
962+
return err
963+
},
964+
)
965+
return
966+
}
953967

954-
if err := c.readResponse(size, &res); err != nil {
955-
return err
956-
}
968+
func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) {
969+
switch metadataVersion {
970+
case v6:
971+
var res metadataResponseV6
972+
if err := c.readResponse(size, &res); err != nil {
973+
return nil, err
974+
}
975+
brokers := readBrokerMetadata(res.Brokers)
976+
return c.readTopicMetadatav6(brokers, res.Topics)
977+
default:
978+
var res metadataResponseV1
979+
if err := c.readResponse(size, &res); err != nil {
980+
return nil, err
981+
}
982+
brokers := readBrokerMetadata(res.Brokers)
983+
return c.readTopicMetadatav1(brokers, res.Topics)
984+
}
985+
}
957986

958-
brokers := make(map[int32]Broker, len(res.Brokers))
959-
for _, b := range res.Brokers {
960-
brokers[b.NodeID] = Broker{
961-
Host: b.Host,
962-
Port: int(b.Port),
963-
ID: int(b.NodeID),
964-
Rack: b.Rack,
965-
}
966-
}
987+
func readBrokerMetadata(brokerMetadata []brokerMetadataV1) map[int32]Broker {
988+
brokers := make(map[int32]Broker, len(brokerMetadata))
989+
for _, b := range brokerMetadata {
990+
brokers[b.NodeID] = Broker{
991+
Host: b.Host,
992+
Port: int(b.Port),
993+
ID: int(b.NodeID),
994+
Rack: b.Rack,
995+
}
996+
}
997+
return brokers
998+
}
967999

968-
for _, t := range res.Topics {
969-
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
970-
// We only report errors if they happened for the topic of
971-
// the connection, otherwise the topic will simply have no
972-
// partitions in the result set.
973-
return Error(t.TopicErrorCode)
974-
}
975-
for _, p := range t.Partitions {
976-
partitions = append(partitions, Partition{
977-
Topic: t.TopicName,
978-
Leader: brokers[p.Leader],
979-
Replicas: makeBrokers(brokers, p.Replicas...),
980-
Isr: makeBrokers(brokers, p.Isr...),
981-
ID: int(p.PartitionID),
982-
})
983-
}
984-
}
985-
return nil
986-
},
987-
)
1000+
func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []topicMetadataV1) (partitions []Partition, err error) {
1001+
for _, t := range topicMetadata {
1002+
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
1003+
// We only report errors if they happened for the topic of
1004+
// the connection, otherwise the topic will simply have no
1005+
// partitions in the result set.
1006+
return nil, Error(t.TopicErrorCode)
1007+
}
1008+
for _, p := range t.Partitions {
1009+
partitions = append(partitions, Partition{
1010+
Topic: t.TopicName,
1011+
Leader: brokers[p.Leader],
1012+
Replicas: makeBrokers(brokers, p.Replicas...),
1013+
Isr: makeBrokers(brokers, p.Isr...),
1014+
ID: int(p.PartitionID),
1015+
OfflineReplicas: []Broker{},
1016+
})
1017+
}
1018+
}
1019+
return
1020+
}
1021+
1022+
func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []topicMetadataV6) (partitions []Partition, err error) {
1023+
for _, t := range topicMetadata {
1024+
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
1025+
// We only report errors if they happened for the topic of
1026+
// the connection, otherwise the topic will simply have no
1027+
// partitions in the result set.
1028+
return nil, Error(t.TopicErrorCode)
1029+
}
1030+
for _, p := range t.Partitions {
1031+
partitions = append(partitions, Partition{
1032+
Topic: t.TopicName,
1033+
Leader: brokers[p.Leader],
1034+
Replicas: makeBrokers(brokers, p.Replicas...),
1035+
Isr: makeBrokers(brokers, p.Isr...),
1036+
ID: int(p.PartitionID),
1037+
OfflineReplicas: makeBrokers(brokers, p.OfflineReplicas...),
1038+
})
1039+
}
1040+
}
9881041
return
9891042
}
9901043

dialer_test.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ func testDialerLookupPartitions(t *testing.T, ctx context.Context, d *Dialer) {
6161

6262
want := []Partition{
6363
{
64-
Topic: topic,
65-
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
66-
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
67-
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
68-
ID: 0,
64+
Topic: topic,
65+
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
66+
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
67+
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
68+
OfflineReplicas: []Broker{},
69+
ID: 0,
6970
},
7071
}
7172
if !reflect.DeepEqual(partitions, want) {
@@ -230,11 +231,12 @@ func TestDialerTLS(t *testing.T) {
230231

231232
want := []Partition{
232233
{
233-
Topic: topic,
234-
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
235-
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
236-
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
237-
ID: 0,
234+
Topic: topic,
235+
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
236+
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
237+
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
238+
OfflineReplicas: []Broker{},
239+
ID: 0,
238240
},
239241
}
240242
if !reflect.DeepEqual(partitions, want) {
@@ -377,11 +379,12 @@ func TestDialerResolver(t *testing.T) {
377379

378380
want := []Partition{
379381
{
380-
Topic: topic,
381-
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
382-
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
383-
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
384-
ID: 0,
382+
Topic: topic,
383+
Leader: Broker{Host: "localhost", Port: 9092, ID: 1},
384+
Replicas: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
385+
Isr: []Broker{{Host: "localhost", Port: 9092, ID: 1}},
386+
OfflineReplicas: []Broker{},
387+
ID: 0,
385388
},
386389
}
387390
if !reflect.DeepEqual(partitions, want) {

kafka.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type Partition struct {
4747
Replicas []Broker
4848
Isr []Broker
4949

50+
// Available only with metadata API level >= 6:
51+
OfflineReplicas []Broker
52+
5053
// An error that may have occurred while attempting to read the partition
5154
// metadata.
5255
//

metadata.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,89 @@ func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
203203
wb.writeInt32Array(p.Replicas)
204204
wb.writeInt32Array(p.Isr)
205205
}
206+
207+
type topicMetadataRequestV6 struct {
208+
Topics []string
209+
AllowAutoTopicCreation bool
210+
}
211+
212+
func (r topicMetadataRequestV6) size() int32 {
213+
return sizeofStringArray([]string(r.Topics)) + 1
214+
}
215+
216+
func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
217+
// communicate nil-ness to the broker by passing -1 as the array length.
218+
// for this particular request, the broker interpets a zero length array
219+
// as a request for no topics whereas a nil array is for all topics.
220+
if r.Topics == nil {
221+
wb.writeArrayLen(-1)
222+
} else {
223+
wb.writeStringArray([]string(r.Topics))
224+
}
225+
wb.writeBool(r.AllowAutoTopicCreation)
226+
}
227+
228+
type metadataResponseV6 struct {
229+
ThrottleTimeMs int32
230+
Brokers []brokerMetadataV1
231+
ClusterId string
232+
ControllerID int32
233+
Topics []topicMetadataV6
234+
}
235+
236+
func (r metadataResponseV6) size() int32 {
237+
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
238+
n2 := sizeofNullableString(&r.ClusterId)
239+
n3 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
240+
return 4 + 4 + n1 + n2 + n3
241+
}
242+
243+
func (r metadataResponseV6) writeTo(wb *writeBuffer) {
244+
wb.writeInt32(r.ThrottleTimeMs)
245+
wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
246+
wb.writeString(r.ClusterId)
247+
wb.writeInt32(r.ControllerID)
248+
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
249+
}
250+
251+
type topicMetadataV6 struct {
252+
TopicErrorCode int16
253+
TopicName string
254+
Internal bool
255+
Partitions []partitionMetadataV6
256+
}
257+
258+
func (t topicMetadataV6) size() int32 {
259+
return 2 + 1 +
260+
sizeofString(t.TopicName) +
261+
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
262+
}
263+
264+
func (t topicMetadataV6) writeTo(wb *writeBuffer) {
265+
wb.writeInt16(t.TopicErrorCode)
266+
wb.writeString(t.TopicName)
267+
wb.writeBool(t.Internal)
268+
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
269+
}
270+
271+
type partitionMetadataV6 struct {
272+
PartitionErrorCode int16
273+
PartitionID int32
274+
Leader int32
275+
Replicas []int32
276+
Isr []int32
277+
OfflineReplicas []int32
278+
}
279+
280+
func (p partitionMetadataV6) size() int32 {
281+
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr) + sizeofInt32Array(p.OfflineReplicas)
282+
}
283+
284+
func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
285+
wb.writeInt16(p.PartitionErrorCode)
286+
wb.writeInt32(p.PartitionID)
287+
wb.writeInt32(p.Leader)
288+
wb.writeInt32Array(p.Replicas)
289+
wb.writeInt32Array(p.Isr)
290+
wb.writeInt32Array(p.OfflineReplicas)
291+
}

protocol.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ const (
107107
v2 = 2
108108
v3 = 3
109109
v5 = 5
110+
v6 = 6
110111
v7 = 7
111112
v10 = 10
112113

113-
// Unused protocol versions: v4, v6, v8, v9.
114+
// Unused protocol versions: v4, v8, v9.
114115
)
115116

116117
var apiKeyStrings = [...]string{

protocol_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,30 @@ func TestProtocol(t *testing.T) {
7676
},
7777
},
7878

79+
topicMetadataRequestV6{
80+
Topics: []string{"A", "B", "C"},
81+
AllowAutoTopicCreation: true,
82+
},
83+
84+
metadataResponseV6{
85+
Brokers: []brokerMetadataV1{
86+
{NodeID: 1, Host: "localhost", Port: 9001},
87+
{NodeID: 2, Host: "localhost", Port: 9002, Rack: "rack2"},
88+
},
89+
ClusterId: "cluster",
90+
ControllerID: 2,
91+
Topics: []topicMetadataV6{
92+
{TopicErrorCode: 0, Internal: true, Partitions: []partitionMetadataV6{{
93+
PartitionErrorCode: 0,
94+
PartitionID: 1,
95+
Leader: 2,
96+
Replicas: []int32{1},
97+
Isr: []int32{1},
98+
OfflineReplicas: []int32{1},
99+
}}},
100+
},
101+
},
102+
79103
listOffsetRequestV1{
80104
ReplicaID: 1,
81105
Topics: []listOffsetRequestTopicV1{

0 commit comments

Comments
 (0)