Skip to content

Commit c7e5cee

Browse files
committed
CASSGO-6 Accept peers with empty rack
This fixes #1706.
1 parent be35a5b commit c7e5cee

File tree

5 files changed

+348
-32
lines changed

5 files changed

+348
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6262
- Don't panic in MapExecuteBatchCAS if no `[applied]` column is returned (CASSGO-42)
6363
- Fix deadlock in refresh debouncer stop (CASSGO-41)
6464
- Endless query execution fix (CASSGO-50)
65+
- Accept peers with empty rack (CASSGO-6)
6566

6667
## [1.7.0] - 2024-09-23
6768

cassandra_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4013,3 +4013,109 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {
40134013

40144014
session.Query("DROP KEYSPACE IF EXISTS gocql_test_routing_key_cache").Exec()
40154015
}
4016+
4017+
func TestHostInfoFromIter(t *testing.T) {
4018+
session := createSession(t)
4019+
defer session.Close()
4020+
4021+
// order by data_center so we have predictable results
4022+
err := createTable(session, `CREATE TABLE IF NOT EXISTS gocql_test.system_peers(
4023+
peer inet PRIMARY KEY,
4024+
data_center text,
4025+
host_id uuid,
4026+
preferred_ip inet,
4027+
rack text,
4028+
release_version text,
4029+
rpc_address inet,
4030+
schema_version uuid,
4031+
tokens set<text>
4032+
)`)
4033+
if err != nil {
4034+
t.Fatal(err)
4035+
}
4036+
4037+
id1 := MustRandomUUID()
4038+
err = session.Query(
4039+
"INSERT INTO gocql_test.system_peers (peer, data_center, host_id, rack, release_version, rpc_address, tokens) VALUES (?, ?, ?, ?, ?, ?, ?)",
4040+
net.ParseIP("10.0.0.1"),
4041+
"dc1",
4042+
id1,
4043+
"rack1",
4044+
"4.0.0",
4045+
net.ParseIP("10.0.0.2"),
4046+
[]string{"0", "1"},
4047+
).Exec()
4048+
if err != nil {
4049+
t.Fatal(err)
4050+
}
4051+
4052+
id2 := MustRandomUUID()
4053+
err = session.Query(
4054+
"INSERT INTO gocql_test.system_peers (peer, data_center, host_id, release_version, rpc_address, tokens) VALUES (?, ?, ?, ?, ?, ?)",
4055+
net.ParseIP("10.0.0.2"),
4056+
"dc2",
4057+
id2,
4058+
"4.0.0",
4059+
net.ParseIP("10.0.0.3"),
4060+
[]string{"0", "1"},
4061+
).Exec()
4062+
if err != nil {
4063+
t.Fatal(err)
4064+
}
4065+
4066+
iter := session.Query("SELECT * FROM gocql_test.system_peers WHERE data_center='dc1' ALLOW FILTERING").Iter()
4067+
4068+
h, err := session.hostInfoFromIter(iter, nil, 9042)
4069+
if err != nil {
4070+
t.Fatal(err)
4071+
}
4072+
if !isValidPeer(h) {
4073+
t.Errorf("expected %+v to be a valid peer", h)
4074+
}
4075+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.2:9042" {
4076+
t.Errorf("unexpected connect address: %s != '10.0.0.2:9042'", addr)
4077+
}
4078+
if h.HostID() != id1.String() {
4079+
t.Errorf("unexpected hostID %s != %s", h.HostID(), id1.String())
4080+
}
4081+
if h.Version().String() != "v4.0.0" {
4082+
t.Errorf("unexpected version %s != v4.0.0", h.Version().String())
4083+
}
4084+
if h.Rack() != "rack1" {
4085+
t.Errorf("unexpected rack %s != 'rack1'", h.Rack())
4086+
}
4087+
if h.DataCenter() != "dc1" {
4088+
t.Errorf("unexpected data center %s != 'dc1'", h.DataCenter())
4089+
}
4090+
if h.missingRack {
4091+
t.Errorf("unexpected missing rack")
4092+
}
4093+
4094+
iter = session.Query("SELECT * FROM gocql_test.system_peers WHERE data_center='dc2' ALLOW FILTERING").Iter()
4095+
4096+
h, err = session.hostInfoFromIter(iter, nil, 9042)
4097+
if err != nil {
4098+
t.Fatal(err)
4099+
}
4100+
if isValidPeer(h) {
4101+
t.Errorf("expected %+v to be an invalid peer", h)
4102+
}
4103+
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.3:9042" {
4104+
t.Errorf("unexpected connect address: %s != '10.0.0.3:9042'", addr)
4105+
}
4106+
if h.HostID() != id2.String() {
4107+
t.Errorf("unexpected hostID %s != %s", h.HostID(), id2.String())
4108+
}
4109+
if h.Version().String() != "v4.0.0" {
4110+
t.Errorf("unexpected version %s != v4.0.0", h.Version().String())
4111+
}
4112+
if h.Rack() != "" {
4113+
t.Errorf("unexpected rack %s != ''", h.Rack())
4114+
}
4115+
if h.DataCenter() != "dc2" {
4116+
t.Errorf("unexpected data center %s != 'dc2'", h.DataCenter())
4117+
}
4118+
if !h.missingRack {
4119+
t.Errorf("unexpected non-missing rack")
4120+
}
4121+
}

