
Commit 5d82a1a

fix: multiple fixes around system stability under load
1. Perform object store sync for all streams in parallel.
2. Remove the restriction on multithreading so all available cores are utilised.
3. Add atomicity to the arrow-to-parquet conversion:
   i. each conversion task processes one minute of arrow files
   ii. move arrow files to an inprocess folder to maintain atomicity
   iii. add an init sync task to process all pending files
   iv. add a tokio sleep of 5 secs in the shutdown task to let ongoing jobs complete
   v. remove unwrap of write locks to avoid thread poisoning (a sketch of this pattern follows below)
1 parent 970a5a5 commit 5d82a1a
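On point 3.v: calling .unwrap() on a std::sync::RwLock guard panics whenever the lock was poisoned by an earlier panic, so one failed writer can cascade into panics on every later thread. A minimal sketch of the alternative, recovering the guard from the PoisonError instead of unwrapping; the helper name and the Vec payload are illustrative and not taken from the Parseable codebase:

use std::sync::RwLock;

// Illustrative only: recover the guard from a poisoned lock instead of
// panicking, so one crashed writer does not take down every later caller.
fn record_batch_size(sizes: &RwLock<Vec<u64>>, size: u64) {
    let mut guard = match sizes.write() {
        Ok(guard) => guard,
        // a previous holder panicked mid-update; accept the inner value
        // rather than propagating the poison as another panic
        Err(poisoned) => poisoned.into_inner(),
    };
    guard.push(size);
}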

File tree

7 files changed: +481, -213 lines


src/handlers/http/health_check.rs

Lines changed: 21 additions & 16 deletions
@@ -29,12 +29,12 @@ use actix_web::{
 use http::StatusCode;
 use once_cell::sync::Lazy;
 use tokio::{sync::Mutex, task::JoinSet};
-use tracing::{error, info, warn};
+use tracing::{error, info};

-use crate::parseable::PARSEABLE;
+use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};

 // Create a global variable to store signal status
-static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
+pub static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));

 pub async fn liveness() -> HttpResponse {
     HttpResponse::new(StatusCode::OK)
@@ -60,28 +60,33 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;

-    let mut joinset = JoinSet::new();
+    //sleep for 5 secs to allow any ongoing requests to finish
+    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+    let mut local_sync_joinset = JoinSet::new();

     // Sync staging
-    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+    PARSEABLE
+        .streams
+        .flush_and_convert(&mut local_sync_joinset, false, true);

-    while let Some(res) = joinset.join_next().await {
+    while let Some(res) = local_sync_joinset.join_next().await {
         match res {
             Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
-            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
             Err(err) => error!("Failed to join async task: {err}"),
         }
     }

-    if let Err(e) = PARSEABLE
-        .storage
-        .get_object_store()
-        .upload_files_from_staging()
-        .await
-    {
-        warn!("Failed to sync local data with object store. {:?}", e);
-    } else {
-        info!("Successfully synced all data to S3.");
+    // Sync object store
+    let mut object_store_joinset = JoinSet::new();
+    sync_all_streams(&mut object_store_joinset);
+
+    while let Some(res) = object_store_joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully synced all data to S3."),
+            Ok(Err(err)) => error!("Failed to sync local data with object store. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
     }
 }
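The rewritten shutdown() above drains two JoinSets in sequence: one for the staging flush-and-convert and one for the per-stream object store sync spawned by sync_all_streams. Below is a self-contained sketch of that spawn-all-then-drain pattern; the stream names and the upload body are placeholders, not the actual Parseable implementation:

use tokio::task::JoinSet;

// Sketch of the spawn-all-then-drain pattern used in shutdown(): every
// stream gets its own task, and the caller reports each result instead of
// stopping at the first failure.
#[tokio::main]
async fn main() {
    let streams = vec!["app-logs".to_string(), "access-logs".to_string()];
    let mut joinset: JoinSet<Result<String, String>> = JoinSet::new();

    for stream in streams {
        joinset.spawn(async move {
            // placeholder for the real per-stream object store upload
            Ok(stream)
        });
    }

    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(stream)) => println!("synced stream {stream}"),
            Ok(Err(err)) => eprintln!("failed to sync a stream: {err}"),
            Err(err) => eprintln!("failed to join async task: {err}"),
        }
    }
}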

src/handlers/http/modal/query_server.rs

Lines changed: 8 additions & 0 deletions
@@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::handlers::http::{rbac, role};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
+use crate::sync::sync_start;
 use crate::{analytics, migration, storage, sync};
 use actix_web::web::{resource, ServiceConfig};
 use actix_web::{web, Scope};
@@ -126,6 +127,13 @@ impl ParseableServer for QueryServer {
         if init_cluster_metrics_schedular().is_ok() {
             info!("Cluster metrics scheduler started successfully");
         }
+
+        // local sync on init
+        tokio::spawn(async {
+            if let Err(e) = sync_start().await {
+                tracing::warn!("local sync on server start failed: {e}");
+            }
+        });
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;

src/handlers/http/modal/server.rs

Lines changed: 8 additions & 0 deletions
@@ -33,6 +33,7 @@ use crate::metrics;
 use crate::migration;
 use crate::storage;
 use crate::sync;
+use crate::sync::sync_start;

 use actix_web::web;
 use actix_web::web::resource;
@@ -122,6 +123,13 @@ impl ParseableServer for Server {

         storage::retention::load_retention_from_global();

+        // local sync on init
+        tokio::spawn(async {
+            if let Err(e) = sync_start().await {
+                tracing::warn!("local sync on server start failed: {e}");
+            }
+        });
+
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.download_from_s3()?;
         };
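Both server variants now spawn sync_start() once at startup; per the commit message, this init sync exists to pick up arrow files left in the inprocess staging area by a previous run. The body of sync_start is not part of this diff, so the following is only a hedged sketch of what such a pass could look like; the directory layout, the helper name, and the ".arrows" extension check are assumptions:

use std::{fs, io, path::{Path, PathBuf}};

// Hypothetical helper: list arrow files stranded in an "inprocess" staging
// directory so a startup task can hand them back to the conversion pipeline.
// This is an assumption for illustration, not the real sync_start body.
fn pending_inprocess_arrows(staging: &Path) -> io::Result<Vec<PathBuf>> {
    let mut pending = Vec::new();
    let inprocess = staging.join("inprocess");
    if inprocess.is_dir() {
        for entry in fs::read_dir(&inprocess)? {
            let path = entry?.path();
            if path.extension().map_or(false, |ext| ext == "arrows") {
                pending.push(path);
            }
        }
    }
    Ok(pending)
}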

src/parseable/staging/reader.rs

Lines changed: 14 additions & 12 deletions
@@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
 use arrow_schema::Schema;
 use byteorder::{LittleEndian, ReadBytesExt};
 use itertools::kmerge_by;
-use tracing::{error, warn};
+use tracing::error;

 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
@@ -85,20 +85,22 @@ impl MergedReverseRecordReader {
     pub fn try_new(file_paths: &[PathBuf]) -> Self {
         let mut readers = Vec::with_capacity(file_paths.len());
         for path in file_paths {
-            let Ok(file) = File::open(path) else {
-                warn!("Error when trying to read file: {path:?}");
-                continue;
-            };
-
-            let reader = match get_reverse_reader(file) {
-                Ok(r) => r,
+            match File::open(path) {
                 Err(err) => {
-                    error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
+                    error!("Error when trying to read file: {path:?}; error = {err}");
                     continue;
                 }
-            };
-
-            readers.push(reader);
+                Ok(file) => {
+                    let reader = match get_reverse_reader(file) {
+                        Ok(r) => r,
+                        Err(err) => {
+                            error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
+                            continue;
+                        }
+                    };
+                    readers.push(reader);
+                }
+            }
         }

         Self { readers }
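The rewritten try_new above applies a skip-and-continue policy: a file that cannot be opened or parsed is logged and ignored rather than aborting the whole merge. A stand-alone sketch of that policy, using plain File handles and eprintln! in place of get_reverse_reader and the tracing macros, which are not reproduced here:

use std::{fs::File, path::PathBuf};

// Stand-alone sketch of the skip-and-continue policy from try_new: files
// that cannot be opened are logged and skipped, so one bad file never
// aborts the whole pass.
fn open_readable(paths: &[PathBuf]) -> Vec<File> {
    let mut readers = Vec::with_capacity(paths.len());
    for path in paths {
        match File::open(path) {
            Err(err) => {
                eprintln!("Error when trying to read file: {path:?}; error = {err}");
                continue;
            }
            Ok(file) => readers.push(file),
        }
    }
    readers
}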
