diff --git a/cassandra_test.go b/cassandra_test.go index 54a54f426..e6c9ba9f2 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2208,6 +2208,24 @@ func TestGetKeyspaceMetadata(t *testing.T) { } } +func TestGetVirtualKeyspaceMetadata(t *testing.T) { + session := createSession(t) + defer session.Close() + + virtualKeyspaceMatadata, err := session.VirtualKeyspaceMetadata("system_views") + if err != nil { + t.Fatal(err) + } + if len(virtualKeyspaceMatadata.Tables) == 0 { + t.Fatal("virtualKeyspaceMatadata.Tables is empty") + } + for _, table := range virtualKeyspaceMatadata.Tables { + if table.Keyspace != "system_views" { + t.Fatalf("Expected table keyspace to be 'system_views' but got '%s'", table.Keyspace) + } + } +} + // Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES NOT exist. func TestGetKeyspaceMetadataFails(t *testing.T) { session := createSession(t) diff --git a/metadata.go b/metadata.go index 63e27aeb6..c65da3f50 100644 --- a/metadata.go +++ b/metadata.go @@ -50,6 +50,12 @@ type KeyspaceMetadata struct { UserTypes map[string]*UserTypeMetadata } +// schema metadata for a virtual keyspace +type VirtualKeyspaceMetadata struct { + Name string + Tables map[string]*VirtualTableMetadata +} + // schema metadata for a table (a.k.a. column family) type TableMetadata struct { Keyspace string @@ -66,6 +72,13 @@ type TableMetadata struct { OrderedColumns []string } +type VirtualTableMetadata struct { + Keyspace string + Name string + Comment string + Columns map[string]*VirtualColumnMetadata +} + // schema metadata for a column type ColumnMetadata struct { Keyspace string @@ -80,6 +93,15 @@ type ColumnMetadata struct { Index ColumnIndexMetadata } +type VirtualColumnMetadata struct { + Keyspace string + Table string + Name string + ClusteringOrder string + Kind ColumnKind + Type TypeInfo +} + // FunctionMetadata holds metadata for function constructs type FunctionMetadata struct { Keyspace string @@ -231,6 +253,13 @@ type schemaDescriber struct { cache map[string]*KeyspaceMetadata } +// queries the cluster for schema information for a virtual keyspace +type virtualSchemaDescriber struct { + session *Session + mu sync.Mutex + cache map[string]*VirtualKeyspaceMetadata +} + // creates a session bound schema describer which will query and cache // keyspace metadata func newSchemaDescriber(session *Session) *schemaDescriber { @@ -240,6 +269,15 @@ func newSchemaDescriber(session *Session) *schemaDescriber { } } +// creates a session bound schema describer which will query and cache +// virtual keyspace metadata +func newVirtualSchemaDescriber(session *Session) *virtualSchemaDescriber { + return &virtualSchemaDescriber{ + session: session, + cache: map[string]*VirtualKeyspaceMetadata{}, + } +} + // returns the cached KeyspaceMetadata held by the describer for the named // keyspace. func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) { @@ -260,6 +298,23 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err return metadata, nil } +// returns the cached VirtualKeyspaceMetadata held by the describer for the named +// keyspace. +func (s *virtualSchemaDescriber) getSchema(keyspaceName string) (*VirtualKeyspaceMetadata, error) { + s.mu.Lock() + defer s.mu.Unlock() + metadata, found := s.cache[keyspaceName] + if !found { + err := s.refreshSchema(keyspaceName) + if err != nil { + return nil, err + } + metadata = s.cache[keyspaceName] + } + + return metadata, nil +} + // clears the already cached keyspace metadata func (s *schemaDescriber) clearSchema(keyspaceName string) { s.mu.Lock() @@ -314,6 +369,43 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error { return nil } +// forcibly updates the current VirtualKeyspaceMetadata held by the virtual schema describer +// for a given named keyspace. +func (s *virtualSchemaDescriber) refreshSchema(keyspaceName string) error { + var wg sync.WaitGroup + + var ( + tables []VirtualTableMetadata + columns []VirtualColumnMetadata + tableErr, columnErr error + ) + + wg.Add(2) + go func() { + defer wg.Done() + tables, tableErr = getVirtualTableMetadata(s.session, keyspaceName) + }() + go func() { + defer wg.Done() + columns, columnErr = getVirtualColumnMetadata(s.session, keyspaceName) + }() + wg.Wait() + + if columnErr != nil { + return columnErr + } + if tableErr != nil { + return tableErr + } + + keyspaceMetadata := &VirtualKeyspaceMetadata{Name: keyspaceName} + compileVirtualMetadata(keyspaceMetadata, tables, columns) + + s.cache[keyspaceName] = keyspaceMetadata + + return nil +} + // "compiles" derived information about keyspace, table, and column metadata // for a keyspace from the basic queried metadata objects returned by // getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively; @@ -395,6 +487,33 @@ func compileMetadata( } } +// "compiles" derived information about virtual keyspace, table, and column metadata +// for a keyspace from the basic queried metadata objects returned by +// getVirtualTableMetadata, and getVirtualColumnMetadata respectively; +// Links the metadata objects together. +func compileVirtualMetadata( + keyspace *VirtualKeyspaceMetadata, + tables []VirtualTableMetadata, + columns []VirtualColumnMetadata, +) { + keyspace.Tables = make(map[string]*VirtualTableMetadata) + for i := range tables { + tables[i].Columns = make(map[string]*VirtualColumnMetadata) + + keyspace.Tables[tables[i].Name] = &tables[i] + } + + for i := range columns { + col := &columns[i] + table, ok := keyspace.Tables[col.Table] + if !ok { + continue + } + + table.Columns[col.Name] = col + } +} + // Compiles derived information from TableMetadata which have had // ColumnMetadata added already. V1 protocol does not return as much // column metadata as V2+ (because V1 doesn't support the "type" column in the @@ -738,6 +857,37 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e return tables, nil } +// query for only the table metadata in the specified keyspace from system_virtual_schema.tables +func getVirtualTableMetadata(s *Session, keyspaceName string) ([]VirtualTableMetadata, error) { + const stmt = ` + SELECT + table_name, + comment + FROM system_virtual_schema.tables + WHERE keyspace_name = ?` + + tables := []VirtualTableMetadata{} + table := VirtualTableMetadata{Keyspace: keyspaceName} + + iter := s.control.query(stmt, keyspaceName) + defer iter.Close() + + if iter.err != nil { + return nil, fmt.Errorf("failed to iterate virtual table metadata for keyspace: %w", iter.err) + } + + if iter.NumRows() == 0 { + return nil, ErrKeyspaceDoesNotExist + } + + for iter.Scan(&table.Name, &table.Comment) { + tables = append(tables, table) + table = VirtualTableMetadata{Keyspace: keyspaceName} + } + + return tables, nil +} + func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) { // V1 does not support the type column, and all returned rows are // of kind "regular". @@ -925,6 +1075,52 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, return columns, nil } +// query for only the column metadata in the specified keyspace from system_virtual_schema.columns +func getVirtualColumnMetadata(s *Session, keyspaceName string) ([]VirtualColumnMetadata, error) { + const stmt = ` + SELECT + table_name, + column_name, + clustering_order, + kind, + type + FROM system_virtual_schema.columns + WHERE keyspace_name = ?` + + var columns []VirtualColumnMetadata + + rows := s.control.query(stmt, keyspaceName).Scanner() + + for rows.Next() { + var ( + column = VirtualColumnMetadata{Keyspace: keyspaceName} + columnType string + ) + + err := rows.Scan( + &column.Table, + &column.Name, + &column.ClusteringOrder, + &column.Kind, + &columnType, + ) + + if err != nil { + return nil, err + } + + column.Type = getCassandraType(columnType, s.logger) + + columns = append(columns, column) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("failed to iterate virtual column metadata for keyspace: %w", err) + } + + return columns, nil +} + func getTypeInfo(t string, logger StdLogger) TypeInfo { if strings.HasPrefix(t, apacheCassandraTypePrefix) { t = apacheToCassandraType(t) diff --git a/metadata_test.go b/metadata_test.go index 6e3633ccc..8fdd05f8a 100644 --- a/metadata_test.go +++ b/metadata_test.go @@ -477,6 +477,51 @@ func TestCompileMetadata(t *testing.T) { ) } +func TestCompileVirtualMetadata(t *testing.T) { + keyspace := &VirtualKeyspaceMetadata{ + Name: "V1Keyspace", + } + tables := []VirtualTableMetadata{ + { + Keyspace: "V1Keyspace", + Name: "peers", + Comment: "some", + }, + } + columns := []VirtualColumnMetadata{ + // Here are the regular columns from the peers table for testing regular columns + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "data_center", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "host_id", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "rack", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "release_version", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "rpc_address", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "schema_version", Type: NativeType{typ: TypeVarchar}}, + {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "tokens", Type: NativeType{typ: TypeVarchar}}, + } + + compileVirtualMetadata(keyspace, tables, columns) + assertVirtualKeyspaceMetadata(t, keyspace, + &VirtualKeyspaceMetadata{ + Name: "V1Keyspace", + Tables: map[string]*VirtualTableMetadata{ + "peers": { + Keyspace: "V1Keyspace", + Name: "peers", + Columns: map[string]*VirtualColumnMetadata{ + "data_center": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "data_center", Type: NativeType{typ: TypeVarchar}}, + "host_id": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "host_id", Type: NativeType{typ: TypeVarchar}}, + "rack": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "rack", Type: NativeType{typ: TypeVarchar}}, + "release_version": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "release_version", Type: NativeType{typ: TypeVarchar}}, + "rpc_address": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "rpc_address", Type: NativeType{typ: TypeVarchar}}, + "schema_version": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "schema_version", Type: NativeType{typ: TypeVarchar}}, + "tokens": {Keyspace: "V1Keyspace", Table: "peers", Kind: ColumnRegular, Name: "tokens", Type: NativeType{typ: TypeVarchar}}, + }, + }, + }, + }, + ) +} + // Helper function for asserting that actual metadata returned was as expected func assertKeyspaceMetadata(t *testing.T, actual, expected *KeyspaceMetadata) { if len(expected.Tables) != len(actual.Tables) { @@ -565,7 +610,7 @@ func assertKeyspaceMetadata(t *testing.T, actual, expected *KeyspaceMetadata) { t.Errorf("Expected %s.Tables[%s].Columns[%s] but was not found", expected.Name, keyT, keyC) } else { if keyC != ac.Name { - t.Errorf("Expected %s.Tables[%s].Columns[%s].Name to be '%v' but was '%v'", expected.Name, keyT, keyC, keyC, at.Name) + t.Errorf("Expected %s.Tables[%s].Columns[%s].Name to be '%v' but was '%v'", expected.Name, keyT, keyC, keyC, ac.Name) } if expected.Name != ac.Keyspace { t.Errorf("Expected %s.Tables[%s].Columns[%s].Keyspace to be '%v' but was '%v'", expected.Name, keyT, keyC, expected.Name, ac.Keyspace) @@ -589,6 +634,61 @@ func assertKeyspaceMetadata(t *testing.T, actual, expected *KeyspaceMetadata) { } } +func assertVirtualKeyspaceMetadata(t *testing.T, actualKeyspace, expectedKeyspace *VirtualKeyspaceMetadata) { + if len(expectedKeyspace.Tables) != len(actualKeyspace.Tables) { + t.Errorf("Expected len(%s.Tables) to be %v but was %v", expectedKeyspace.Name, len(expectedKeyspace.Tables), len(actualKeyspace.Tables)) + } + for expectedTableName, expectedTable := range expectedKeyspace.Tables { + actualTable, found := actualKeyspace.Tables[expectedTableName] + if !found { + t.Errorf("Expected %s.Tables[%s] but was not found", expectedKeyspace.Name, expectedTableName) + } else { + if expectedTableName != actualTable.Name { + t.Errorf("Expected %s.Tables[%s].Name to be %v but was %v", expectedKeyspace.Name, expectedTableName, expectedTableName, actualTable.Name) + } + if expectedTableName == "peers" { + + } + if len(expectedTable.Columns) != len(actualTable.Columns) { + eKeys := make([]string, 0, len(expectedTable.Columns)) + for key := range expectedTable.Columns { + eKeys = append(eKeys, key) + } + aKeys := make([]string, 0, len(actualTable.Columns)) + for key := range actualTable.Columns { + aKeys = append(aKeys, key) + } + t.Errorf("Expected len(%s.Tables[%s].Columns) to be %v (keys:%v) but was %v (keys:%v)", expectedKeyspace.Name, expectedTableName, len(expectedTable.Columns), eKeys, len(actualTable.Columns), aKeys) + } else { + for expectedColumnName, expectedColumn := range expectedTable.Columns { + actualColumn, found := actualTable.Columns[expectedColumnName] + + if !found { + t.Errorf("Expected %s.Tables[%s].Columns[%s] but was not found", expectedKeyspace.Name, expectedTableName, expectedColumnName) + } else { + if expectedColumnName != actualColumn.Name { + t.Errorf("Expected %s.Tables[%s].Columns[%s].Name to be '%v' but was '%v'", expectedKeyspace.Name, expectedTableName, expectedColumnName, expectedColumnName, actualColumn.Name) + } + if expectedKeyspace.Name != actualColumn.Keyspace { + t.Errorf("Expected %s.Tables[%s].Columns[%s].Keyspace to be '%v' but was '%v'", expectedKeyspace.Name, expectedTableName, expectedColumnName, expectedKeyspace.Name, actualColumn.Keyspace) + } + if expectedTableName != actualColumn.Table { + t.Errorf("Expected %s.Tables[%s].Columns[%s].Table to be '%v' but was '%v'", expectedKeyspace.Name, expectedTableName, expectedColumnName, expectedTableName, actualColumn.Table) + } + if expectedColumn.Type.Type() != actualColumn.Type.Type() { + t.Errorf("Expected %s.Tables[%s].Columns[%s].Type.Type to be %v but was %v", expectedKeyspace.Name, expectedTableName, expectedColumnName, expectedColumn.Type.Type(), actualColumn.Type.Type()) + } + if expectedColumn.Kind != actualColumn.Kind { + t.Errorf("Expected %s.Tables[%s].Columns[%s].Kind to be '%v' but was '%v'", expectedKeyspace.Name, expectedTableName, expectedColumnName, expectedColumn.Kind, actualColumn.Kind) + } + } + } + } + } + } + +} + // Tests the cassandra type definition parser func TestTypeParser(t *testing.T) { // native type diff --git a/session.go b/session.go index ed1a078d3..33450b882 100644 --- a/session.go +++ b/session.go @@ -51,20 +51,21 @@ import ( // and automatically sets a default consistency level on all operations // that do not have a consistency level set. type Session struct { - cons Consistency - pageSize int - prefetch float64 - routingKeyInfoCache routingKeyInfoLRU - schemaDescriber *schemaDescriber - trace Tracer - queryObserver QueryObserver - batchObserver BatchObserver - connectObserver ConnectObserver - frameObserver FrameHeaderObserver - streamObserver StreamObserver - hostSource *ringDescriber - ringRefresher *refreshDebouncer - stmtsLRU *preparedLRU + cons Consistency + pageSize int + prefetch float64 + routingKeyInfoCache routingKeyInfoLRU + schemaDescriber *schemaDescriber + virtualSchemaDescriber *virtualSchemaDescriber + trace Tracer + queryObserver QueryObserver + batchObserver BatchObserver + connectObserver ConnectObserver + frameObserver FrameHeaderObserver + streamObserver StreamObserver + hostSource *ringDescriber + ringRefresher *refreshDebouncer + stmtsLRU *preparedLRU connCfg *ConnConfig @@ -164,6 +165,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { } s.schemaDescriber = newSchemaDescriber(s) + s.virtualSchemaDescriber = newVirtualSchemaDescriber(s) s.nodeEvents = newEventDebouncer("NodeEvents", s.handleNodeEvent, s.logger) s.schemaEvents = newEventDebouncer("SchemaEvents", s.handleSchemaEvent, s.logger) @@ -578,6 +580,17 @@ func (s *Session) KeyspaceMetadata(keyspace string) (*KeyspaceMetadata, error) { return s.schemaDescriber.getSchema(keyspace) } +// VirtualKeyspaceMetadata returns the schema metadata for the virtual keyspace specified. Returns an error if the keyspace does not exist. +func (s *Session) VirtualKeyspaceMetadata(keyspace string) (*VirtualKeyspaceMetadata, error) { + if s.Closed() { + return nil, ErrSessionClosed + } else if keyspace == "" { + return nil, ErrNoKeyspace + } + + return s.virtualSchemaDescriber.getSchema(keyspace) +} + func (s *Session) getConn() *Conn { hosts := s.ring.allHosts() for _, host := range hosts {