File tree Expand file tree Collapse file tree 1 file changed +17
-11
lines changed
Expand file tree Collapse file tree 1 file changed +17
-11
lines changed Original file line number Diff line number Diff line change @@ -832,23 +832,29 @@ impl Parseable {
832832 /// Updates schema by merging schemas stored by ingestors when running in Query mode
833833 pub async fn update_schema_when_distributed (
834834 & self ,
835- tables : & Vec < String > ,
835+ streams : & Vec < String > ,
836836 ) -> Result < ( ) , EventError > {
837837 if self . options . mode != Mode :: Query {
838838 return Ok ( ( ) ) ;
839839 }
840840
841- for table in tables {
842- if let Ok ( schemas) = self . storage . get_object_store ( ) . fetch_schemas ( table) . await {
843- let new_schema = Schema :: try_merge ( schemas) ?;
844- // commit schema merges the schema internally and updates the schema in storage.
845- self . storage
846- . get_object_store ( )
847- . commit_schema ( table, new_schema. clone ( ) )
848- . await ?;
841+ for stream_name in streams {
842+ let Ok ( schemas) = self
843+ . storage
844+ . get_object_store ( )
845+ . fetch_schemas ( stream_name)
846+ . await
847+ else {
848+ continue ;
849+ } ;
850+ let new_schema = Schema :: try_merge ( schemas) ?;
851+ // commit schema merges the schema internally and updates the schema in storage.
852+ self . storage
853+ . get_object_store ( )
854+ . commit_schema ( stream_name, new_schema. clone ( ) )
855+ . await ?;
849856
850- self . get_stream ( table) ?. commit_schema ( new_schema) ?;
851- }
857+ self . get_stream ( stream_name) ?. commit_schema ( new_schema) ?;
852858 }
853859
854860 Ok ( ( ) )
You can’t perform that action at this time.
0 commit comments