Skip to content

feat: add resource utilisation middleware to monitor CPU and memory. #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
19 changes: 19 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,25 @@ pub struct Options {
)]
pub parquet_compression: Compression,

// Resource monitoring
#[arg(
long,
env = "P_CPU_THRESHOLD",
default_value = "80.0",
value_parser = validation::validate_percentage,
help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring"
)]
pub cpu_utilization_threshold: f32,

#[arg(
long,
env = "P_MEMORY_THRESHOLD",
default_value = "80.0",
value_parser = validation::validate_percentage,
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"
)]
pub memory_utilization_threshold: f32,

// Integration features
#[arg(
long,
Expand Down
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub mod cluster;
pub mod correlation;
pub mod health_check;
pub mod ingest;
pub mod resource_check;
mod kinesis;
pub mod llm;
pub mod logstream;
Expand Down
8 changes: 7 additions & 1 deletion src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
http::{
base_path, ingest, logstream,
middleware::{DisAllowRootUser, RouteExt},
role,
resource_check, role,
},
},
migration,
Expand Down Expand Up @@ -126,12 +126,18 @@ impl ParseableServer for IngestServer {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Start resource monitor
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
resource_check::spawn_resource_monitor(resource_shutdown_rx);

tokio::spawn(airplane::server());

// Ingestors shouldn't have to deal with OpenId auth flow
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());
if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
utils::get_node_id,
};

use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION};

pub mod ingest;
pub mod ingest_server;
Expand Down Expand Up @@ -113,6 +113,7 @@ pub trait ParseableServer {
.wrap(prometheus.clone())
.configure(|config| Self::configure_routes(config, oidc_client.clone()))
.wrap(from_fn(health_check::check_shutdown_middleware))
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
.wrap(from_fn(audit::audit_log_middleware))
.wrap(actix_web::middleware::Logger::default())
.wrap(actix_web::middleware::Compress::default())
Expand Down
7 changes: 7 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::health_check;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::resource_check;
use crate::handlers::http::query;
use crate::handlers::http::users::dashboards;
use crate::handlers::http::users::filters;
Expand Down Expand Up @@ -138,6 +139,10 @@ impl ParseableServer for Server {
let (cancel_tx, cancel_rx) = oneshot::channel();
thread::spawn(|| sync::handler(cancel_rx));

// Start resource monitor
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
resource_check::spawn_resource_monitor(resource_shutdown_rx);

if PARSEABLE.options.send_analytics {
analytics::init_analytics_scheduler()?;
}
Expand All @@ -150,6 +155,8 @@ impl ParseableServer for Server {
.await;
// Cancel sync jobs
cancel_tx.send(()).expect("Cancellation should not fail");
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());
if let Err(join_err) = startup_sync_handle.await {
tracing::warn!("startup sync task panicked: {join_err}");
}
Expand Down
127 changes: 127 additions & 0 deletions src/handlers/http/resource_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
error::Error,
error::ErrorServiceUnavailable,
middleware::Next,
};
use tokio::{select, time::{interval, Duration}};
use tokio::sync::RwLock;
use tracing::{warn, trace, info};

use crate::analytics::{SYS_INFO, refresh_sys_info};
use crate::parseable::PARSEABLE;

static RESOURCE_CHECK_ENABLED: RwLock<bool> = RwLock::const_new(true);

/// Spawn a background task to monitor system resources
pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
tokio::spawn(async move {
let mut check_interval = interval(Duration::from_secs(30));
let mut shutdown_rx = shutdown_rx;

let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
let memory_threshold = PARSEABLE.options.memory_utilization_threshold;

info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
cpu_threshold, memory_threshold);
loop {
select! {
_ = check_interval.tick() => {
trace!("Checking system resource utilization...");

refresh_sys_info();
let (used_memory, total_memory, cpu_usage) = tokio::task::spawn_blocking(|| {
let sys = SYS_INFO.lock().unwrap();
let used_memory = sys.used_memory() as f32;
let total_memory = sys.total_memory() as f32;
let cpu_usage = sys.global_cpu_usage();
(used_memory, total_memory, cpu_usage)
}).await.unwrap();

let mut resource_ok = true;

// Calculate memory usage percentage
let memory_usage = if total_memory > 0.0 {
(used_memory / total_memory) * 100.0
} else {
0.0
};

// Log current resource usage every few checks for debugging
info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GiB/{:.1}GiB)",
cpu_usage, memory_usage,
used_memory / 1024.0 / 1024.0 / 1024.0,
total_memory / 1024.0 / 1024.0 / 1024.0);

// Check memory utilization
if memory_usage > memory_threshold {
warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
memory_usage, memory_threshold);
resource_ok = false;
}

// Check CPU utilization
if cpu_usage > cpu_threshold {
warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
cpu_usage, cpu_threshold);
resource_ok = false;
}

let previous_state = *RESOURCE_CHECK_ENABLED.read().await;
*RESOURCE_CHECK_ENABLED.write().await = resource_ok;

// Log state changes
if previous_state != resource_ok {
if resource_ok {
info!("Resource utilization back to normal - requests will be accepted");
} else {
warn!("Resource utilization too high - requests will be rejected");
}
}
},
_ = &mut shutdown_rx => {
trace!("Resource monitor shutting down");
break;
}
}
}
});
}

/// Middleware to check system resource utilization before processing requests
/// Returns 503 Service Unavailable if resources are over-utilized
pub async fn check_resource_utilization_middleware(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {

let resource_ok = *RESOURCE_CHECK_ENABLED.read().await;

if !resource_ok {
let error_msg = "Server resources over-utilized";
warn!("Rejecting request to {} due to resource constraints", req.path());
return Err(ErrorServiceUnavailable(error_msg));
}

// Continue processing the request if resource utilization is within limits
next.call(req).await
}
12 changes: 12 additions & 0 deletions src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ pub mod validation {
}
}

pub fn validate_percentage(percentage: &str) -> Result<f32, String> {
if let Ok(percentage) = percentage.parse::<f32>() {
if (0.0..=100.0).contains(&percentage) {
Ok(percentage)
} else {
Err("Invalid percentage value. It should be between 0.0 and 100.0".to_string())
}
} else {
Err("Invalid percentage value. It should be a decimal number like 80.0".to_string())
}
}

pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
if let Ok(size) = s.parse::<usize>() {
if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {
Expand Down
Loading