Skip to content

Virtual keyspace metadata support immplementation #1794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,24 @@ func TestGetKeyspaceMetadata(t *testing.T) {
}
}

func TestGetVirtualKeyspaceMetadata(t *testing.T) {
session := createSession(t)
defer session.Close()

virtualKeyspaceMatadata, err := session.VirtualKeyspaceMetadata("system_views")
if err != nil {
t.Fatal(err)
}
if len(virtualKeyspaceMatadata.Tables) == 0 {
t.Fatal("virtualKeyspaceMatadata.Tables is empty")
}
for _, table := range virtualKeyspaceMatadata.Tables {
if table.Keyspace != "system_views" {
t.Fatalf("Expected table keyspace to be 'system_views' but got '%s'", table.Keyspace)
}
}
}

// Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES NOT exist.
func TestGetKeyspaceMetadataFails(t *testing.T) {
session := createSession(t)
Expand Down
196 changes: 196 additions & 0 deletions metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ type KeyspaceMetadata struct {
UserTypes map[string]*UserTypeMetadata
}

// schema metadata for a virtual keyspace
type VirtualKeyspaceMetadata struct {
Name string
Tables map[string]*VirtualTableMetadata
}

// schema metadata for a table (a.k.a. column family)
type TableMetadata struct {
Keyspace string
Expand All @@ -66,6 +72,13 @@ type TableMetadata struct {
OrderedColumns []string
}

type VirtualTableMetadata struct {
Keyspace string
Name string
Comment string
Columns map[string]*VirtualColumnMetadata
}

// schema metadata for a column
type ColumnMetadata struct {
Keyspace string
Expand All @@ -80,6 +93,15 @@ type ColumnMetadata struct {
Index ColumnIndexMetadata
}

type VirtualColumnMetadata struct {
Keyspace string
Table string
Name string
ClusteringOrder string
Kind ColumnKind
Type TypeInfo
}

// FunctionMetadata holds metadata for function constructs
type FunctionMetadata struct {
Keyspace string
Expand Down Expand Up @@ -231,6 +253,13 @@ type schemaDescriber struct {
cache map[string]*KeyspaceMetadata
}

// queries the cluster for schema information for a virtual keyspace
type virtualSchemaDescriber struct {
session *Session
mu sync.Mutex
cache map[string]*VirtualKeyspaceMetadata
Comment on lines +259 to +260
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend, in this particular case, use sync.Map instead of Mutex + map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is preferable to use mutex, to keep the existing pattern, and avoid unnecessary code complexity.

}

// creates a session bound schema describer which will query and cache
// keyspace metadata
func newSchemaDescriber(session *Session) *schemaDescriber {
Expand All @@ -240,6 +269,15 @@ func newSchemaDescriber(session *Session) *schemaDescriber {
}
}

// creates a session bound schema describer which will query and cache
// virtual keyspace metadata
func newVirtualSchemaDescriber(session *Session) *virtualSchemaDescriber {
return &virtualSchemaDescriber{
session: session,
cache: map[string]*VirtualKeyspaceMetadata{},
}
}

// returns the cached KeyspaceMetadata held by the describer for the named
// keyspace.
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) {
Expand All @@ -260,6 +298,23 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err
return metadata, nil
}

// returns the cached VirtualKeyspaceMetadata held by the describer for the named
// keyspace.
func (s *virtualSchemaDescriber) getSchema(keyspaceName string) (*VirtualKeyspaceMetadata, error) {
s.mu.Lock()
defer s.mu.Unlock()
metadata, found := s.cache[keyspaceName]
Copy link
Contributor

@dkropachev dkropachev Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you invalidate it at some point.
Say you upgrade cluster from one version to another, driver will still keep old schema.

Probably on handleNodeUp you could read release_version select release_version from system.local where key = 'local' and invalidate or update cache if it has changed.

Another problem if your cluster half on new version half on another.
Now you depends on where have your control connection landed, if it has landed on a node with new version, session will give you a new schema, while old nodes don't support it.
Way to solve it is to run queries not on control connection, but rather on the node with oldest version available, if you were do so, you can't pick all the nodes from the cluster, you have to adhere to HostSelectionPolicy, say if it is dcAwareRR you will need to filter out all nodes from other datacenters.

Copy link
Contributor Author

@tengu-alt tengu-alt Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with your point about invalidation, but virtual metadata is only requested on the user's purpose, not by some node event, also virtual tables can't be changed, if we talk about some schemaAgreement case. In my opinion, it is unnecessary to invalidate it on every getSchema request, and i think it is unnecessary to remove a control connection, it is good to keep the existing pattern of the metadata requests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you clear cache a the betting of getSchema it means that there is going to be no cache, why just not drop it.
If you drop the cache then there is no point in having virtualSchemaDescriber, you can just move code to Session.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree with you, but in my opinion, it is good to keep an existing pattern of processing the metadata (virtual or not)

if !found {
err := s.refreshSchema(keyspaceName)
if err != nil {
return nil, err
}
metadata = s.cache[keyspaceName]
}

return metadata, nil
}

// clears the already cached keyspace metadata
func (s *schemaDescriber) clearSchema(keyspaceName string) {
s.mu.Lock()
Expand Down Expand Up @@ -314,6 +369,43 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error {
return nil
}

// forcibly updates the current VirtualKeyspaceMetadata held by the virtual schema describer
// for a given named keyspace.
func (s *virtualSchemaDescriber) refreshSchema(keyspaceName string) error {
var wg sync.WaitGroup

var (
tables []VirtualTableMetadata
columns []VirtualColumnMetadata
tableErr, columnErr error
)

wg.Add(2)
go func() {
defer wg.Done()
tables, tableErr = getVirtualTableMetadata(s.session, keyspaceName)
}()
go func() {
defer wg.Done()
columns, columnErr = getVirtualColumnMetadata(s.session, keyspaceName)
}()
wg.Wait()

if columnErr != nil {
return columnErr
}
if tableErr != nil {
return tableErr
}

keyspaceMetadata := &VirtualKeyspaceMetadata{Name: keyspaceName}
compileVirtualMetadata(keyspaceMetadata, tables, columns)

s.cache[keyspaceName] = keyspaceMetadata

return nil
}

// "compiles" derived information about keyspace, table, and column metadata
// for a keyspace from the basic queried metadata objects returned by
// getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively;
Expand Down Expand Up @@ -395,6 +487,33 @@ func compileMetadata(
}
}

// "compiles" derived information about virtual keyspace, table, and column metadata
// for a keyspace from the basic queried metadata objects returned by
// getVirtualTableMetadata, and getVirtualColumnMetadata respectively;
// Links the metadata objects together.
func compileVirtualMetadata(
keyspace *VirtualKeyspaceMetadata,
tables []VirtualTableMetadata,
columns []VirtualColumnMetadata,
) {
keyspace.Tables = make(map[string]*VirtualTableMetadata)
for i := range tables {
tables[i].Columns = make(map[string]*VirtualColumnMetadata)

keyspace.Tables[tables[i].Name] = &tables[i]
}

for i := range columns {
col := &columns[i]
table, ok := keyspace.Tables[col.Table]
if !ok {
continue
}

table.Columns[col.Name] = col
}
}

// Compiles derived information from TableMetadata which have had
// ColumnMetadata added already. V1 protocol does not return as much
// column metadata as V2+ (because V1 doesn't support the "type" column in the
Expand Down Expand Up @@ -738,6 +857,37 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e
return tables, nil
}

// query for only the table metadata in the specified keyspace from system_virtual_schema.tables
func getVirtualTableMetadata(s *Session, keyspaceName string) ([]VirtualTableMetadata, error) {
const stmt = `
SELECT
table_name,
comment
FROM system_virtual_schema.tables
WHERE keyspace_name = ?`

tables := []VirtualTableMetadata{}
table := VirtualTableMetadata{Keyspace: keyspaceName}

iter := s.control.query(stmt, keyspaceName)
defer iter.Close()

if iter.err != nil {
return nil, fmt.Errorf("failed to iterate virtual table metadata for keyspace: %w", iter.err)
}

if iter.NumRows() == 0 {
return nil, ErrKeyspaceDoesNotExist
}

for iter.Scan(&table.Name, &table.Comment) {
tables = append(tables, table)
table = VirtualTableMetadata{Keyspace: keyspaceName}
}

return tables, nil
}

func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) {
// V1 does not support the type column, and all returned rows are
// of kind "regular".
Expand Down Expand Up @@ -925,6 +1075,52 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata,
return columns, nil
}

// query for only the column metadata in the specified keyspace from system_virtual_schema.columns
func getVirtualColumnMetadata(s *Session, keyspaceName string) ([]VirtualColumnMetadata, error) {
const stmt = `
SELECT
table_name,
column_name,
clustering_order,
kind,
type
FROM system_virtual_schema.columns
WHERE keyspace_name = ?`

var columns []VirtualColumnMetadata

rows := s.control.query(stmt, keyspaceName).Scanner()

for rows.Next() {
var (
column = VirtualColumnMetadata{Keyspace: keyspaceName}
columnType string
)

err := rows.Scan(
&column.Table,
&column.Name,
&column.ClusteringOrder,
&column.Kind,
&columnType,
)

if err != nil {
return nil, err
}

column.Type = getCassandraType(columnType, s.logger)

columns = append(columns, column)
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate virtual column metadata for keyspace: %w", err)
}

return columns, nil
}

func getTypeInfo(t string, logger StdLogger) TypeInfo {
if strings.HasPrefix(t, apacheCassandraTypePrefix) {
t = apacheToCassandraType(t)
Expand Down
Loading