diff --git a/src/cli.rs b/src/cli.rs index 52b6a05ca..cda2361c5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -317,6 +317,34 @@ pub struct Options { )] pub parquet_compression: Compression, + // Resource monitoring + #[arg( + long, + env = "P_RESOURCE_CHECK_INTERVAL", + default_value = "15", + value_parser = validation::validate_seconds, + help = "Resource monitoring check interval in seconds" + )] + pub resource_check_interval: u64, + + #[arg( + long, + env = "P_CPU_THRESHOLD", + default_value = "80.0", + value_parser = validation::validate_percentage, + help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring" + )] + pub cpu_utilization_threshold: f32, + + #[arg( + long, + env = "P_MEMORY_THRESHOLD", + default_value = "80.0", + value_parser = validation::validate_percentage, + help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring" + )] + pub memory_utilization_threshold: f32, + // Integration features #[arg( long, diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 874a2aed5..6b0e71cbd 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -36,6 +36,7 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; +pub mod resource_check; mod kinesis; pub mod llm; pub mod logstream; diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 8a8e1d1b1..c202a92c6 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -21,6 +21,7 @@ use std::thread; use actix_web::web; use actix_web::Scope; +use actix_web::middleware::from_fn; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; use base64::Engine; @@ -39,7 +40,7 @@ use crate::{ http::{ base_path, ingest, logstream, middleware::{DisAllowRootUser, RouteExt}, - role, + resource_check, role, }, }, migration, @@ -67,7 +68,10 @@ impl ParseableServer for IngestServer { .service( // Base path "{url}/api/v1" 
web::scope(&base_path()) - .service(Server::get_ingest_factory()) + .service( + Server::get_ingest_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::logstream_api()) .service(Server::get_about_factory()) .service(Self::analytics_factory()) @@ -77,7 +81,10 @@ impl ParseableServer for IngestServer { .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) - .service(Server::get_ingest_otel_factory()); + .service( + Server::get_ingest_otel_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ); } async fn load_metadata(&self) -> anyhow::Result> { @@ -223,7 +230,8 @@ impl IngestServer { web::post() .to(ingest::post_event) .authorize_for_stream(Action::Ingest), - ), + ) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) .service( web::resource("/sync") diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 3504ef0fa..1019d5d8a 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -45,7 +45,7 @@ use crate::{ utils::get_node_id, }; -use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION}; +use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION}; pub mod ingest; pub mod ingest_server; @@ -107,6 +107,10 @@ pub trait ParseableServer { &PARSEABLE.options.trusted_ca_certs_path, )?; + // Start resource monitor + let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel(); + resource_check::spawn_resource_monitor(resource_shutdown_rx); + // fn that creates the app let create_app_fn = move || { App::new() @@ -142,6 +146,9 @@ pub trait ParseableServer { health_check::shutdown().await; + // Shutdown resource monitor + let _ = resource_shutdown_tx.send(()); + // Initiate graceful shutdown info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; diff --git 
a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index b43fa68a9..75b10ce7b 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -22,13 +22,14 @@ use std::thread; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; -use crate::handlers::http::{base_path, prism_base_path}; +use crate::handlers::http::{base_path, prism_base_path, resource_check}; use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync::sync_start; use crate::{analytics, migration, storage, sync}; +use actix_web::middleware::from_fn; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; use actix_web_prometheus::PrometheusMetrics; @@ -53,7 +54,10 @@ impl ParseableServer for QueryServer { .service( web::scope(&base_path()) .service(Server::get_correlation_webscope()) - .service(Server::get_query_factory()) + .service( + Server::get_query_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) @@ -66,7 +70,10 @@ impl ParseableServer for QueryServer { .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Server::get_roles_webscope()) - .service(Server::get_counts_webscope()) + .service( + Server::get_counts_webscope() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) .service(Self::get_cluster_web_scope()), diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index d22e5de02..ece5374de 100644 --- 
a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,6 +25,7 @@ use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; +use crate::handlers::http::resource_check; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; @@ -37,6 +38,7 @@ use crate::sync::sync_start; use actix_web::web; use actix_web::web::resource; +use actix_web::middleware::from_fn; use actix_web::Resource; use actix_web::Scope; use actix_web_prometheus::PrometheusMetrics; @@ -71,8 +73,14 @@ impl ParseableServer for Server { .service( web::scope(&base_path()) .service(Self::get_correlation_webscope()) - .service(Self::get_query_factory()) - .service(Self::get_ingest_factory()) + .service( + Self::get_query_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) + .service( + Self::get_ingest_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) .service(Self::get_about_factory()) @@ -85,7 +93,10 @@ impl ParseableServer for Server { .service(Self::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Self::get_roles_webscope()) - .service(Self::get_counts_webscope()) + .service( + Self::get_counts_webscope() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_alerts_webscope()) .service(Self::get_metrics_webscope()), ) @@ -95,7 +106,10 @@ impl ParseableServer for Server { .service(Server::get_prism_logstream()) .service(Server::get_prism_datasets()), ) - .service(Self::get_ingest_otel_factory()) + .service( + Self::get_ingest_otel_factory() + .wrap(from_fn(resource_check::check_resource_utilization_middleware)) + ) .service(Self::get_generated()); } @@ -353,13 +367,14 @@ impl Server { .route( 
web::put() .to(logstream::put_stream) - .authorize_for_stream(Action::CreateStream), + .authorize_for_stream(Action::CreateStream) ) // POST "/logstream/{logstream}" ==> Post logs to given log stream .route( web::post() .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), + .authorize_for_stream(Action::Ingest) + .wrap(from_fn(resource_check::check_resource_utilization_middleware)), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( @@ -368,7 +383,7 @@ impl Server { .authorize_for_stream(Action::DeleteStream), ) .app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), - ) + ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream web::resource("/info").route( diff --git a/src/handlers/http/resource_check.rs b/src/handlers/http/resource_check.rs new file mode 100644 index 000000000..e1f08285f --- /dev/null +++ b/src/handlers/http/resource_check.rs @@ -0,0 +1,129 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ *
+ */
+
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use actix_web::{
+    body::MessageBody,
+    dev::{ServiceRequest, ServiceResponse},
+    error::{Error, ErrorServiceUnavailable},
+    middleware::Next,
+};
+use tokio::{
+    select,
+    time::{interval, Duration},
+};
+use tracing::{error, info, trace, warn};
+
+use crate::analytics::{refresh_sys_info, SYS_INFO};
+use crate::parseable::PARSEABLE;
+
+/// `true` while the latest check found CPU and memory usage within their thresholds.
+///
+/// Starts as `true` (fail-open) so that requests arriving before the first check has
+/// completed are served instead of being rejected with 503.
+static RESOURCE_CHECK_ENABLED: AtomicBool = AtomicBool::new(true);
+
+/// Spawns a background task that periodically samples CPU and memory usage and records
+/// in `RESOURCE_CHECK_ENABLED` whether both are within the configured thresholds.
+///
+/// The task exits once `shutdown_rx` resolves, i.e. when the sender fires or is dropped.
+pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
+    tokio::spawn(async move {
+        let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
+        let memory_threshold = PARSEABLE.options.memory_utilization_threshold;
+        // `tokio::time::interval` panics on a zero period, so clamp to at least one second.
+        let period = Duration::from_secs(PARSEABLE.options.resource_check_interval.max(1));
+        let mut check_interval = interval(period);
+        let mut shutdown_rx = shutdown_rx;
+
+        info!(
+            "Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
+            cpu_threshold, memory_threshold
+        );
+        loop {
+            select! {
+                // The first tick completes immediately, so one check runs right at startup.
+                _ = check_interval.tick() => {
+                    trace!("Checking system resource utilization...");
+
+                    // Refresh and read on the blocking pool: both presumably take the
+                    // `SYS_INFO` mutex and should not stall an async worker thread.
+                    let sample = tokio::task::spawn_blocking(|| {
+                        refresh_sys_info();
+                        let sys = SYS_INFO.lock().expect("SYS_INFO mutex poisoned");
+                        (
+                            sys.used_memory() as f32,
+                            sys.total_memory() as f32,
+                            sys.global_cpu_usage(),
+                        )
+                    })
+                    .await;
+
+                    let (used_memory, total_memory, cpu_usage) = match sample {
+                        Ok(sample) => sample,
+                        Err(err) => {
+                            // A panic in the sampler must not kill the monitor or leave the
+                            // server rejecting forever: accept requests and retry next tick.
+                            error!("Failed to sample system resources: {err}");
+                            RESOURCE_CHECK_ENABLED.store(true, Ordering::SeqCst);
+                            continue;
+                        }
+                    };
+
+                    // Memory usage as a percentage; treated as 0 when the total is unknown.
+                    let memory_usage = if total_memory > 0.0 {
+                        (used_memory / total_memory) * 100.0
+                    } else {
+                        0.0
+                    };
+
+                    // Current usage is logged on every check.
+                    info!(
+                        "Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
+                        cpu_usage,
+                        memory_usage,
+                        used_memory / 1024.0 / 1024.0 / 1024.0,
+                        total_memory / 1024.0 / 1024.0 / 1024.0
+                    );
+
+                    let memory_high = memory_usage > memory_threshold;
+                    if memory_high {
+                        warn!(
+                            "High memory usage detected: {:.1}% (threshold: {:.1}%)",
+                            memory_usage, memory_threshold
+                        );
+                    }
+
+                    let cpu_high = cpu_usage > cpu_threshold;
+                    if cpu_high {
+                        warn!(
+                            "High CPU usage detected: {:.1}% (threshold: {:.1}%)",
+                            cpu_usage, cpu_threshold
+                        );
+                    }
+
+                    let resource_ok = !(memory_high || cpu_high);
+
+                    // Publish the verdict atomically and log only when it changes.
+                    let previous_state = RESOURCE_CHECK_ENABLED.swap(resource_ok, Ordering::SeqCst);
+                    if previous_state != resource_ok {
+                        if resource_ok {
+                            info!(
+                                "Resource utilization back to normal - requests will be accepted"
+                            );
+                        } else {
+                            warn!("Resource utilization too high - requests will be rejected");
+                        }
+                    }
+                },
+                _ = &mut shutdown_rx => {
+                    trace!("Resource monitor shutting down");
+                    break;
+                }
+            }
+        }
+    });
+}
+
+/// Middleware that rejects requests while the server is over its resource thresholds.
+///
+/// Returns `503 Service Unavailable` when the latest check by the resource monitor found
+/// CPU or memory usage above its threshold; otherwise forwards the request to `next`.
+pub async fn check_resource_utilization_middleware(
+    req: ServiceRequest,
+    next: Next<impl MessageBody>,
+) -> Result<ServiceResponse<impl MessageBody>, Error> {
+    if !RESOURCE_CHECK_ENABLED.load(Ordering::SeqCst) {
+        warn!("Rejecting request to {} due to resource constraints", req.path());
+        return Err(ErrorServiceUnavailable("Server resources over-utilized"));
+    }
+
+    // Resource utilization is within limits: continue processing the request.
+    next.call(req).await
+}
diff --git a/src/option.rs b/src/option.rs
index db9c94097..7378deb8a 100644
--- a/src/option.rs
+++ b/src/option.rs
@@ -175,6 +175,32 @@ pub mod validation {
         }
     }
 
+    /// Parses a percentage used as a resource-monitoring threshold.
+    ///
+    /// Accepts a decimal number in `0.0..=100.0`; anything else, including NaN and
+    /// infinities, is rejected with a descriptive message.
+    pub fn validate_percentage(percentage: &str) -> Result<f32, String> {
+        let value = percentage.parse::<f32>().map_err(|_| {
+            "Invalid percentage value. It should be a decimal number like 80.0".to_string()
+        })?;
+        if (0.0..=100.0).contains(&value) {
+            Ok(value)
+        } else {
+            Err("Invalid percentage value. It should be between 0.0 and 100.0".to_string())
+        }
+    }
+
+    /// Parses a duration given in whole seconds.
+    ///
+    /// Zero is rejected: the value is used as a timer period, and
+    /// `tokio::time::interval` panics when given a zero period.
+    pub fn validate_seconds(s: &str) -> Result<u64, String> {
+        match s.parse::<u64>() {
+            Ok(seconds) if seconds > 0 => Ok(seconds),
+            _ => Err("Invalid value for seconds. It should be a positive integer".to_string()),
+        }
+    }
+
     pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result {
         if let Ok(size) = s.parse::() {
             if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {