Skip to content

Commit acf8e62

Browse files
committed
CASSGO-6 Accept peers with empty rack
This fixes #1706.
1 parent be35a5b commit acf8e62

File tree

3 files changed

+230
-21
lines changed

3 files changed

+230
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6262
- Don't panic in MapExecuteBatchCAS if no `[applied]` column is returned (CASSGO-42)
6363
- Fix deadlock in refresh debouncer stop (CASSGO-41)
6464
- Endless query execution fix (CASSGO-50)
65+
- Accept peers with empty rack (CASSGO-6)
6566

6667
## [1.7.0] - 2024-09-23
6768

host_source.go

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ type HostInfo struct {
171171
port int
172172
dataCenter string
173173
rack string
174+
missingRack bool
174175
hostId string
175176
workload string
176177
graph bool
@@ -413,8 +414,9 @@ func (h *HostInfo) update(from *HostInfo) {
413414
if h.dataCenter == "" {
414415
h.dataCenter = from.dataCenter
415416
}
416-
if h.rack == "" {
417+
if h.missingRack || h.rack == "" {
417418
h.rack = from.rack
419+
h.missingRack = from.missingRack
418420
}
419421
if h.hostId == "" {
420422
h.hostId = from.hostId
@@ -530,7 +532,7 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
530532
const assertErrorMsg = "Assertion failed for %s, type was %T"
531533
var ok bool
532534

533-
host := &HostInfo{connectAddress: defaultAddr, port: defaultPort}
535+
host := &HostInfo{connectAddress: defaultAddr, port: defaultPort, missingRack: true}
534536

535537
// Process all fields from the row
536538
for key, value := range row {
@@ -541,14 +543,30 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
541543
return nil, fmt.Errorf(assertErrorMsg, "data_center", value)
542544
}
543545
case "rack":
544-
host.rack, ok = value.(string)
546+
rack, ok := value.(*string)
545547
if !ok {
546-
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
548+
if rack, ok := value.(string); !ok {
549+
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
550+
} else {
551+
host.rack = rack
552+
host.missingRack = false
553+
}
554+
} else if rack != nil {
555+
host.rack = *rack
556+
host.missingRack = false
547557
}
548558
case "host_id":
549559
hostId, ok := value.(UUID)
550560
if !ok {
551-
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
561+
if str, ok := value.(string); ok {
562+
var err error
563+
hostId, err = ParseUUID(str)
564+
if err != nil {
565+
return nil, fmt.Errorf("failed to parse host_id: %w", err)
566+
}
567+
} else {
568+
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
569+
}
552570
}
553571
host.hostId = hostId.String()
554572
case "release_version":
@@ -560,7 +578,11 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
560578
case "peer":
561579
ip, ok := value.(net.IP)
562580
if !ok {
563-
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
581+
if str, ok := value.(string); ok {
582+
ip = net.ParseIP(str)
583+
} else {
584+
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
585+
}
564586
}
565587
host.peer = ip
566588
case "cluster_name":
@@ -576,31 +598,51 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
576598
case "broadcast_address":
577599
ip, ok := value.(net.IP)
578600
if !ok {
579-
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
601+
if str, ok := value.(string); ok {
602+
ip = net.ParseIP(str)
603+
} else {
604+
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
605+
}
580606
}
581607
host.broadcastAddress = ip
582608
case "preferred_ip":
583609
ip, ok := value.(net.IP)
584610
if !ok {
585-
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
611+
if str, ok := value.(string); ok {
612+
ip = net.ParseIP(str)
613+
} else {
614+
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
615+
}
586616
}
587617
host.preferredIP = ip
588618
case "rpc_address":
589619
ip, ok := value.(net.IP)
590620
if !ok {
591-
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
621+
if str, ok := value.(string); ok {
622+
ip = net.ParseIP(str)
623+
} else {
624+
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
625+
}
592626
}
593627
host.rpcAddress = ip
594628
case "native_address":
595629
ip, ok := value.(net.IP)
596630
if !ok {
597-
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
631+
if str, ok := value.(string); ok {
632+
ip = net.ParseIP(str)
633+
} else {
634+
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
635+
}
598636
}
599637
host.rpcAddress = ip
600638
case "listen_address":
601639
ip, ok := value.(net.IP)
602640
if !ok {
603-
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
641+
if str, ok := value.(string); ok {
642+
ip = net.ParseIP(str)
643+
} else {
644+
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
645+
}
604646
}
605647
host.listenAddress = ip
606648
case "native_port":
@@ -667,17 +709,22 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
667709
}
668710

669711
func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) {
670-
rows, err := iter.SliceMap()
671-
if err != nil {
672-
// TODO(zariel): make typed error
673-
return nil, err
712+
// TODO: switch this to a new iterator method once CASSGO-36 is solved
713+
m := map[string]interface{}{
714+
// we set rack to a pointer so we can know if it's NULL or not since we
715+
// need to be able to filter out NULL rack hosts but not empty string hosts
716+
// see CASSGO-6
717+
"rack": new(string),
674718
}
675-
676-
if len(rows) == 0 {
719+
if !iter.MapScan(m) {
677720
return nil, errors.New("query returned 0 rows")
678721
}
722+
if err := iter.Close(); err != nil {
723+
// TODO(zariel): make typed error
724+
return nil, err
725+
}
679726

680-
host, err := s.newHostInfoFromMap(connectAddress, defaultPort, rows[0])
727+
host, err := s.newHostInfoFromMap(connectAddress, defaultPort, m)
681728
if err != nil {
682729
return nil, err
683730
}
@@ -720,8 +767,21 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
720767
return nil, errNoControl
721768
}
722769

723-
rows, err := iter.SliceMap()
724-
if err != nil {
770+
// TODO: switch this to a new iterator method once CASSGO-36 is solved
771+
var rows []map[string]interface{}
772+
for {
773+
m := map[string]interface{}{
774+
// we set rack to a pointer so we can know if it's NULL or not since we
775+
// need to be able to filter out NULL rack hosts but not empty string hosts
776+
// see CASSGO-6
777+
"rack": new(string),
778+
}
779+
if !iter.MapScan(m) {
780+
break
781+
}
782+
rows = append(rows, m)
783+
}
784+
if err := iter.Close(); err != nil {
725785
// TODO(zariel): make typed error
726786
return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
727787
}
@@ -749,7 +809,7 @@ func isValidPeer(host *HostInfo) bool {
749809
return !(len(host.RPCAddress()) == 0 ||
750810
host.hostId == "" ||
751811
host.dataCenter == "" ||
752-
host.rack == "" ||
812+
host.missingRack ||
753813
len(host.tokens) == 0)
754814
}
755815

host_source_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,153 @@ func TestCassVersionBefore(t *testing.T) {
8585

8686
}
8787

88+
func TestNewHostInfoFromRow(t *testing.T) {
89+
id := MustRandomUUID()
90+
row := map[string]interface{}{
91+
"broadcast_address": "10.0.0.1",
92+
"listen_address": net.ParseIP("10.0.0.2"),
93+
"rpc_address": net.ParseIP("10.0.0.3"),
94+
"data_center": "dc",
95+
"rack": "",
96+
"host_id": id,
97+
"release_version": "4.0.0",
98+
"native_port": 9042,
99+
"tokens": []string{"0", "1"},
100+
}
101+
s := &Session{}
102+
h, err := newHostInfoFromRow(s, nil, 0, row)
103+
if err != nil {
104+
t.Fatal(err)
105+
}
106+
if !isValidPeer(h) {
107+
t.Errorf("expected %+v to be a valid peer", h)
108+
}
109+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.3:9042" {
110+
t.Errorf("unexpected connect address: %s != '10.0.0.3:9042'", addr)
111+
}
112+
if h.HostID() != id.String() {
113+
t.Errorf("unexpected hostID %s != %s", h.HostID(), id.String())
114+
}
115+
if h.Version().String() != "v4.0.0" {
116+
t.Errorf("unexpected version %s != v4.0.0", h.Version().String())
117+
}
118+
if h.Rack() != "" {
119+
t.Errorf("unexpected rack %s != ''", h.Rack())
120+
}
121+
if h.DataCenter() != "dc" {
122+
t.Errorf("unexpected data center %s != 'dc'", h.DataCenter())
123+
}
124+
125+
row = map[string]interface{}{
126+
"broadcast_address": "10.0.0.1",
127+
"listen_address": net.ParseIP("10.0.0.2"),
128+
"preferred_ip": "10.0.0.4",
129+
"data_center": "dc",
130+
"rack": "rack",
131+
"host_id": id,
132+
"release_version": "4.0.0",
133+
"native_port": 9042,
134+
"tokens": []string{"0", "1"},
135+
}
136+
h, err = newHostInfoFromRow(s, nil, 0, row)
137+
if err != nil {
138+
t.Fatal(err)
139+
}
140+
// missing rpc_address
141+
if isValidPeer(h) {
142+
t.Errorf("expected %+v to be an invalid peer", h)
143+
}
144+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.4:9042" {
145+
t.Errorf("unexpected connect address: %s != '10.0.0.4:9042'", addr)
146+
}
147+
if h.Rack() != "rack" {
148+
t.Errorf("unexpected rack %s != 'rack'", h.Rack())
149+
}
150+
151+
row = map[string]interface{}{
152+
"broadcast_address": "10.0.0.1",
153+
"data_center": "dc",
154+
"rack": "rack",
155+
"host_id": id,
156+
"native_port": 9042,
157+
"tokens": []string{"0", "1"},
158+
}
159+
h, err = newHostInfoFromRow(s, nil, 0, row)
160+
if err != nil {
161+
t.Fatal(err)
162+
}
163+
// missing rpc_address
164+
if isValidPeer(h) {
165+
t.Errorf("expected %+v to be an invalid peer", h)
166+
}
167+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.1:9042" {
168+
t.Errorf("unexpected connect address: %s != '10.0.0.1:9042'", addr)
169+
}
170+
171+
row = map[string]interface{}{
172+
"rpc_address": "10.0.0.2",
173+
"data_center": "dc",
174+
"rack": "rack",
175+
"host_id": id,
176+
"tokens": []string{"0", "1"},
177+
}
178+
s = &Session{
179+
cfg: ClusterConfig{
180+
AddressTranslator: AddressTranslatorFunc(func(addr net.IP, port int) (net.IP, int) {
181+
if !addr.Equal(net.ParseIP("10.0.0.2")) {
182+
t.Errorf("unexpected ip sent to translator: %s != '10.0.0.2'", addr.String())
183+
}
184+
if port != 9042 {
185+
t.Errorf("unexpected port sent to translator: %d != 9042", port)
186+
}
187+
return net.ParseIP("10.0.0.5"), 9043
188+
}),
189+
},
190+
logger: &defaultLogger{},
191+
}
192+
h, err = newHostInfoFromRow(s, nil, 9042, row)
193+
if err != nil {
194+
t.Fatal(err)
195+
}
196+
if !isValidPeer(h) {
197+
t.Errorf("expected %+v to be a valid peer", h)
198+
}
199+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.5:9043" {
200+
t.Errorf("unexpected connect address: %s != '10.0.0.5:9043'", addr)
201+
}
202+
203+
// missing rack
204+
row = map[string]interface{}{
205+
"rpc_address": "10.0.0.2",
206+
"data_center": "dc",
207+
"host_id": id,
208+
"tokens": []string{"0", "1"},
209+
}
210+
h, err = newHostInfoFromRow(nil, nil, 9042, row)
211+
if err != nil {
212+
t.Fatal(err)
213+
}
214+
if isValidPeer(h) {
215+
t.Errorf("expected %+v to be an invalid peer", h)
216+
}
217+
if h.Rack() != "" {
218+
t.Errorf("unexpected rack %s != ''", h.Rack())
219+
}
220+
221+
// inavlid ip
222+
row = map[string]interface{}{
223+
"rpc_address": net.ParseIP("0.0.0.0"),
224+
"data_center": "dc",
225+
"rack": "rack",
226+
"host_id": id,
227+
"tokens": []string{"0", "1"},
228+
}
229+
_, err = newHostInfoFromRow(nil, nil, 9042, row)
230+
if err == nil {
231+
t.Error("expected invalid ip to error")
232+
}
233+
}
234+
88235
func TestIsValidPeer(t *testing.T) {
89236
host := &HostInfo{
90237
rpcAddress: net.ParseIP("0.0.0.0"),
@@ -99,6 +246,7 @@ func TestIsValidPeer(t *testing.T) {
99246
}
100247

101248
host.rack = ""
249+
host.missingRack = true
102250
if isValidPeer(host) {
103251
t.Errorf("expected %+v to NOT be a valid peer", host)
104252
}

0 commit comments

Comments
 (0)