Commit 97cc3cf

Merge branch 'main' into counts-api-update
2 parents: cb70c09 + cfd1348

9 files changed: +642 additions, -295 deletions

src/handlers/http/health_check.rs

Lines changed: 21 additions & 16 deletions
@@ -29,12 +29,12 @@ use actix_web::{
 use http::StatusCode;
 use once_cell::sync::Lazy;
 use tokio::{sync::Mutex, task::JoinSet};
-use tracing::{error, info, warn};
+use tracing::{error, info};
 
-use crate::parseable::PARSEABLE;
+use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};
 
 // Create a global variable to store signal status
-static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
+pub static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
 
 pub async fn liveness() -> HttpResponse {
     HttpResponse::new(StatusCode::OK)

@@ -60,28 +60,33 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
-    let mut joinset = JoinSet::new();
+    //sleep for 5 secs to allow any ongoing requests to finish
+    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+    let mut local_sync_joinset = JoinSet::new();
 
     // Sync staging
-    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+    PARSEABLE
+        .streams
+        .flush_and_convert(&mut local_sync_joinset, false, true);
 
-    while let Some(res) = joinset.join_next().await {
+    while let Some(res) = local_sync_joinset.join_next().await {
         match res {
             Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
-            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
             Err(err) => error!("Failed to join async task: {err}"),
         }
     }
 
-    if let Err(e) = PARSEABLE
-        .storage
-        .get_object_store()
-        .upload_files_from_staging()
-        .await
-    {
-        warn!("Failed to sync local data with object store. {:?}", e);
-    } else {
-        info!("Successfully synced all data to S3.");
+    // Sync object store
+    let mut object_store_joinset = JoinSet::new();
+    sync_all_streams(&mut object_store_joinset);
+
+    while let Some(res) = object_store_joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully synced all data to S3."),
+            Ok(Err(err)) => error!("Failed to sync local data with object store. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
     }
 }

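The shutdown path above now drains two separate JoinSets, one for the staging flush and one for the object-store upload, logging each task's outcome instead of reporting a single aggregate result. A minimal sketch of that fan-out/fan-in pattern follows; the stream names and the body of sync_one_stream are placeholders, not Parseable's actual sync logic.

use tokio::task::JoinSet;
use tracing::{error, info};

// Placeholder for per-stream work such as flush_and_convert or
// sync_all_streams in the real code.
async fn sync_one_stream(name: &'static str) -> Result<(), String> {
    info!("synced stream {name}");
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut joinset = JoinSet::new();
    for name in ["frontend-logs", "backend-logs"] {
        joinset.spawn(sync_one_stream(name));
    }

    // join_next() yields each task as it finishes, so one failing
    // stream is logged without aborting the rest of the shutdown.
    while let Some(res) = joinset.join_next().await {
        match res {
            Ok(Ok(())) => info!("sync task completed"),
            Ok(Err(err)) => error!("sync task failed: {err}"),
            Err(join_err) => error!("sync task panicked or was cancelled: {join_err}"),
        }
    }
}
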
src/handlers/http/modal/ingest_server.rs

Lines changed: 11 additions & 1 deletion
@@ -31,6 +31,7 @@ use tokio::sync::oneshot;
 use tokio::sync::OnceCell;
 
 use crate::handlers::http::modal::NodeType;
+use crate::sync::sync_start;
 use crate::{
     analytics,
     handlers::{

@@ -114,6 +115,13 @@ impl ParseableServer for IngestServer {
 
         migration::run_migration(&PARSEABLE).await?;
 
+        // local sync on init
+        let startup_sync_handle = tokio::spawn(async {
+            if let Err(e) = sync_start().await {
+                tracing::warn!("local sync on server start failed: {e}");
+            }
+        });
+
         // Run sync on a background thread
         let (cancel_tx, cancel_rx) = oneshot::channel();
         thread::spawn(|| sync::handler(cancel_rx));

@@ -124,7 +132,9 @@ impl ParseableServer for IngestServer {
         let result = self.start(shutdown_rx, prometheus.clone(), None).await;
         // Cancel sync jobs
         cancel_tx.send(()).expect("Cancellation should not fail");
-
+        if let Err(join_err) = startup_sync_handle.await {
+            tracing::warn!("startup sync task panicked: {join_err}");
+        }
         result
     }
 }

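Each server variant (ingest, query, standalone) receives the same two-part change, shown here and in the next two files: spawn the initial local sync as a detached task so startup is not blocked, then await its JoinHandle after the cancel signal so a panic inside the task surfaces as a JoinError rather than being dropped silently. A compact, self-contained illustration of that spawn-then-join shape follows; the sleep stands in for the real sync_start() work.

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Spawned at init: runs concurrently with the rest of server startup.
    let startup_sync_handle = tokio::spawn(async {
        // Placeholder for sync_start(): catch up local staging on boot.
        tokio::time::sleep(Duration::from_millis(100)).await;
    });

    // ... server runs until a shutdown signal arrives ...

    // Awaiting the handle at shutdown surfaces a panic in the task
    // as a JoinError instead of losing it.
    if let Err(join_err) = startup_sync_handle.await {
        eprintln!("startup sync task panicked: {join_err}");
    }
}
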
src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 1 deletion
@@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::handlers::http::{rbac, role};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
+use crate::sync::sync_start;
 use crate::{analytics, migration, storage, sync};
 use actix_web::web::{resource, ServiceConfig};
 use actix_web::{web, Scope};

@@ -126,6 +127,13 @@ impl ParseableServer for QueryServer {
         if init_cluster_metrics_schedular().is_ok() {
             info!("Cluster metrics scheduler started successfully");
         }
+
+        // local sync on init
+        let startup_sync_handle = tokio::spawn(async {
+            if let Err(e) = sync_start().await {
+                tracing::warn!("local sync on server start failed: {e}");
+            }
+        });
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;

@@ -142,7 +150,9 @@ impl ParseableServer for QueryServer {
             .await?;
         // Cancel sync jobs
         cancel_tx.send(()).expect("Cancellation should not fail");
-
+        if let Err(join_err) = startup_sync_handle.await {
+            tracing::warn!("startup sync task panicked: {join_err}");
+        }
         Ok(result)
     }
 }

src/handlers/http/modal/server.rs

Lines changed: 11 additions & 1 deletion
@@ -33,6 +33,7 @@ use crate::metrics;
 use crate::migration;
 use crate::storage;
 use crate::sync;
+use crate::sync::sync_start;
 
 use actix_web::web;
 use actix_web::web::resource;

@@ -122,6 +123,13 @@ impl ParseableServer for Server {
 
         storage::retention::load_retention_from_global();
 
+        // local sync on init
+        let startup_sync_handle = tokio::spawn(async {
+            if let Err(e) = sync_start().await {
+                tracing::warn!("local sync on server start failed: {e}");
+            }
+        });
+
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.download_from_s3()?;
         };

@@ -142,7 +150,9 @@ impl ParseableServer for Server {
             .await;
         // Cancel sync jobs
         cancel_tx.send(()).expect("Cancellation should not fail");
-
+        if let Err(join_err) = startup_sync_handle.await {
+            tracing::warn!("startup sync task panicked: {join_err}");
+        }
         return result;
     }
 }

src/parseable/staging/reader.rs

Lines changed: 33 additions & 23 deletions
@@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
 use arrow_schema::Schema;
 use byteorder::{LittleEndian, ReadBytesExt};
 use itertools::kmerge_by;
-use tracing::{error, warn};
+use tracing::error;
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,

@@ -48,18 +48,26 @@ impl MergedRecordReader {
 
         for file in files {
             //remove empty files before reading
-            if file.metadata().unwrap().len() == 0 {
-                error!("Invalid file detected, removing it: {:?}", file);
-                remove_file(file).unwrap();
-            } else {
-                let Ok(reader) =
-                    StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
-                else {
-                    error!("Invalid file detected, ignoring it: {:?}", file);
+            match file.metadata() {
+                Err(err) => {
+                    error!("Error when trying to read file: {file:?}; error = {err}");
                     continue;
-                };
-
-                readers.push(reader);
+                }
+                Ok(metadata) if metadata.len() == 0 => {
+                    error!("Empty file detected, removing it: {:?}", file);
+                    remove_file(file).unwrap();
+                    continue;
+                }
+                Ok(_) => {
+                    let Ok(reader) =
+                        StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
+                    else {
+                        error!("Invalid file detected, ignoring it: {:?}", file);
+                        continue;
+                    };
+
+                    readers.push(reader);
+                }
             }
         }
 

@@ -85,20 +93,22 @@ impl MergedReverseRecordReader {
     pub fn try_new(file_paths: &[PathBuf]) -> Self {
         let mut readers = Vec::with_capacity(file_paths.len());
         for path in file_paths {
-            let Ok(file) = File::open(path) else {
-                warn!("Error when trying to read file: {path:?}");
-                continue;
-            };
-
-            let reader = match get_reverse_reader(file) {
-                Ok(r) => r,
+            match File::open(path) {
                 Err(err) => {
-                    error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
+                    error!("Error when trying to read file: {path:?}; error = {err}");
                     continue;
                 }
-            };
-
-            readers.push(reader);
+                Ok(file) => {
+                    let reader = match get_reverse_reader(file) {
+                        Ok(r) => r,
+                        Err(err) => {
+                            error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
+                            continue;
+                        }
+                    };
+                    readers.push(reader);
+                }
+            }
         }
 
         Self { readers }

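The reader changes replace metadata().unwrap() and the let-else chain with explicit match arms, so a file that cannot be stat'd or opened is skipped with an error log instead of panicking the whole merge. Below is a stripped-down sketch of that skip-on-error loop using only std::fs; parse_file is a hypothetical stand-in for constructing the arrow StreamReader.

use std::fs::{self, File};
use std::io;
use std::path::PathBuf;

// Hypothetical stand-in for building a reader (e.g. an arrow StreamReader).
fn parse_file(_file: File) -> io::Result<String> {
    Ok("reader".to_string())
}

fn collect_readers(paths: &[PathBuf]) -> Vec<String> {
    let mut readers = Vec::with_capacity(paths.len());
    for path in paths {
        match fs::metadata(path) {
            // Unreadable entry: log and skip instead of unwrap()-panicking.
            Err(err) => {
                eprintln!("Error when trying to read file: {path:?}; error = {err}");
                continue;
            }
            // Empty staging files are deleted so they never reach the reader.
            Ok(metadata) if metadata.len() == 0 => {
                let _ = fs::remove_file(path);
                continue;
            }
            Ok(_) => match File::open(path).and_then(parse_file) {
                Ok(reader) => readers.push(reader),
                Err(err) => {
                    eprintln!("Invalid file detected, ignoring it: {path:?}; error = {err}")
                }
            },
        }
    }
    readers
}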