control.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,11 @@ func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
322322
iter := conn.querySystemLocal(context.TODO())
323323
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port)
324324
if err != nil {
325-
return err
325+
iter.Close()
326+
return fmt.Errorf("could not retrieve control host info: %w", err)
327+
}
328+
if host == nil {
329+
return errors.New("could not retrieve control host info: query returned 0 rows")
326330
}
327331

328332
var exists bool

host_source.go

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ type HostInfo struct {
171171
port int
172172
dataCenter string
173173
rack string
174+
missingRack bool
174175
hostId string
175176
workload string
176177
graph bool
@@ -413,8 +414,9 @@ func (h *HostInfo) update(from *HostInfo) {
413414
if h.dataCenter == "" {
414415
h.dataCenter = from.dataCenter
415416
}
416-
if h.rack == "" {
417+
if h.missingRack || h.rack == "" {
417418
h.rack = from.rack
419+
h.missingRack = from.missingRack
418420
}
419421
if h.hostId == "" {
420422
h.hostId = from.hostId
@@ -530,7 +532,7 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
530532
const assertErrorMsg = "Assertion failed for %s, type was %T"
531533
var ok bool
532534

533-
host := &HostInfo{connectAddress: defaultAddr, port: defaultPort}
535+
host := &HostInfo{connectAddress: defaultAddr, port: defaultPort, missingRack: true}
534536

535537
// Process all fields from the row
536538
for key, value := range row {
@@ -541,14 +543,30 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
541543
return nil, fmt.Errorf(assertErrorMsg, "data_center", value)
542544
}
543545
case "rack":
544-
host.rack, ok = value.(string)
546+
rack, ok := value.(*string)
545547
if !ok {
546-
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
548+
if rack, ok := value.(string); !ok {
549+
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
550+
} else {
551+
host.rack = rack
552+
host.missingRack = false
553+
}
554+
} else if rack != nil {
555+
host.rack = *rack
556+
host.missingRack = false
547557
}
548558
case "host_id":
549559
hostId, ok := value.(UUID)
550560
if !ok {
551-
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
561+
if str, ok := value.(string); ok {
562+
var err error
563+
hostId, err = ParseUUID(str)
564+
if err != nil {
565+
return nil, fmt.Errorf("failed to parse host_id: %w", err)
566+
}
567+
} else {
568+
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
569+
}
552570
}
553571
host.hostId = hostId.String()
554572
case "release_version":
@@ -560,7 +578,11 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
560578
case "peer":
561579
ip, ok := value.(net.IP)
562580
if !ok {
563-
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
581+
if str, ok := value.(string); ok {
582+
ip = net.ParseIP(str)
583+
} else {
584+
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
585+
}
564586
}
565587
host.peer = ip
566588
case "cluster_name":
@@ -576,31 +598,51 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
576598
case "broadcast_address":
577599
ip, ok := value.(net.IP)
578600
if !ok {
579-
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
601+
if str, ok := value.(string); ok {
602+
ip = net.ParseIP(str)
603+
} else {
604+
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
605+
}
580606
}
581607
host.broadcastAddress = ip
582608
case "preferred_ip":
583609
ip, ok := value.(net.IP)
584610
if !ok {
585-
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
611+
if str, ok := value.(string); ok {
612+
ip = net.ParseIP(str)
613+
} else {
614+
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
615+
}
586616
}
587617
host.preferredIP = ip
588618
case "rpc_address":
589619
ip, ok := value.(net.IP)
590620
if !ok {
591-
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
621+
if str, ok := value.(string); ok {
622+
ip = net.ParseIP(str)
623+
} else {
624+
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
625+
}
592626
}
593627
host.rpcAddress = ip
594628
case "native_address":
595629
ip, ok := value.(net.IP)
596630
if !ok {
597-
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
631+
if str, ok := value.(string); ok {
632+
ip = net.ParseIP(str)
633+
} else {
634+
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
635+
}
598636
}
599637
host.rpcAddress = ip
600638
case "listen_address":
601639
ip, ok := value.(net.IP)
602640
if !ok {
603-
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
641+
if str, ok := value.(string); ok {
642+
ip = net.ParseIP(str)
643+
} else {
644+
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
645+
}
604646
}
605647
host.listenAddress = ip
606648
case "native_port":
@@ -666,18 +708,23 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
666708
}
667709
}
668710

711+
// this will return nil, nil if there were no rows left in the Iter
669712
func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) {
670-
rows, err := iter.SliceMap()
671-
if err != nil {
672-
// TODO(zariel): make typed error
673-
return nil, err
713+
// TODO: switch this to a new iterator method once CASSGO-36 is solved
714+
m := map[string]interface{}{
715+
// we set rack to a double pointer so we can know if it's NULL or not since
716+
// we need to be able to filter out NULL rack hosts but not empty string hosts
717+
// see CASSGO-6
718+
"rack": new(*string),
674719
}
675-
676-
if len(rows) == 0 {
677-
return nil, errors.New("query returned 0 rows")
720+
if !iter.MapScan(m) {
721+
if err := iter.Close(); err != nil {
722+
return nil, err
723+
}
724+
return nil, nil
678725
}
679726

680-
host, err := s.newHostInfoFromMap(connectAddress, defaultPort, rows[0])
727+
host, err := s.newHostInfoFromMap(connectAddress, defaultPort, m)
681728
if err != nil {
682729
return nil, err
683730
}
@@ -700,8 +747,12 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
700747

701748
host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
702749
if err != nil {
750+
iter.Close()
703751
return nil, fmt.Errorf("could not retrieve local host info: %w", err)
704752
}
753+
if host == nil {
754+
return nil, errors.New("could not retrieve local host info: query returned 0 rows")
755+
}
705756
return host, nil
706757
}
707758

@@ -711,7 +762,6 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
711762
return nil, errNoControl
712763
}
713764

714-
var peers []*HostInfo
715765
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
716766
return ch.conn.querySystemPeers(context.TODO(), localHost.version)
717767
})
@@ -720,18 +770,25 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
720770
return nil, errNoControl
721771
}
722772

