Skip to content

CASSGO-6 Accept peers with empty rack #1895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Don't panic in MapExecuteBatchCAS if no `[applied]` column is returned (CASSGO-42)
- Fix deadlock in refresh debouncer stop (CASSGO-41)
- Endless query execution fix (CASSGO-50)
- Accept peers with empty rack (CASSGO-6)

## [1.7.0] - 2024-09-23

Expand Down
105 changes: 105 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4013,3 +4013,108 @@ func TestRoutingKeyCacheUsesOverriddenKeyspace(t *testing.T) {

session.Query("DROP KEYSPACE IF EXISTS gocql_test_routing_key_cache").Exec()
}

func TestHostInfoFromIter(t *testing.T) {
session := createSession(t)
defer session.Close()

err := createTable(session, `CREATE TABLE IF NOT EXISTS gocql_test.system_peers(
peer inet PRIMARY KEY,
data_center text,
host_id uuid,
preferred_ip inet,
rack text,
release_version text,
rpc_address inet,
schema_version uuid,
tokens set<text>
)`)
if err != nil {
t.Fatal(err)
}

id1 := MustRandomUUID()
err = session.Query(
"INSERT INTO gocql_test.system_peers (peer, data_center, host_id, rack, release_version, rpc_address, tokens) VALUES (?, ?, ?, ?, ?, ?, ?)",
net.ParseIP("10.0.0.1"),
"dc1",
id1,
"rack1",
"4.0.0",
net.ParseIP("10.0.0.2"),
[]string{"0", "1"},
).Exec()
if err != nil {
t.Fatal(err)
}

id2 := MustRandomUUID()
err = session.Query(
"INSERT INTO gocql_test.system_peers (peer, data_center, host_id, release_version, rpc_address, tokens) VALUES (?, ?, ?, ?, ?, ?)",
net.ParseIP("10.0.0.2"),
"dc2",
id2,
"4.0.0",
net.ParseIP("10.0.0.3"),
[]string{"0", "1"},
).Exec()
if err != nil {
t.Fatal(err)
}

iter := session.Query("SELECT * FROM gocql_test.system_peers WHERE data_center='dc1' ALLOW FILTERING").Iter()

h, err := session.hostInfoFromIter(iter, nil, 9042)
if err != nil {
t.Fatal(err)
}
if !isValidPeer(h) {
t.Errorf("expected %+v to be a valid peer", h)
}
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.2:9042" {
t.Errorf("unexpected connect address: %s != '10.0.0.2:9042'", addr)
}
if h.HostID() != id1.String() {
t.Errorf("unexpected hostID %s != %s", h.HostID(), id1.String())
}
if h.Version().String() != "v4.0.0" {
t.Errorf("unexpected version %s != v4.0.0", h.Version().String())
}
if h.Rack() != "rack1" {
t.Errorf("unexpected rack %s != 'rack1'", h.Rack())
}
if h.DataCenter() != "dc1" {
t.Errorf("unexpected data center %s != 'dc1'", h.DataCenter())
}
if h.missingRack {
t.Errorf("unexpected missing rack")
}

iter = session.Query("SELECT * FROM gocql_test.system_peers WHERE data_center='dc2' ALLOW FILTERING").Iter()

h, err = session.hostInfoFromIter(iter, nil, 9042)
if err != nil {
t.Fatal(err)
}
if isValidPeer(h) {
t.Errorf("expected %+v to be an invalid peer", h)
}
if addr := h.ConnectAddressAndPort(); addr != "10.0.0.3:9042" {
t.Errorf("unexpected connect address: %s != '10.0.0.3:9042'", addr)
}
if h.HostID() != id2.String() {
t.Errorf("unexpected hostID %s != %s", h.HostID(), id2.String())
}
if h.Version().String() != "v4.0.0" {
t.Errorf("unexpected version %s != v4.0.0", h.Version().String())
}
if h.Rack() != "" {
t.Errorf("unexpected rack %s != ''", h.Rack())
}
if h.DataCenter() != "dc2" {
t.Errorf("unexpected data center %s != 'dc2'", h.DataCenter())
}
if !h.missingRack {
t.Errorf("unexpected non-missing rack")
}
}
7 changes: 6 additions & 1 deletion control.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,12 @@ func (c *controlConn) setupConn(conn *Conn, sessionInit bool) error {
iter := conn.querySystemLocal(context.TODO())
host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.r.RemoteAddr().(*net.TCPAddr).Port)
if err != nil {
return err
// just cleanup
iter.Close()
return fmt.Errorf("could not retrieve control host info: %w", err)
}
if host == nil {
return errors.New("could not retrieve control host info: query returned 0 rows")
}

