diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 4b5456806..bedacc25d 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors(
     .await
 }
 
-pub fn fetch_daily_stats_from_ingestors(
+pub fn fetch_daily_stats(
     date: &str,
     stream_meta_list: &[ObjectStoreFormat],
 ) -> Result<Stats, StreamError> {
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index b5d65be5d..18cc0f074 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -524,6 +524,8 @@ pub mod error {
         HotTierValidation(#[from] HotTierValidationError),
         #[error("{0}")]
         HotTierError(#[from] HotTierError),
+        #[error("Invalid query parameter: {0}")]
+        InvalidQueryParameter(String),
     }
 
     impl actix_web::ResponseError for StreamError {
@@ -559,6 +561,7 @@ pub mod error {
             StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
             StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
             StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            StreamError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,
         }
     }
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index fc2242cdc..2be17af23 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -17,7 +17,7 @@
  */
 
 use core::str;
-use std::fs;
+use std::{collections::HashMap, fs};
 
 use actix_web::{
     web::{self, Path},
@@ -36,18 +36,18 @@ use crate::{
     handlers::http::{
         base_path_without_preceding_slash,
         cluster::{
-            self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
-            sync_streams_with_ingestors,
+            self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
             utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
         },
-        logstream::{error::StreamError, get_stats_date},
+        logstream::error::StreamError,
         modal::{NodeMetadata, NodeType},
     },
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
-    stats::{self, Stats},
+    stats,
     storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
 };
 
+const STATS_DATE_QUERY_PARAM: &str = "date";
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
@@ -142,20 +142,15 @@ pub async fn get_stats(
         return Err(StreamNotFound(stream_name.clone()).into());
     }
 
-    let query_string = req.query_string();
-    if !query_string.is_empty() {
-        let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
-        let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
-        if date_key != "date" {
-            return Err(StreamError::Custom {
-                msg: "Invalid query parameter".to_string(),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
+    let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
+        .map_err(|_| StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string()))?;
 
-        if !date_value.is_empty() {
-            let querier_stats = get_stats_date(&stream_name, date_value).await?;
+    if !query_map.is_empty() {
+        let date_value = query_map.get(STATS_DATE_QUERY_PARAM).ok_or_else(|| {
+            StreamError::InvalidQueryParameter(STATS_DATE_QUERY_PARAM.to_string())
+        })?;
+        if !date_value.is_empty() {
             // this function requires all the ingestor stream jsons
             let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
             let obs = PARSEABLE
@@ -163,13 +158,11 @@ pub async fn get_stats(
                 .storage()
                 .get_object_store()
                 .get_objects(
                     Some(&path),
-                    Box::new(|file_name| {
-                        file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
-                    }),
+                    Box::new(|file_name| file_name.ends_with("stream.json")),
                 )
                 .await?;
 
-            let mut ingestor_stream_jsons = Vec::new();
+            let mut stream_jsons = Vec::new();
             for ob in obs {
                 let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
                     Ok(d) => d,
@@ -178,20 +171,14 @@ pub async fn get_stats(
                         continue;
                     }
                 };
-                ingestor_stream_jsons.push(stream_metadata);
+                stream_jsons.push(stream_metadata);
             }
 
-            let ingestor_stats =
-                fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
+            let stats = fetch_daily_stats(date_value, &stream_jsons)?;
 
-            let total_stats = Stats {
-                events: querier_stats.events + ingestor_stats.events,
-                ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
-                storage: querier_stats.storage + ingestor_stats.storage,
-            };
-            let stats = serde_json::to_value(total_stats)?;
+            let stats = serde_json::to_value(stats)?;
 
-            return Ok((web::Json(stats), StatusCode::OK));
+            return Ok(web::Json(stats));
         }
     }
 
@@ -238,5 +225,5 @@ pub async fn get_stats(
 
     let stats = serde_json::to_value(stats)?;
 
-    Ok((web::Json(stats), StatusCode::OK))
+    Ok(web::Json(stats))
 }
diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs
index 82e24eb9d..c249b2d97 100644
--- a/src/prism/home/mod.rs
+++ b/src/prism/home/mod.rs
@@ -30,10 +30,7 @@ use crate::{
     alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
     correlation::{CorrelationError, CORRELATIONS},
     event::format::LogSource,
-    handlers::http::{
-        cluster::fetch_daily_stats_from_ingestors,
-        logstream::{error::StreamError, get_stats_date},
-    },
+    handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
     parseable::PARSEABLE,
     rbac::{map::SessionKey, role::Action, Users},
     storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
@@ -221,9 +218,9 @@ async fn stats_for_date(
     };
 
     // Process each stream concurrently
-    let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
-        get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
-    });
+    let stream_stats_futures = stream_wise_meta
+        .values()
+        .map(|meta| get_stream_stats_for_date(date.clone(), meta));
 
     let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
 
@@ -246,18 +243,12 @@ async fn stats_for_date(
 }
 
 async fn get_stream_stats_for_date(
-    stream: String,
     date: String,
-    meta: Vec<ObjectStoreFormat>,
+    meta: &[ObjectStoreFormat],
 ) -> Result<(u64, u64, u64), PrismHomeError> {
-    let querier_stats = get_stats_date(&stream, &date).await?;
-    let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
-
-    Ok((
-        querier_stats.events + ingestor_stats.events,
-        querier_stats.ingestion + ingestor_stats.ingestion,
-        querier_stats.storage + ingestor_stats.storage,
-    ))
+    let stats = fetch_daily_stats(&date, meta)?;
+
+    Ok((stats.events, stats.ingestion, stats.storage))
 }
 
 pub async fn generate_home_search_response(