Skip to content

Commit b4f1ba6

Browse files
milindlkkoehlerkkoehler
authored
Extend MockCluster to set a broker down and up again (confluentinc#998)
--------- Co-authored-by: kkoehler <[email protected]> Co-authored-by: kkoehler <[email protected]>
1 parent eb9add9 commit b4f1ba6

File tree

5 files changed

+159
-1
lines changed

5 files changed

+159
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
This is a feature release.
66

7+
* MockCluster can now be shutdown and started again to test broker
8+
availability problems (#998, @kkoehler).
79
* Fixes a bug in the mock schema registry client where the wrong ID was being
810
returned for pre-registered schema (#971, @srlk).
911
* Adds `CreateTopic` method to the MockCluster. (#1047, @mimikwang).

examples/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ legacy/consumer_channel_example/consumer_channel_example
2929
legacy/producer_channel_example/producer_channel_example
3030
library-version/library-version
3131
mockcluster_example/mockcluster_example
32+
mockcluster_example/mockcluster_failure_example
3233
oauthbearer_consumer_example/oauthbearer_consumer_example
3334
oauthbearer_oidc_example/oauthbearer_oidc_example
3435
oauthbearer_producer_example/oauthbearer_producer_example

examples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ Examples
5757
[library-version](library-version) - Show the library version
5858

5959
[mockcluster_example](mockcluster_example) - Use a mock cluster for testing
60+
61+
[mockcluster_failure_example](mockcluster_failure_example) - Use a mock cluster for failure testing
6062

6163
[oauthbearer_consumer_example](oauthbearer_consumer_example) - Unsecured SASL/OAUTHBEARER consumer example
6264

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright 2023 Confluent Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// Demonstrates failure modes for mock cluster:
18+
// 1. RTT Duration set to more than Delivery Timeout
19+
// 2. Broker set as being down.
20+
package main
21+
22+
import (
23+
"fmt"
24+
"os"
25+
"time"
26+
27+
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
28+
)
29+
30+
func main() {
31+
32+
mockCluster, err := kafka.NewMockCluster(1)
33+
if err != nil {
34+
fmt.Fprintf(os.Stderr, "Failed to create MockCluster: %s\n", err)
35+
os.Exit(1)
36+
}
37+
defer mockCluster.Close()
38+
39+
// Set RTT > Delivery Timeout
40+
err = mockCluster.SetRoundtripDuration(1, 4*time.Second)
41+
if err != nil {
42+
fmt.Fprintf(os.Stderr, "Could not configure roundtrip duration: %v", err)
43+
return
44+
}
45+
broker := mockCluster.BootstrapServers()
46+
47+
p, err := kafka.NewProducer(&kafka.ConfigMap{
48+
"bootstrap.servers": broker,
49+
"delivery.timeout.ms": 1000,
50+
})
51+
52+
if err != nil {
53+
fmt.Fprintf(os.Stderr, "Failed to create producer: %s\n", err)
54+
os.Exit(1)
55+
}
56+
57+
fmt.Printf("Created Producer %v\n", p)
58+
59+
m, err := sendTestMsg(p)
60+
61+
if m.TopicPartition.Error != nil {
62+
fmt.Printf("EXPECTED: Delivery failed: %v\n", m.TopicPartition.Error)
63+
} else {
64+
fmt.Fprintf(os.Stderr, "Message should timeout because of broker configuration")
65+
return
66+
}
67+
68+
fmt.Println("'reset' broker roundtrip duration")
69+
err = mockCluster.SetRoundtripDuration(1, 10*time.Millisecond)
70+
if err != nil {
71+
fmt.Fprintf(os.Stderr, "Could not configure roundtrip duration: %v", err)
72+
return
73+
}
74+
75+
// See what happens when broker is down.
76+
fmt.Println("Set broker down")
77+
err = mockCluster.SetBrokerDown(1)
78+
if err != nil {
79+
fmt.Fprintf(os.Stderr, "Broker should now be down but got error: %v", err)
80+
return
81+
}
82+
83+
m, err = sendTestMsg(p)
84+
85+
if m.TopicPartition.Error != nil {
86+
fmt.Printf("EXPECTED: Delivery failed: %v\n", m.TopicPartition.Error)
87+
} else {
88+
fmt.Fprintf(os.Stderr, "Message should timeout because of broker configuration")
89+
return
90+
}
91+
92+
// Bring the broker up again.
93+
fmt.Println("Set broker up again")
94+
err = mockCluster.SetBrokerUp(1)
95+
if err != nil {
96+
fmt.Fprintf(os.Stderr, "Broker should now be up again but got error: %v", err)
97+
return
98+
}
99+
100+
m, err = sendTestMsg(p)
101+
if err != nil {
102+
fmt.Fprintf(os.Stderr, "There shouldn't be an error but got: %v", err)
103+
return
104+
}
105+
106+
fmt.Println("Message was sent!")
107+
108+
}
109+
110+
func sendTestMsg(p *kafka.Producer) (*kafka.Message, error) {
111+
112+
topic := "Test"
113+
value := "Hello Go!"
114+
115+
deliveryChan := make(chan kafka.Event)
116+
defer close(deliveryChan)
117+
118+
err := p.Produce(&kafka.Message{
119+
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
120+
Value: []byte(value),
121+
Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
122+
}, deliveryChan)
123+
124+
if err != nil {
125+
return nil, err
126+
}
127+
128+
e := <-deliveryChan
129+
return e.(*kafka.Message), nil
130+
}

kafka/mockcluster.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2022 Confluent Inc.
2+
* Copyright 2023 Confluent Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,6 +83,7 @@ func (mc *MockCluster) BootstrapServers() string {
8383
}
8484

8585
// SetRoundtripDuration sets the broker round-trip-time delay for the given broker.
86+
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
8687
func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error {
8788
durationInMillis := C.int(duration.Milliseconds())
8889
cError := C.rd_kafka_mock_broker_set_rtt(mc.mcluster, C.int(brokerID), durationInMillis)
@@ -92,6 +93,28 @@ func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration
9293
return nil
9394
}
9495

96+
// SetBrokerDown disconnects the broker and disallows any new connections.
97+
// This does NOT trigger leader change.
98+
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
99+
func (mc *MockCluster) SetBrokerDown(brokerID int) error {
100+
cError := C.rd_kafka_mock_broker_set_down(mc.mcluster, C.int(brokerID))
101+
if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR {
102+
return newError(cError)
103+
}
104+
return nil
105+
}
106+
107+
// SetBrokerUp makes the broker accept connections again.
108+
// This does NOT trigger leader change.
109+
// Use brokerID -1 for all brokers, or >= 0 for a specific broker.
110+
func (mc *MockCluster) SetBrokerUp(brokerID int) error {
111+
cError := C.rd_kafka_mock_broker_set_up(mc.mcluster, C.int(brokerID))
112+
if cError != C.RD_KAFKA_RESP_ERR_NO_ERROR {
113+
return newError(cError)
114+
}
115+
return nil
116+
}
117+
95118
// CreateTopic creates a topic without having to use a producer
96119
func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error {
97120
topicStr := C.CString(topic)

0 commit comments

Comments
 (0)