Skip to content

Commit 40933d2

Browse files
authored
Check for broker existence when forming partition replica and ISR lists (#776)
Data for each partition returned by Conn.ReadPartitions includes the list of replicas and the ISR for the partition. The broker data is copied from the list of currently available brokers, retrieved from Kafka metadata. It can happen that a broker is not present in metadata (due to being down, for example), but still listed as a replica for a partition. (For example, broker 2 may be down but the ID 2 can still be listed as a replica for a partition.) The logic that copies broker data from the list of available brokers into partition data now omits any that are not present in the metadata list. Without this change, partition data receives the copy of a nil object in its replica or ISR list (ID 0, host nil, default port 9092), which is useless.
1 parent df0521c commit 40933d2

File tree

2 files changed

+46
-10
lines changed

2 files changed

+46
-10
lines changed

conn.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -994,14 +994,6 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
994994
}
995995
}
996996

997-
makeBrokers := func(ids ...int32) []Broker {
998-
b := make([]Broker, len(ids))
999-
for i, id := range ids {
1000-
b[i] = brokers[id]
1001-
}
1002-
return b
1003-
}
1004-
1005997
for _, t := range res.Topics {
1006998
if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
1007999
// We only report errors if they happened for the topic of
@@ -1013,8 +1005,8 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
10131005
partitions = append(partitions, Partition{
10141006
Topic: t.TopicName,
10151007
Leader: brokers[p.Leader],
1016-
Replicas: makeBrokers(p.Replicas...),
1017-
Isr: makeBrokers(p.Isr...),
1008+
Replicas: makeBrokers(brokers, p.Replicas...),
1009+
Isr: makeBrokers(brokers, p.Isr...),
10181010
ID: int(p.PartitionID),
10191011
})
10201012
}
@@ -1025,6 +1017,16 @@ func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err err
10251017
return
10261018
}
10271019

1020+
func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker {
1021+
b := make([]Broker, 0, len(ids))
1022+
for _, id := range ids {
1023+
if br, ok := brokers[id]; ok {
1024+
b = append(b, br)
1025+
}
1026+
}
1027+
return b
1028+
}
1029+
10281030
// Write writes a message to the kafka broker that this connection was
10291031
// established to. The method returns the number of bytes written, or an error
10301032
// if something went wrong.

conn_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,3 +1322,37 @@ func TestEmptyToNullableLeavesStringsIntact(t *testing.T) {
13221322
t.Error("Non empty string is not equal to the original string")
13231323
}
13241324
}
1325+
1326+
func TestMakeBrokersAllPresent(t *testing.T) {
1327+
brokers := make(map[int32]Broker)
1328+
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
1329+
brokers[2] = Broker{ID: 1, Host: "203.0.113.102", Port: 9092}
1330+
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}
1331+
1332+
b := makeBrokers(brokers, 1, 2, 3)
1333+
if len(b) != 3 {
1334+
t.Errorf("Expected 3 brokers, got %d", len(b))
1335+
}
1336+
for _, i := range []int32{1, 2, 3} {
1337+
if b[i-1] != brokers[i] {
1338+
t.Errorf("Expected broker %d at index %d, got %d", i, i-1, b[i].ID)
1339+
}
1340+
}
1341+
}
1342+
1343+
func TestMakeBrokersOneMissing(t *testing.T) {
1344+
brokers := make(map[int32]Broker)
1345+
brokers[1] = Broker{ID: 1, Host: "203.0.113.101", Port: 9092}
1346+
brokers[3] = Broker{ID: 1, Host: "203.0.113.103", Port: 9092}
1347+
1348+
b := makeBrokers(brokers, 1, 2, 3)
1349+
if len(b) != 2 {
1350+
t.Errorf("Expected 2 brokers, got %d", len(b))
1351+
}
1352+
if b[0] != brokers[1] {
1353+
t.Errorf("Expected broker 1 at index 0, got %d", b[0].ID)
1354+
}
1355+
if b[1] != brokers[3] {
1356+
t.Errorf("Expected broker 3 at index 1, got %d", b[1].ID)
1357+
}
1358+
}

0 commit comments

Comments
 (0)