Skip to content

Commit af7c02d

Browse files
Merge branch 'main' into new-dashboard
2 parents b176c74 + 61697ea commit af7c02d

File tree

10 files changed

+237
-20
lines changed

10 files changed

+237
-20
lines changed

src/about.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,15 @@ pub fn get_latest_release() -> &'static Option<LatestRelease> {
7171
}
7272

7373
// User Agent for Download API call
74-
// Format: Parseable/<UID>/<version>/<commit_hash> (<OS>; <Platform>)
75-
pub fn user_agent(uid: &Ulid) -> String {
74+
// Format: Parseable/<UID>/<version>/<commit_hash>/<send_analytics> (<OS>; <Platform>)
75+
pub fn user_agent(uid: &Ulid, send_analytics: bool) -> String {
7676
analytics::refresh_sys_info();
7777
format!(
78-
"Parseable/{}/{}/{} ({:?}; {})",
78+
"Parseable/{}/{}/{}/{} ({:?}; {})",
7979
uid,
8080
current().released_version,
8181
current().commit_hash,
82+
send_analytics,
8283
System::name().unwrap_or_default(),
8384
platform()
8485
)

src/cli.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,34 @@ pub struct Options {
317317
)]
318318
pub parquet_compression: Compression,
319319

320+
// Resource monitoring
321+
#[arg(
322+
long,
323+
env = "P_RESOURCE_CHECK_INTERVAL",
324+
default_value = "15",
325+
value_parser = validation::validate_seconds,
326+
help = "Resource monitoring check interval in seconds"
327+
)]
328+
pub resource_check_interval: u64,
329+
330+
#[arg(
331+
long,
332+
env = "P_CPU_THRESHOLD",
333+
default_value = "80.0",
334+
value_parser = validation::validate_percentage,
335+
help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring"
336+
)]
337+
pub cpu_utilization_threshold: f32,
338+
339+
#[arg(
340+
long,
341+
env = "P_MEMORY_THRESHOLD",
342+
default_value = "80.0",
343+
value_parser = validation::validate_percentage,
344+
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"
345+
)]
346+
pub memory_utilization_threshold: f32,
347+
320348
// Integration features
321349
#[arg(
322350
long,

src/handlers/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub mod cluster;
3636
pub mod correlation;
3737
pub mod health_check;
3838
pub mod ingest;
39+
pub mod resource_check;
3940
mod kinesis;
4041
pub mod llm;
4142
pub mod logstream;

src/handlers/http/modal/ingest_server.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::thread;
2121

