@@ -110,98 +110,110 @@ pub async fn resolve_parseable_metadata(
110
110
. as_ref ( )
111
111
. map ( |meta| serde_json:: from_slice ( meta) . expect ( "parseable config is valid json" ) ) ;
112
112
113
- // Env Change needs to be updated
114
- let check = determine_environment ( staging_metadata, remote_metadata) ;
115
- // flags for if metadata needs to be synced
116
- let mut overwrite_staging = false ;
117
- let mut overwrite_remote = false ;
113
+ let env_change = determine_environment ( staging_metadata, remote_metadata) ;
118
114
119
- let res: Result < StorageMetadata , & ' static str > = match check {
120
- EnvChange :: None ( mut metadata) => {
121
- // overwrite staging anyways so that it matches remote in case of any divergence
122
- overwrite_staging = true ;
123
- match PARSEABLE . options . mode {
124
- Mode :: All => {
125
- metadata. server_mode . standalone_after_distributed ( ) ?;
126
- overwrite_remote = true ;
127
- update_metadata_mode_and_staging ( & mut metadata) ;
128
- }
129
- Mode :: Query => {
130
- overwrite_remote = true ;
131
- update_metadata_mode_and_staging ( & mut metadata) ;
132
- }
133
- _=> { }
134
- }
135
- if PARSEABLE . options . mode == Mode :: All {
136
- metadata. server_mode . standalone_after_distributed ( ) ?;
137
- }
138
- Ok ( metadata)
139
- } ,
140
- EnvChange :: NewRemote => {
141
- Err ( "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server" )
142
- }
143
- EnvChange :: NewStaging ( mut metadata) => {
144
- // If server is started in ingest mode, ensure query mode has been started
145
- if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
146
- return Err ( ObjectStorageError :: UnhandledError ( format ! (
147
- "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
148
- JOIN_COMMUNITY
149
- ) . into ( ) ) ) ;
150
- }
151
- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
152
- metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
153
- // this flag is set to true so that metadata is copied to staging
154
- overwrite_staging = true ;
155
- // overwrite remote in all and query mode
156
- // because staging dir has changed.
157
- match PARSEABLE . options . mode {
158
- Mode :: All => {
159
- metadata. server_mode . standalone_after_distributed ( )
160
- . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
161
- overwrite_remote = true ;
162
- }
163
- Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
164
- update_metadata_mode_and_staging ( & mut metadata) ;
165
- if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
166
- overwrite_remote = true ;
167
- }
168
- }
169
- }
170
- Ok ( metadata)
171
- }
172
- EnvChange :: CreateBoth => {
173
- create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
174
- let metadata = StorageMetadata :: default ( ) ;
175
- // new metadata needs to be set
176
- // if mode is query or all then both staging and remote
177
- match PARSEABLE . options . mode {
178
- Mode :: All | Mode :: Query | Mode :: Prism => overwrite_remote = true ,
179
- _ => ( ) ,
180
- }
181
- // else only staging
182
- overwrite_staging = true ;
183
- Ok ( metadata)
184
- }
185
- } ;
186
-
187
- let mut metadata = res. map_err ( |err| {
188
- let err = format ! ( "{}. {}" , err, JOIN_COMMUNITY ) ;
189
- let err: Box < dyn std:: error:: Error + Send + Sync + ' static > = err. into ( ) ;
190
- ObjectStorageError :: UnhandledError ( err)
191
- } ) ?;
115
+ let ( mut metadata, overwrite_staging, overwrite_remote) = process_env_change ( env_change) ?;
192
116
193
117
metadata. server_mode = PARSEABLE . options . mode ;
118
+
194
119
if overwrite_remote {
195
120
put_remote_metadata ( & metadata) . await ?;
196
121
}
197
-
198
122
if overwrite_staging {
199
123
put_staging_metadata ( & metadata) ?;
200
124
}
201
125
202
126
Ok ( metadata)
203
127
}
204
128
129
+ fn process_env_change (
130
+ env_change : EnvChange ,
131
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
132
+ match env_change {
133
+ EnvChange :: None ( mut metadata) => handle_none_env ( & mut metadata) ,
134
+ EnvChange :: NewRemote => handle_new_remote_env ( ) ,
135
+ EnvChange :: NewStaging ( mut metadata) => handle_new_staging_env ( & mut metadata) ,
136
+ EnvChange :: CreateBoth => handle_create_both_env ( ) ,
137
+ }
138
+ }
139
+
140
+ fn handle_none_env (
141
+ metadata : & mut StorageMetadata ,
142
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
143
+ let overwrite_staging = true ;
144
+ let mut overwrite_remote = false ;
145
+
146
+ match PARSEABLE . options . mode {
147
+ Mode :: All => {
148
+ metadata. server_mode . standalone_after_distributed ( ) ?;
149
+ overwrite_remote = true ;
150
+ update_metadata_mode_and_staging ( metadata) ;
151
+ }
152
+ Mode :: Query => {
153
+ overwrite_remote = true ;
154
+ update_metadata_mode_and_staging ( metadata) ;
155
+ }
156
+ _ => { }
157
+ }
158
+ if PARSEABLE . options . mode == Mode :: All {
159
+ metadata. server_mode . standalone_after_distributed ( ) ?;
160
+ }
161
+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
162
+ }
163
+
164
+ fn handle_new_remote_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
165
+ Err ( ObjectStorageError :: UnhandledError ( format ! (
166
+ "Could not start the server because staging directory indicates stale data from previous deployment, please choose an empty staging directory and restart the server. {}" ,
167
+ JOIN_COMMUNITY
168
+ ) . into ( ) ) )
169
+ }
170
+
171
+ fn handle_new_staging_env (
172
+ metadata : & mut StorageMetadata ,
173
+ ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
174
+ if metadata. server_mode == Mode :: All && PARSEABLE . options . mode == Mode :: Ingest {
175
+ return Err ( ObjectStorageError :: UnhandledError (
176
+ format ! (
177
+ "Starting Ingest Mode is not allowed, Since Query Server has not been started yet. {}" ,
178
+ JOIN_COMMUNITY
179
+ )
180
+ . into ( ) ,
181
+ ) ) ;
182
+ }
183
+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
184
+ metadata. staging = PARSEABLE . options . staging_dir ( ) . canonicalize ( ) ?;
185
+ let overwrite_staging = true ;
186
+ let mut overwrite_remote = false ;
187
+
188
+ match PARSEABLE . options . mode {
189
+ Mode :: All => {
190
+ metadata
191
+ . server_mode
192
+ . standalone_after_distributed ( )
193
+ . map_err ( |err| ObjectStorageError :: Custom ( err. to_string ( ) ) ) ?;
194
+ overwrite_remote = true ;
195
+ }
196
+ Mode :: Query | Mode :: Prism | Mode :: Ingest | Mode :: Index => {
197
+ update_metadata_mode_and_staging ( metadata) ;
198
+ if matches ! ( PARSEABLE . options. mode, Mode :: Query | Mode :: Prism ) {
199
+ overwrite_remote = true ;
200
+ }
201
+ }
202
+ }
203
+ Ok ( ( metadata. clone ( ) , overwrite_staging, overwrite_remote) )
204
+ }
205
+
206
+ fn handle_create_both_env ( ) -> Result < ( StorageMetadata , bool , bool ) , ObjectStorageError > {
207
+ create_dir_all ( PARSEABLE . options . staging_dir ( ) ) ?;
208
+ let metadata = StorageMetadata :: default ( ) ;
209
+ let overwrite_remote = matches ! (
210
+ PARSEABLE . options. mode,
211
+ Mode :: All | Mode :: Query | Mode :: Prism
212
+ ) ;
213
+ let overwrite_staging = true ;
214
+ Ok ( ( metadata, overwrite_staging, overwrite_remote) )
215
+ }
216
+
205
217
fn update_metadata_mode_and_staging ( metadata : & mut StorageMetadata ) {
206
218
metadata. server_mode = PARSEABLE . options . mode ;
207
219
metadata. staging = PARSEABLE . options . staging_dir ( ) . to_path_buf ( ) ;
0 commit comments