Skip to content

fix: multiple fixes around system stability under load #1346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions src/handlers/http/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use actix_web::{
use http::StatusCode;
use once_cell::sync::Lazy;
use tokio::{sync::Mutex, task::JoinSet};
use tracing::{error, info, warn};
use tracing::{error, info};

use crate::parseable::PARSEABLE;
use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams};

// Create a global variable to store signal status
static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));
pub static SIGNAL_RECEIVED: Lazy<Arc<Mutex<bool>>> = Lazy::new(|| Arc::new(Mutex::new(false)));

pub async fn liveness() -> HttpResponse {
HttpResponse::new(StatusCode::OK)
Expand All @@ -60,28 +60,33 @@ pub async fn shutdown() {
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
*shutdown_flag = true;

let mut joinset = JoinSet::new();
//sleep for 5 secs to allow any ongoing requests to finish
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let mut local_sync_joinset = JoinSet::new();

// Sync staging
PARSEABLE.streams.flush_and_convert(&mut joinset, true);
PARSEABLE
.streams
.flush_and_convert(&mut local_sync_joinset, false, true);

while let Some(res) = joinset.join_next().await {
while let Some(res) = local_sync_joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}

if let Err(e) = PARSEABLE
.storage
.get_object_store()
.upload_files_from_staging()
.await
{
warn!("Failed to sync local data with object store. {:?}", e);
} else {
info!("Successfully synced all data to S3.");
// Sync object store
let mut object_store_joinset = JoinSet::new();
sync_all_streams(&mut object_store_joinset);

while let Some(res) = object_store_joinset.join_next().await {
match res {
Ok(Ok(_)) => info!("Successfully synced all data to S3."),
Ok(Err(err)) => error!("Failed to sync local data with object store. {err:?}"),
Err(err) => error!("Failed to join async task: {err}"),
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
use crate::handlers::http::{rbac, role};
use crate::hottier::HotTierManager;
use crate::rbac::role::Action;
use crate::sync::sync_start;
use crate::{analytics, migration, storage, sync};
use actix_web::web::{resource, ServiceConfig};
use actix_web::{web, Scope};
Expand Down Expand Up @@ -126,6 +127,13 @@ impl ParseableServer for QueryServer {
if init_cluster_metrics_schedular().is_ok() {
info!("Cluster metrics scheduler started successfully");
}

// local sync on init
tokio::spawn(async {
if let Err(e) = sync_start().await {
tracing::warn!("local sync on server start failed: {e}");
}
});
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.put_internal_stream_hot_tier().await?;
hot_tier_manager.download_from_s3()?;
Expand Down
8 changes: 8 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::metrics;
use crate::migration;
use crate::storage;
use crate::sync;
use crate::sync::sync_start;

use actix_web::web;
use actix_web::web::resource;
Expand Down Expand Up @@ -122,6 +123,13 @@ impl ParseableServer for Server {

storage::retention::load_retention_from_global();

// local sync on init
tokio::spawn(async {
if let Err(e) = sync_start().await {
tracing::warn!("local sync on server start failed: {e}");
}
});

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.download_from_s3()?;
};
Expand Down
155 changes: 115 additions & 40 deletions src/otel/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
span_records_json
}


#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -360,13 +359,21 @@ mod tests {
KeyValue {
key: "service.name".to_string(),
value: Some(AnyValue {
value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("test-service".to_string())),
value: Some(
opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(
"test-service".to_string(),
),
),
}),
},
KeyValue {
key: "http.method".to_string(),
value: Some(AnyValue {
value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("GET".to_string())),
value: Some(
opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue(
"GET".to_string(),
),
),
}),
},
]
Expand Down Expand Up @@ -398,7 +405,8 @@ mod tests {
assert_eq!(
result.get("span_status_description").unwrap(),
&Value::String(expected_description.to_string()),
"Status description should match expected value for code {}", code
"Status description should match expected value for code {}",
code
);
assert_eq!(
result.get("span_status_message").unwrap(),
Expand Down Expand Up @@ -432,7 +440,8 @@ mod tests {
assert_eq!(
result.get("span_kind_description").unwrap(),
&Value::String(expected_description.to_string()),
"Span kind description should match expected value for kind {}", kind
"Span kind description should match expected value for kind {}",
kind
);
}
}
Expand All @@ -459,7 +468,8 @@ mod tests {
assert_eq!(
result.get("span_flags_description").unwrap(),
&Value::String(expected_description.to_string()),
"Span flags description should match expected value for flags {}", flags
"Span flags description should match expected value for flags {}",
flags
);
}
}
Expand Down Expand Up @@ -488,7 +498,10 @@ mod tests {

// Check first event
let first_event = &result[0];
assert!(first_event.contains_key("event_time_unix_nano"), "Should contain timestamp");
assert!(
first_event.contains_key("event_time_unix_nano"),
"Should contain timestamp"
);
assert_eq!(
first_event.get("event_name").unwrap(),
&Value::String("request.start".to_string()),
Expand All @@ -499,7 +512,10 @@ mod tests {
&Value::Number(2.into()),
"Dropped attributes count should be preserved"
);
assert!(first_event.contains_key("service.name"), "Should contain flattened attributes");
assert!(
first_event.contains_key("service.name"),
"Should contain flattened attributes"
);

// Check second event
let second_event = &result[1];
Expand All @@ -518,16 +534,14 @@ mod tests {
#[test]
fn test_flatten_links_structure() {
// Test that links are properly flattened with all expected fields
let links = vec![
Link {
trace_id: sample_trace_id(),
span_id: sample_span_id(),
trace_state: "state1".to_string(),
attributes: sample_attributes(),
dropped_attributes_count: 1,
flags: 0,
},
];
let links = vec![Link {
trace_id: sample_trace_id(),
span_id: sample_span_id(),
trace_state: "state1".to_string(),
attributes: sample_attributes(),
dropped_attributes_count: 1,
flags: 0,
}];

let result = flatten_links(&links);

Expand All @@ -549,7 +563,10 @@ mod tests {
&Value::Number(1.into()),
"Dropped attributes count should be preserved"
);
assert!(link.contains_key("service.name"), "Should contain flattened attributes");
assert!(
link.contains_key("service.name"),
"Should contain flattened attributes"
);
}

#[test]
Expand Down Expand Up @@ -611,12 +628,30 @@ mod tests {
&Value::String("SPAN_KIND_SERVER".to_string()),
"All records should contain span kind description"
);
assert!(record.contains_key("span_trace_id"), "Should contain trace ID");
assert!(record.contains_key("span_span_id"), "Should contain span ID");
assert!(record.contains_key("span_start_time_unix_nano"), "Should contain start time");
assert!(record.contains_key("span_end_time_unix_nano"), "Should contain end time");
assert!(record.contains_key("service.name"), "Should contain span attributes");
assert!(record.contains_key("span_status_code"), "Should contain status");
assert!(
record.contains_key("span_trace_id"),
"Should contain trace ID"
);
assert!(
record.contains_key("span_span_id"),
"Should contain span ID"
);
assert!(
record.contains_key("span_start_time_unix_nano"),
"Should contain start time"
);
assert!(
record.contains_key("span_end_time_unix_nano"),
"Should contain end time"
);
assert!(
record.contains_key("service.name"),
"Should contain span attributes"
);
assert!(
record.contains_key("span_status_code"),
"Should contain status"
);
}

// One record should be an event, one should be a link
Expand Down Expand Up @@ -650,17 +685,30 @@ mod tests {

let result = flatten_span_record(&span);

assert_eq!(result.len(), 1, "Should have exactly one record for span without events/links");
assert_eq!(
result.len(),
1,
"Should have exactly one record for span without events/links"
);

let record = &result[0];
assert_eq!(
record.get("span_name").unwrap(),
&Value::String("simple-span".to_string()),
"Should contain span name"
);
assert!(!record.contains_key("event_name"), "Should not contain event fields");
assert!(!record.contains_key("link_trace_id"), "Should not contain link fields");
assert!(!record.contains_key("span_status_code"), "Should not contain status when none provided");
assert!(
!record.contains_key("event_name"),
"Should not contain event fields"
);
assert!(
!record.contains_key("link_trace_id"),
"Should not contain link fields"
);
assert!(
!record.contains_key("span_status_code"),
"Should not contain status when none provided"
);
}

#[test]
Expand Down Expand Up @@ -705,10 +753,16 @@ mod tests {
assert_eq!(hex_span_id, "12345678", "Span ID should be lowercase hex");
}
if let Some(Value::String(hex_parent_span_id)) = record.get("span_parent_span_id") {
assert_eq!(hex_parent_span_id, "87654321", "Parent span ID should be lowercase hex");
assert_eq!(
hex_parent_span_id, "87654321",
"Parent span ID should be lowercase hex"
);
}
if let Some(Value::String(link_trace_id)) = record.get("link_trace_id") {
assert_eq!(link_trace_id, "ffabcdef", "Link trace ID should be lowercase hex");
assert_eq!(
link_trace_id, "ffabcdef",
"Link trace ID should be lowercase hex"
);
}
}
}
Expand Down Expand Up @@ -823,15 +877,36 @@ mod tests {
fn test_known_field_list_completeness() {
// Test that the OTEL_TRACES_KNOWN_FIELD_LIST contains all expected fields
let expected_fields = [
"scope_name", "scope_version", "scope_schema_url", "scope_dropped_attributes_count",
"resource_schema_url", "resource_dropped_attributes_count",
"span_trace_id", "span_span_id", "span_name", "span_parent_span_id", "name",
"span_kind", "span_kind_description", "span_start_time_unix_nano", "span_end_time_unix_nano",
"event_name", "event_time_unix_nano", "event_dropped_attributes_count",
"link_span_id", "link_trace_id", "link_dropped_attributes_count",
"span_dropped_events_count", "span_dropped_links_count", "span_dropped_attributes_count",
"span_trace_state", "span_flags", "span_flags_description",
"span_status_code", "span_status_description", "span_status_message",
"scope_name",
"scope_version",
"scope_schema_url",
"scope_dropped_attributes_count",
"resource_schema_url",
"resource_dropped_attributes_count",
"span_trace_id",
"span_span_id",
"span_name",
"span_parent_span_id",
"name",
"span_kind",
"span_kind_description",
"span_start_time_unix_nano",
"span_end_time_unix_nano",
"event_name",
"event_time_unix_nano",
"event_dropped_attributes_count",
"link_span_id",
"link_trace_id",
"link_dropped_attributes_count",
"span_dropped_events_count",
"span_dropped_links_count",
"span_dropped_attributes_count",
"span_trace_state",
"span_flags",
"span_flags_description",
"span_status_code",
"span_status_description",
"span_status_message",
];

assert_eq!(
Expand Down
Loading
Loading