Skip to content

Commit d62ff56

Browse files
Add task-duration monitoring to local and object store sync
1 parent 0af4b7e commit d62ff56

File tree

1 file changed

+63
-43
lines changed

1 file changed

+63
-43
lines changed

src/sync.rs

Lines changed: 63 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
8484
loop {
8585
select! {
8686
_ = &mut cancel_rx => {
87-
// actix server finished .. stop other threads and stop the server
8887
remote_sync_inbox.send(()).unwrap_or(());
8988
localsync_inbox.send(()).unwrap_or(());
9089
if let Err(e) = localsync_handler.await {
@@ -96,12 +95,9 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
9695
return Ok(());
9796
},
9897
_ = &mut localsync_outbox => {
99-
// crash the server if localsync fails for any reason
100-
// panic!("Local Sync thread died. Server will fail now!")
10198
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
10299
},
103100
_ = &mut remote_sync_outbox => {
104-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
105101
if let Err(e) = remote_sync_handler.await {
106102
error!("Error joining remote_sync_handler: {e:?}");
107103
}
@@ -125,16 +121,26 @@ pub fn object_store_sync() -> (
125121

126122
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
127123
let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
128-
let mut joinset = JoinSet::new();
129124

130125
loop {
131126
select! {
132127
_ = sync_interval.tick() => {
133128
trace!("Syncing Parquets to Object Store... ");
134-
sync_all_streams(&mut joinset)
135-
},
136-
Some(res) = joinset.join_next(), if !joinset.is_empty() => {
137-
log_join_result(res, "object store sync");
129+
130+
// Monitor the duration of sync_all_streams execution
131+
monitor_task_duration(
132+
"object_store_sync_all_streams",
133+
Duration::from_secs(15),
134+
|| async {
135+
let mut joinset = JoinSet::new();
136+
sync_all_streams(&mut joinset);
137+
138+
// Wait for all spawned tasks to complete
139+
while let Some(res) = joinset.join_next().await {
140+
log_join_result(res, "object store sync");
141+
}
142+
}
143+
).await;
138144
},
139145
res = &mut inbox_rx => {
140146
match res {
@@ -147,10 +153,6 @@ pub fn object_store_sync() -> (
147153
}
148154
}
149155
}
150-
// Drain remaining joinset tasks
151-
while let Some(res) = joinset.join_next().await {
152-
log_join_result(res, "object store sync");
153-
}
154156
}));
155157

156158
match result {
@@ -184,32 +186,37 @@ pub fn local_sync() -> (
184186

185187
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
186188
let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
187-
let mut joinset = JoinSet::new();
188189

189190
loop {
190191
select! {
191192
// Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
192193
_ = sync_interval.tick() => {
193-
PARSEABLE.streams.flush_and_convert(&mut joinset, false, false)
194+
// Monitor the duration of flush_and_convert execution
195+
monitor_task_duration(
196+
"local_sync_flush_and_convert",
197+
Duration::from_secs(15),
198+
|| async {
199+
let mut joinset = JoinSet::new();
200+
PARSEABLE.streams.flush_and_convert(&mut joinset, false, false);
201+
202+
// Wait for all spawned tasks to complete
203+
while let Some(res) = joinset.join_next().await {
204+
log_join_result(res, "flush and convert");
205+
}
206+
}
207+
).await;
194208
},
195-
// Joins and logs errors in spawned tasks
196-
Some(res) = joinset.join_next(), if !joinset.is_empty() => {
197-
log_join_result(res, "flush and convert");
198-
}
199-
res = &mut inbox_rx => {match res{
200-
Ok(_) => break,
201-
Err(_) => {
202-
warn!("Inbox channel closed unexpectedly");
203-
break;
204-
}}
209+
res = &mut inbox_rx => {
210+
match res {
211+
Ok(_) => break,
212+
Err(_) => {
213+
warn!("Inbox channel closed unexpectedly");
214+
break;
215+
}
216+
}
205217
}
206218
}
207219
}
208-
209-
// Drain remaining joinset tasks
210-
while let Some(res) = joinset.join_next().await {
211-
log_join_result(res, "flush and convert");
212-
}
213220
}));
214221

215222
match result {
@@ -228,21 +235,34 @@ pub fn local_sync() -> (
228235
(handle, outbox_rx, inbox_tx)
229236
}
230237

231-
// local sync at the start of the server
238+
// local and object store sync at the start of the server
232239
pub async fn sync_start() -> anyhow::Result<()> {
233-
let mut local_sync_joinset = JoinSet::new();
234-
PARSEABLE
235-
.streams
236-
.flush_and_convert(&mut local_sync_joinset, true, false);
237-
while let Some(res) = local_sync_joinset.join_next().await {
238-
log_join_result(res, "flush and convert");
239-
}
240+
// Monitor local sync duration at startup
241+
monitor_task_duration("startup_local_sync", Duration::from_secs(15), || async {
242+
let mut local_sync_joinset = JoinSet::new();
243+
PARSEABLE
244+
.streams
245+
.flush_and_convert(&mut local_sync_joinset, true, false);
246+
while let Some(res) = local_sync_joinset.join_next().await {
247+
log_join_result(res, "flush and convert");
248+
}
249+
})
250+
.await;
251+
252+
// Monitor object store sync duration at startup
253+
monitor_task_duration(
254+
"startup_object_store_sync",
255+
Duration::from_secs(15),
256+
|| async {
257+
let mut object_store_joinset = JoinSet::new();
258+
sync_all_streams(&mut object_store_joinset);
259+
while let Some(res) = object_store_joinset.join_next().await {
260+
log_join_result(res, "object store sync");
261+
}
262+
},
263+
)
264+
.await;
240265

241-
let mut object_store_joinset = JoinSet::new();
242-
sync_all_streams(&mut object_store_joinset);
243-
while let Some(res) = object_store_joinset.join_next().await {
244-
log_join_result(res, "object store sync");
245-
}
246266
Ok(())
247267
}
248268

0 commit comments

Comments
 (0)