var exists bool
Expand Down
120 changes: 89 additions & 31 deletions host_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ type HostInfo struct {
port int
dataCenter string
rack string
missingRack bool
hostId string
workload string
graph bool
Expand Down Expand Up @@ -413,8 +414,9 @@ func (h *HostInfo) update(from *HostInfo) {
if h.dataCenter == "" {
h.dataCenter = from.dataCenter
}
if h.rack == "" {
if h.missingRack {
h.rack = from.rack
h.missingRack = from.missingRack
}
if h.hostId == "" {
h.hostId = from.hostId
Expand Down Expand Up @@ -530,7 +532,7 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
const assertErrorMsg = "Assertion failed for %s, type was %T"
var ok bool

host := &HostInfo{connectAddress: defaultAddr, port: defaultPort}
host := &HostInfo{connectAddress: defaultAddr, port: defaultPort, missingRack: true}

// Process all fields from the row
for key, value := range row {
Expand All @@ -541,14 +543,30 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
return nil, fmt.Errorf(assertErrorMsg, "data_center", value)
}
case "rack":
host.rack, ok = value.(string)
rack, ok := value.(*string)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
if rack, ok := value.(string); !ok {
return nil, fmt.Errorf(assertErrorMsg, "rack", value)
} else {
host.rack = rack
host.missingRack = false
}
} else if rack != nil {
host.rack = *rack
host.missingRack = false
}
case "host_id":
hostId, ok := value.(UUID)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
if str, ok := value.(string); ok {
var err error
hostId, err = ParseUUID(str)
if err != nil {
return nil, fmt.Errorf("failed to parse host_id: %w", err)
}
} else {
return nil, fmt.Errorf(assertErrorMsg, "host_id", value)
}
}
host.hostId = hostId.String()
case "release_version":
Expand All @@ -560,7 +578,11 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
case "peer":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "peer", value)
}
}
host.peer = ip
case "cluster_name":
Expand All @@ -576,31 +598,51 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
case "broadcast_address":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "broadcast_address", value)
}
}
host.broadcastAddress = ip
case "preferred_ip":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "preferred_ip", value)
}
}
host.preferredIP = ip
case "rpc_address":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "rpc_address", value)
}
}
host.rpcAddress = ip
case "native_address":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "native_address", value)
}
}
host.rpcAddress = ip
case "listen_address":
ip, ok := value.(net.IP)
if !ok {
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
if str, ok := value.(string); ok {
ip = net.ParseIP(str)
} else {
return nil, fmt.Errorf(assertErrorMsg, "listen_address", value)
}
}
host.listenAddress = ip
case "native_port":
Expand Down Expand Up @@ -666,18 +708,23 @@ func newHostInfoFromRow(s *Session, defaultAddr net.IP, defaultPort int, row map
}
}

// this will return nil, nil if there were no rows left in the Iter
func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) {
rows, err := iter.SliceMap()
if err != nil {
// TODO(zariel): make typed error
return nil, err
// TODO: switch this to a new iterator method once CASSGO-36 is solved
m := map[string]interface{}{
// we set rack to a double pointer so we can know if it's NULL or not since
// we need to be able to filter out NULL rack hosts but not empty string hosts
// see CASSGO-6
"rack": new(*string),
}

if len(rows) == 0 {
return nil, errors.New("query returned 0 rows")
if !iter.MapScan(m) {
if err := iter.Close(); err != nil {
return nil, err
}
return nil, nil
}

host, err := s.newHostInfoFromMap(connectAddress, defaultPort, rows[0])
host, err := s.newHostInfoFromMap(connectAddress, defaultPort, m)
if err != nil {
return nil, err
}
Expand All @@ -700,8 +747,13 @@ func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {

host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
if err != nil {
// just cleanup
iter.Close()
return nil, fmt.Errorf("could not retrieve local host info: %w", err)
}
if host == nil {
return nil, errors.New("could not retrieve local host info: query returned 0 rows")
}
return host, nil
}

Expand All @@ -711,7 +763,6 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
return nil, errNoControl
}

var peers []*HostInfo
iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
return ch.conn.querySystemPeers(context.TODO(), localHost.version)
})
Expand All @@ -720,18 +771,25 @@ func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, er
return nil, errNoControl
}

rows, err := iter.SliceMap()
if err != nil {
// TODO(zariel): make typed error
return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
}

for _, row := range rows {
var peers []*HostInfo
for {
// extract all available info about the peer
host, err := r.session.newHostInfoFromMap(nil, r.session.cfg.Port, row)
host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
if err != nil {
return nil, err
} else if !isValidPeer(host) {
// if the error came from the iterator then return it, otherwise ignore
// and warn
if iterErr := iter.Close(); iterErr != nil {
return nil, fmt.Errorf("unable to fetch peer host info: %s", iterErr)
}
// skip over peers that we couldn't parse
r.session.logger.Warning("Failed to parse peer this host will be ignored.", newLogFieldError("err", err))
continue
}
// if nil then none left
if host == nil {
break
}
if !isValidPeer(host) {
// If it's not a valid peer
r.session.logger.Warning("Found invalid peer "+
"likely due to a gossip or snitch issue, this host will be ignored.", newLogFieldStringer("host", host))
Expand All @@ -749,7 +807,7 @@ func isValidPeer(host *HostInfo) bool {
return !(len(host.RPCAddress()) == 0 ||
host.hostId == "" ||
host.dataCenter == "" ||
host.rack == "" ||
host.missingRack ||
len(host.tokens) == 0)
}

Expand Down
Loading