2222
use actix_web::web;
2323
use actix_web::Scope;
24+
use actix_web::middleware::from_fn;
2425
use actix_web_prometheus::PrometheusMetrics;
2526
use async_trait::async_trait;
2627
use base64::Engine;
@@ -39,7 +40,7 @@ use crate::{
3940
http::{
4041
base_path, ingest, logstream,
4142
middleware::{DisAllowRootUser, RouteExt},
42-
role,
43+
resource_check, role,
4344
},
4445
},
4546
migration,
@@ -67,7 +68,10 @@ impl ParseableServer for IngestServer {
6768
.service(
6869
// Base path "{url}/api/v1"
6970
web::scope(&base_path())
70-
.service(Server::get_ingest_factory())
71+
.service(
72+
Server::get_ingest_factory()
73+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
74+
)
7175
.service(Self::logstream_api())
7276
.service(Server::get_about_factory())
7377
.service(Self::analytics_factory())
@@ -77,7 +81,10 @@ impl ParseableServer for IngestServer {
7781
.service(Server::get_metrics_webscope())
7882
.service(Server::get_readiness_factory()),
7983
)
80-
.service(Server::get_ingest_otel_factory());
84+
.service(
85+
Server::get_ingest_otel_factory()
86+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
87+
);
8188
}
8289

8390
async fn load_metadata(&self) -> anyhow::Result<Option<Bytes>> {
@@ -223,7 +230,8 @@ impl IngestServer {
223230
web::post()
224231
.to(ingest::post_event)
225232
.authorize_for_stream(Action::Ingest),
226-
),
233+
)
234+
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
227235
)
228236
.service(
229237
web::resource("/sync")

src/handlers/http/modal/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::{
4545
utils::get_node_id,
4646
};
4747

48-
use super::{audit, cross_origin_config, health_check, API_BASE_PATH, API_VERSION};
48+
use super::{audit, cross_origin_config, health_check, resource_check, API_BASE_PATH, API_VERSION};
4949

5050
pub mod ingest;
5151
pub mod ingest_server;
@@ -107,6 +107,10 @@ pub trait ParseableServer {
107107
&PARSEABLE.options.trusted_ca_certs_path,
108108
)?;
109109

110+
// Start resource monitor
111+
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
112+
resource_check::spawn_resource_monitor(resource_shutdown_rx);
113+
110114
// fn that creates the app
111115
let create_app_fn = move || {
112116
App::new()
@@ -142,6 +146,9 @@ pub trait ParseableServer {
142146

143147
health_check::shutdown().await;
144148

149+
// Shutdown resource monitor
150+
let _ = resource_shutdown_tx.send(());
151+
145152
// Initiate graceful shutdown
146153
info!("Graceful shutdown of HTTP server triggered");
147154
srv_handle.stop(true).await;

src/handlers/http/modal/query_server.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ use std::thread;
2222
use crate::handlers::airplane;
2323
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
2424
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
25-
use crate::handlers::http::{base_path, prism_base_path};
25+
use crate::handlers::http::{base_path, prism_base_path, resource_check};
2626
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2727
use crate::handlers::http::{rbac, role};
2828
use crate::hottier::HotTierManager;
2929
use crate::rbac::role::Action;
3030
use crate::sync::sync_start;
3131
use crate::{analytics, migration, storage, sync};
32+
use actix_web::middleware::from_fn;
3233
use actix_web::web::{resource, ServiceConfig};
3334
use actix_web::{web, Scope};
3435
use actix_web_prometheus::PrometheusMetrics;
@@ -53,7 +54,10 @@ impl ParseableServer for QueryServer {
5354
.service(
5455
web::scope(&base_path())
5556
.service(Server::get_correlation_webscope())
56-
.service(Server::get_query_factory())
57+
.service(
58+
Server::get_query_factory()
59+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
60+
)
5761
.service(Server::get_liveness_factory())
5862
.service(Server::get_readiness_factory())
5963
.service(Server::get_about_factory())
@@ -66,7 +70,10 @@ impl ParseableServer for QueryServer {
6670
.service(Server::get_oauth_webscope(oidc_client))
6771
.service(Self::get_user_role_webscope())
6872
.service(Server::get_roles_webscope())
69-
.service(Server::get_counts_webscope())
73+
.service(
74+
Server::get_counts_webscope()
75+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
76+
)
7077
.service(Server::get_metrics_webscope())
7178
.service(Server::get_alerts_webscope())
7279
.service(Self::get_cluster_web_scope()),

src/handlers/http/modal/server.rs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
2525
use crate::handlers::http::base_path;
2626
use crate::handlers::http::health_check;
2727
use crate::handlers::http::prism_base_path;
28+
use crate::handlers::http::resource_check;
2829
use crate::handlers::http::query;
2930
use crate::handlers::http::users::dashboards;
3031
use crate::handlers::http::users::filters;
@@ -37,6 +38,7 @@ use crate::sync::sync_start;
3738

3839
use actix_web::web;
3940
use actix_web::web::resource;
41+
use actix_web::middleware::from_fn;
4042
use actix_web::Resource;
4143
use actix_web::Scope;
4244
use actix_web_prometheus::PrometheusMetrics;
@@ -71,8 +73,14 @@ impl ParseableServer for Server {
7173
.service(
7274
web::scope(&base_path())
7375
.service(Self::get_correlation_webscope())
74-
.service(Self::get_query_factory())
75-
.service(Self::get_ingest_factory())
76+
.service(
77+
Self::get_query_factory()
78+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
79+
)
80+
.service(
81+
Self::get_ingest_factory()
82+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
83+
)
7684
.service(Self::get_liveness_factory())
7785
.service(Self::get_readiness_factory())
7886
.service(Self::get_about_factory())
@@ -85,7 +93,10 @@ impl ParseableServer for Server {
8593
.service(Self::get_oauth_webscope(oidc_client))
8694
.service(Self::get_user_role_webscope())
8795
.service(Self::get_roles_webscope())
88-
.service(Self::get_counts_webscope())
96+
.service(
97+
Self::get_counts_webscope()
98+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
99+
)
89100
.service(Self::get_alerts_webscope())
90101
.service(Self::get_metrics_webscope()),
91102
)
@@ -95,7 +106,10 @@ impl ParseableServer for Server {
95106
.service(Server::get_prism_logstream())
96107
.service(Server::get_prism_datasets()),
97108
)
98-
.service(Self::get_ingest_otel_factory())
109+
.service(
110+
Self::get_ingest_otel_factory()
111+
.wrap(from_fn(resource_check::check_resource_utilization_middleware))
112+
)
99113
.service(Self::get_generated());
100114
}
101115

@@ -363,13 +377,14 @@ impl Server {
363377
.route(
364378
web::put()
365379
.to(logstream::put_stream)
366-
.authorize_for_stream(Action::CreateStream),
380+
.authorize_for_stream(Action::CreateStream)
367381
)
368382
// POST "/logstream/{logstream}" ==> Post logs to given log stream
369383
.route(
370384
web::post()
371385
.to(ingest::post_event)
372-
.authorize_for_stream(Action::Ingest),
386+
.authorize_for_stream(Action::Ingest)
387+
.wrap(from_fn(resource_check::check_resource_utilization_middleware)),
373388
)
374389
// DELETE "/logstream/{logstream}" ==> Delete log stream
375390
.route(
@@ -378,7 +393,7 @@ impl Server {
378393
.authorize_for_stream(Action::DeleteStream),
379394
)
380395
.app_data(web::JsonConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
381-
)
396+
)
382397
.service(
383398
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
384399
web::resource("/info").route(

src/handlers/http/resource_check.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::sync::{atomic::AtomicBool, Arc, LazyLock};
20+
21+
use actix_web::{
22+
body::MessageBody,
23+
dev::{ServiceRequest, ServiceResponse},
24+
error::Error,
25+
error::ErrorServiceUnavailable,
26+
middleware::Next,
27+
};
28+
use tokio::{select, time::{interval, Duration}};
29+
use tracing::{warn, trace, info};
30+
31+
use crate::analytics::{SYS_INFO, refresh_sys_info};
32+
use crate::parseable::PARSEABLE;
33+
34+
static RESOURCE_CHECK_ENABLED:LazyLock<Arc<AtomicBool>> = LazyLock::new(|| Arc::new(AtomicBool::new(false)));
35+
36+
/// Spawn a background task to monitor system resources
37+
pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
38+
tokio::spawn(async move {
39+
let resource_check_interval = PARSEABLE.options.resource_check_interval;
40+
let mut check_interval = interval(Duration::from_secs(resource_check_interval));
41+
let mut shutdown_rx = shutdown_rx;
42+
43+
let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
44+
let memory_threshold = PARSEABLE.options.memory_utilization_threshold;
45+
46+
info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
47+
cpu_threshold, memory_threshold);
48+
loop {
49+
select! {
50+
_ = check_interval.tick() => {
51+
trace!("Checking system resource utilization...");
52+
53+
refresh_sys_info();
54+
let (used_memory, total_memory, cpu_usage) = tokio::task::spawn_blocking(|| {
55+
let sys = SYS_INFO.lock().unwrap();
56+
let used_memory = sys.used_memory() as f32;
57+
let total_memory = sys.total_memory() as f32;
58+
let cpu_usage = sys.global_cpu_usage();
59+
(used_memory, total_memory, cpu_usage)
60+
}).await.unwrap();
61+
62+
let mut resource_ok = true;
63+
64+
// Calculate memory usage percentage
65+
let memory_usage = if total_memory > 0.0 {
66+
(used_memory / total_memory) * 100.0
67+
} else {
68+
0.0
69+
};
70+
71+
// Log current resource usage every few checks for debugging
72+
info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
73+
cpu_usage, memory_usage,
74+
used_memory / 1024.0 / 1024.0 / 1024.0,
75+
total_memory / 1024.0 / 1024.0 / 1024.0);
76+
77+
// Check memory utilization
78+
if memory_usage > memory_threshold {
79+
warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
80+
memory_usage, memory_threshold);
81+
resource_ok = false;
82+
}
83+
84+
// Check CPU utilization
85+
if cpu_usage > cpu_threshold {
86+
warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
87+
cpu_usage, cpu_threshold);
88+
resource_ok = false;
89+
}
90+
91+
let previous_state = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst);
92+
RESOURCE_CHECK_ENABLED.store(resource_ok, std::sync::atomic::Ordering::SeqCst);
93+
94+
// Log state changes
95+
if previous_state != resource_ok {
96+
if resource_ok {
97+
info!("Resource utilization back to normal - requests will be accepted");
98+
} else {
99+
warn!("Resource utilization too high - requests will be rejected");
100+
}
101+
}
102+
},
103+
_ = &mut shutdown_rx => {
104+
trace!("Resource monitor shutting down");
105+
break;
106+
}
107+
}
108+
}
109+
});
110+
}
111+
112+
/// Middleware to check system resource utilization before processing requests
113+
/// Returns 503 Service Unavailable if resources are over-utilized
114+
pub async fn check_resource_utilization_middleware(
115+
req: ServiceRequest,
116+
next: Next<impl MessageBody>,
117+
) -> Result<ServiceResponse<impl MessageBody>, Error> {
118+
119+
let resource_ok = RESOURCE_CHECK_ENABLED.load(std::sync::atomic::Ordering::SeqCst);
120+
121+
if !resource_ok {
122+
let error_msg = "Server resources over-utilized";
123+
warn!("Rejecting request to {} due to resource constraints", req.path());
124+
return Err(ErrorServiceUnavailable(error_msg));
125+
}
126+
127+
// Continue processing the request if resource utilization is within limits
128+
next.call(req).await
129+
}

0 commit comments

Comments
 (0)