@@ -110,99 +110,115 @@ pub async fn resolve_parseable_metadata(
110
110
. as_ref ( )
111
111
. map ( |meta| serde_json:: from_slice ( meta) . expect ( "parseable config is valid json" ) ) ;
112
112
113
- // Env Change needs to be updated
114
- let check = determine_environment ( staging_metadata, remote_metadata) ;
115
- // flags for if metadata needs to be synced
116
- let mut overwrite_staging = false ;
117
- let mut overwrite_remote = false ;
118
-
119
- let res = match check {
120
- EnvChange :: None ( metadata) => {
121
- // overwrite staging anyways so that it matches remote in case of any divergence
122
- overwrite_staging = true ;
123
- if PARSEABLE . options . mode == Mode :: All {
124
- metadata. server_mode . standalone_after_distributed ( ) ?;
125
- }
126
- Ok ( metadata)
127
- } ,
128
- EnvChange :: NewRemote => {
129
- Err ( "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server" )
130
- }
131
- EnvChange :: NewStaging ( mut metadata) => {
113
+ let env_change = determine_environment ( staging_metadata, remote_metadata) ;
132
114
133
- // if server is started in ingest mode,we need to make sure that query mode has been started
134
- // i.e the metadata is updated to reflect the server mode = Query
135
- if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
136
- Err ( "Starting Ingest Mode is not allowed, Since Query Server has not been started yet" )
137
- } else {
138
- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
139
- metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
140
- // this flag is set to true so that metadata is copied to staging
141
- overwrite_staging = true ;
142
- // overwrite remote in all and query mode
143
- // because staging dir has changed.
144
- match PARSEABLE . options . mode {
145
- Mode :: All => {
146
- metadata. server_mode . standalone_after_distributed ( )
147
- . map_err ( |err| {
148
- ObjectStorageError :: Custom ( err. to_string ( ) )
149
- } ) ?;
150
- overwrite_remote = true ;
151
- } ,
152
- Mode :: Query | Mode :: Prism => {
153
- overwrite_remote = true ;
154
- metadata. server_mode = PARSEABLE . options . mode ;
155
- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
156
- } ,
157
- Mode :: Ingest => {
158
- // if ingest server is started fetch the metadata from remote
159
- // update the server mode for local metadata
160
- metadata. server_mode = PARSEABLE . options . mode ;
161
- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
162
- } ,
163
- Mode :: Index => {
164
- // if index server is started fetch the metadata from remote
165
- // update the server mode for local metadata
166
- metadata. server_mode = PARSEABLE . options . mode ;
167
- metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
168
- }
169
- }
170
- Ok ( metadata)
171
- }
172
- }
173
- EnvChange :: CreateBoth => {
174
- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
175
- let metadata = StorageMetadata :: default ( ) ;
176
- // new metadata needs to be set
177
- // if mode is query or all then both staging and remote
178
- match PARSEABLE . options . mode {
179
- Mode :: All | Mode :: Query | Mode :: Prism => overwrite_remote = true ,
180
- _ => ( ) ,
181
- }
182
- // else only staging
183
- overwrite_staging = true ;
184
- Ok ( metadata)
185
- }
186
- } ;
187
-
188
- let mut metadata = res. map_err ( |err| {
189
- let err = format ! ( "{}. {}" , err, JOIN_COMMUNITY ) ;
190
- let err: Box < dyn std:: error:: Error + Send + Sync + ' static > = err. into ( ) ;
191
- ObjectStorageError :: UnhandledError ( err)
192
- } ) ?;
115
+ let ( mut metadata, overwrite_staging, overwrite_remote) = process_env_change ( env_change) ?;
193
116
194
117
metadata. server_mode = PARSEABLE . options . mode ;
118
+
195
119
if overwrite_remote {
196
120
put_remote_metadata ( & metadata) . await ?;
197
121
}
198
-
199
122
if overwrite_staging {
200
123
put_staging_metadata ( & metadata) ?;
201
124
}
202
125
203
126
Ok ( metadata)
204
127
}
205
128
129
+ fn process_env_change (
130
+ env_change : EnvChange ,
131
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
132
+ match env_change {
133
+ EnvChange :: None ( mut metadata) => handle_none_env ( & mut metadata) ,
134
+ EnvChange :: NewRemote => handle_new_remote_env ( ) ,
135
+ EnvChange :: NewStaging ( mut metadata) => handle_new_staging_env ( & mut metadata) ,
136
+ EnvChange :: CreateBoth => handle_create_both_env ( ) ,
137
+ }
138
+ }
139
+
140
+ fn handle_none_env (
141
+ metadata : & mut StorageMetadata ,
142
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
143
+ let overwrite_staging = true ;
144
+ let mut overwrite_remote = false ;
145
+
146
+ match PARSEABLE . options . mode {
147
+ Mode :: All => {
148
+ metadata. server_mode . standalone_after_distributed ( ) ?;
149
+ overwrite_remote = true ;
150
+ update_metadata_mode_and_staging ( metadata) ;
151
+ }
152
+ Mode :: Query => {
153
+ overwrite_remote = true ;
154
+ update_metadata_mode_and_staging ( metadata) ;
155
+ }
156
+ _ => { }
157
+ }
158
+ if PARSEABLE . options . mode == Mode :: All {
159
+ metadata. server_mode . standalone_after_distributed ( ) ?;
160
+ }
161
+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
162
+ }
163
+
164
+ fn handle_new_remote_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
165
+ Err ( ObjectStorageError :: UnhandledError ( format ! (
166
+ "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}" ,
167
+ JOIN_COMMUNITY
168
+ ) . into ( ) ) )
169
+ }
170
+
171
+ fn handle_new_staging_env (
172
+ metadata : & mut StorageMetadata ,
173
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
174
+ if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
175
+ return Err ( ObjectStorageError :: UnhandledError (
176
+ format ! (
177
+ "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
178
+ JOIN_COMMUNITY
179
+ )
180
+ . into ( ) ,
181
+ ) ) ;
182
+ }
183
+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
184
+ metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
185
+ let overwrite_staging = true ;
186
+ let mut overwrite_remote = false ;
187
+
188
+ match PARSEABLE . options . mode {
189
+ Mode :: All => {
190
+ metadata
191
+ . server_mode
192
+ . standalone_after_distributed ( )
193
+ . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
194
+ overwrite_remote = true ;
195
+ }
196
+ Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
197
+ update_metadata_mode_and_staging ( metadata) ;
198
+ if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
199
+ overwrite_remote = true ;
200
+ }
201
+ }
202
+ }
203
+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
204
+ }
205
+
206
+ fn handle_create_both_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
207
+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
208
+ let metadata = StorageMetadata :: default ( ) ;
209
+ let overwrite_remote = matches ! (
210
+ PARSEABLE . options. mode,
211
+ Mode :: All | Mode :: Query | Mode :: Prism
212
+ ) ;
213
+ let overwrite_staging = true ;
214
+ Ok ( ( metadata, overwrite_staging, overwrite_remote) )
215
+ }
216
+
217
+ fn update_metadata_mode_and_staging ( metadata : & mut StorageMetadata ) {
218
+ metadata. server_mode = PARSEABLE . options . mode ;
219
+ metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
220
+ }
221
+
206
222
pub fn determine_environment (
207
223
staging_metadata : Option < StorageMetadata > ,
208
224
remote_metadata : Option < StorageMetadata > ,
0 commit comments