-
Notifications
You must be signed in to change notification settings - Fork 638
Virtual keyspace metadata support immplementation #1794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,12 @@ type KeyspaceMetadata struct { | |
UserTypes map[string]*UserTypeMetadata | ||
} | ||
|
||
// schema metadata for a virtual keyspace | ||
type VirtualKeyspaceMetadata struct { | ||
Name string | ||
Tables map[string]*VirtualTableMetadata | ||
} | ||
|
||
// schema metadata for a table (a.k.a. column family) | ||
type TableMetadata struct { | ||
Keyspace string | ||
|
@@ -66,6 +72,13 @@ type TableMetadata struct { | |
OrderedColumns []string | ||
} | ||
|
||
type VirtualTableMetadata struct { | ||
Keyspace string | ||
Name string | ||
Comment string | ||
Columns map[string]*VirtualColumnMetadata | ||
} | ||
|
||
// schema metadata for a column | ||
type ColumnMetadata struct { | ||
Keyspace string | ||
|
@@ -80,6 +93,15 @@ type ColumnMetadata struct { | |
Index ColumnIndexMetadata | ||
} | ||
|
||
type VirtualColumnMetadata struct { | ||
Keyspace string | ||
Table string | ||
Name string | ||
ClusteringOrder string | ||
Kind ColumnKind | ||
Type TypeInfo | ||
} | ||
|
||
// FunctionMetadata holds metadata for function constructs | ||
type FunctionMetadata struct { | ||
Keyspace string | ||
|
@@ -231,6 +253,13 @@ type schemaDescriber struct { | |
cache map[string]*KeyspaceMetadata | ||
} | ||
|
||
// queries the cluster for schema information for a virtual keyspace | ||
type virtualSchemaDescriber struct { | ||
session *Session | ||
mu sync.Mutex | ||
cache map[string]*VirtualKeyspaceMetadata | ||
} | ||
|
||
// creates a session bound schema describer which will query and cache | ||
// keyspace metadata | ||
func newSchemaDescriber(session *Session) *schemaDescriber { | ||
|
@@ -240,6 +269,15 @@ func newSchemaDescriber(session *Session) *schemaDescriber { | |
} | ||
} | ||
|
||
// creates a session bound schema describer which will query and cache | ||
// virtual keyspace metadata | ||
func newVirtualSchemaDescriber(session *Session) *virtualSchemaDescriber { | ||
return &virtualSchemaDescriber{ | ||
session: session, | ||
cache: map[string]*VirtualKeyspaceMetadata{}, | ||
} | ||
} | ||
|
||
// returns the cached KeyspaceMetadata held by the describer for the named | ||
// keyspace. | ||
func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, error) { | ||
|
@@ -260,6 +298,23 @@ func (s *schemaDescriber) getSchema(keyspaceName string) (*KeyspaceMetadata, err | |
return metadata, nil | ||
} | ||
|
||
// returns the cached VirtualKeyspaceMetadata held by the describer for the named | ||
// keyspace. | ||
func (s *virtualSchemaDescriber) getSchema(keyspaceName string) (*VirtualKeyspaceMetadata, error) { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
metadata, found := s.cache[keyspaceName] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't you invalidate it at some point. Probably on Another problem if your cluster half on new version half on another. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with your point about invalidation, but virtual metadata is only requested on the user's purpose, not by some node event, also virtual tables can't be changed, if we talk about some There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you clear cache a the betting of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I totally agree with you, but in my opinion, it is good to keep an existing pattern of processing the metadata (virtual or not) |
||
if !found { | ||
err := s.refreshSchema(keyspaceName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
metadata = s.cache[keyspaceName] | ||
} | ||
|
||
return metadata, nil | ||
} | ||
|
||
// clears the already cached keyspace metadata | ||
func (s *schemaDescriber) clearSchema(keyspaceName string) { | ||
s.mu.Lock() | ||
|
@@ -314,6 +369,43 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error { | |
return nil | ||
} | ||
|
||
// forcibly updates the current VirtualKeyspaceMetadata held by the virtual schema describer | ||
// for a given named keyspace. | ||
func (s *virtualSchemaDescriber) refreshSchema(keyspaceName string) error { | ||
var wg sync.WaitGroup | ||
|
||
var ( | ||
tables []VirtualTableMetadata | ||
columns []VirtualColumnMetadata | ||
tableErr, columnErr error | ||
) | ||
|
||
wg.Add(2) | ||
go func() { | ||
defer wg.Done() | ||
tables, tableErr = getVirtualTableMetadata(s.session, keyspaceName) | ||
}() | ||
go func() { | ||
defer wg.Done() | ||
columns, columnErr = getVirtualColumnMetadata(s.session, keyspaceName) | ||
}() | ||
wg.Wait() | ||
|
||
if columnErr != nil { | ||
return columnErr | ||
} | ||
if tableErr != nil { | ||
return tableErr | ||
} | ||
|
||
keyspaceMetadata := &VirtualKeyspaceMetadata{Name: keyspaceName} | ||
compileVirtualMetadata(keyspaceMetadata, tables, columns) | ||
|
||
s.cache[keyspaceName] = keyspaceMetadata | ||
|
||
return nil | ||
} | ||
|
||
// "compiles" derived information about keyspace, table, and column metadata | ||
// for a keyspace from the basic queried metadata objects returned by | ||
// getKeyspaceMetadata, getTableMetadata, and getColumnMetadata respectively; | ||
|
@@ -395,6 +487,33 @@ func compileMetadata( | |
} | ||
} | ||
|
||
// "compiles" derived information about virtual keyspace, table, and column metadata | ||
// for a keyspace from the basic queried metadata objects returned by | ||
// getVirtualTableMetadata, and getVirtualColumnMetadata respectively; | ||
// Links the metadata objects together. | ||
func compileVirtualMetadata( | ||
keyspace *VirtualKeyspaceMetadata, | ||
tables []VirtualTableMetadata, | ||
columns []VirtualColumnMetadata, | ||
) { | ||
keyspace.Tables = make(map[string]*VirtualTableMetadata) | ||
for i := range tables { | ||
tables[i].Columns = make(map[string]*VirtualColumnMetadata) | ||
|
||
keyspace.Tables[tables[i].Name] = &tables[i] | ||
} | ||
|
||
for i := range columns { | ||
col := &columns[i] | ||
table, ok := keyspace.Tables[col.Table] | ||
if !ok { | ||
continue | ||
} | ||
|
||
table.Columns[col.Name] = col | ||
} | ||
} | ||
|
||
// Compiles derived information from TableMetadata which have had | ||
// ColumnMetadata added already. V1 protocol does not return as much | ||
// column metadata as V2+ (because V1 doesn't support the "type" column in the | ||
|
@@ -738,6 +857,37 @@ func getTableMetadata(session *Session, keyspaceName string) ([]TableMetadata, e | |
return tables, nil | ||
} | ||
|
||
// query for only the table metadata in the specified keyspace from system_virtual_schema.tables | ||
func getVirtualTableMetadata(s *Session, keyspaceName string) ([]VirtualTableMetadata, error) { | ||
const stmt = ` | ||
SELECT | ||
table_name, | ||
comment | ||
FROM system_virtual_schema.tables | ||
WHERE keyspace_name = ?` | ||
|
||
tables := []VirtualTableMetadata{} | ||
table := VirtualTableMetadata{Keyspace: keyspaceName} | ||
|
||
iter := s.control.query(stmt, keyspaceName) | ||
defer iter.Close() | ||
|
||
if iter.err != nil { | ||
return nil, fmt.Errorf("failed to iterate virtual table metadata for keyspace: %w", iter.err) | ||
} | ||
|
||
if iter.NumRows() == 0 { | ||
return nil, ErrKeyspaceDoesNotExist | ||
} | ||
|
||
for iter.Scan(&table.Name, &table.Comment) { | ||
tables = append(tables, table) | ||
table = VirtualTableMetadata{Keyspace: keyspaceName} | ||
} | ||
|
||
return tables, nil | ||
} | ||
|
||
func (s *Session) scanColumnMetadataV1(keyspace string) ([]ColumnMetadata, error) { | ||
// V1 does not support the type column, and all returned rows are | ||
// of kind "regular". | ||
|
@@ -925,6 +1075,52 @@ func getColumnMetadata(session *Session, keyspaceName string) ([]ColumnMetadata, | |
return columns, nil | ||
} | ||
|
||
// query for only the column metadata in the specified keyspace from system_virtual_schema.columns | ||
func getVirtualColumnMetadata(s *Session, keyspaceName string) ([]VirtualColumnMetadata, error) { | ||
const stmt = ` | ||
SELECT | ||
table_name, | ||
column_name, | ||
clustering_order, | ||
kind, | ||
type | ||
FROM system_virtual_schema.columns | ||
WHERE keyspace_name = ?` | ||
|
||
var columns []VirtualColumnMetadata | ||
|
||
rows := s.control.query(stmt, keyspaceName).Scanner() | ||
|
||
for rows.Next() { | ||
var ( | ||
column = VirtualColumnMetadata{Keyspace: keyspaceName} | ||
columnType string | ||
) | ||
|
||
err := rows.Scan( | ||
&column.Table, | ||
&column.Name, | ||
&column.ClusteringOrder, | ||
&column.Kind, | ||
&columnType, | ||
) | ||
|
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
column.Type = getCassandraType(columnType, s.logger) | ||
|
||
columns = append(columns, column) | ||
} | ||
|
||
if err := rows.Err(); err != nil { | ||
return nil, fmt.Errorf("failed to iterate virtual column metadata for keyspace: %w", err) | ||
} | ||
|
||
return columns, nil | ||
} | ||
|
||
func getTypeInfo(t string, logger StdLogger) TypeInfo { | ||
if strings.HasPrefix(t, apacheCassandraTypePrefix) { | ||
t = apacheToCassandraType(t) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend, in this particular case, use
sync.Map
instead of Mutex + map.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is preferable to use mutex, to keep the existing pattern, and avoid unnecessary code complexity.