723-
rows, err := iter.SliceMap()
724-
if err != nil {
725-
// TODO(zariel): make typed error
726-
return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
727-
}
728-
729-
for _, row := range rows {
773+
var peers []*HostInfo
774+
for {
730775
// extract all available info about the peer
731-
host, err := r.session.newHostInfoFromMap(nil, r.session.cfg.Port, row)
776+
host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
732777
if err != nil {
733-
return nil, err
734-
} else if !isValidPeer(host) {
778+
// if the error came from the iterator then return it, otherwise ignore
779+
// and warn
780+
if iterErr := iter.Close(); iterErr != nil {
781+
return nil, fmt.Errorf("unable to fetch peer host info: %s", iterErr)
782+
}
783+
// skip over peers that we couldn't parse
784+
r.session.logger.Warning("Failed to parse peer this host will be ignored.", newLogFieldError("err", err))
785+
continue
786+
}
787+
// if nil then none left
788+
if host == nil {
789+
break
790+
}
791+
if !isValidPeer(host) {
735792
// If it's not a valid peer
736793
r.session.logger.Warning("Found invalid peer "+
737794
"likely due to a gossip or snitch issue, this host will be ignored.", newLogFieldStringer("host", host))
@@ -749,7 +806,7 @@ func isValidPeer(host *HostInfo) bool {
749806
return !(len(host.RPCAddress()) == 0 ||
750807
host.hostId == "" ||
751808
host.dataCenter == "" ||
752-
host.rack == "" ||
809+
host.missingRack ||
753810
len(host.tokens) == 0)
754811
}
755812

0 commit comments

Comments
 (0)