Skip to content

fix: date level stats #1312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors(
.await
}

pub fn fetch_daily_stats_from_ingestors(
pub fn fetch_daily_stats(
date: &str,
stream_meta_list: &[ObjectStoreFormat],
) -> Result<Stats, StreamError> {
Expand Down
24 changes: 9 additions & 15 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ use crate::{
handlers::http::{
base_path_without_preceding_slash,
cluster::{
self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
sync_streams_with_ingestors,
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
},
logstream::{error::StreamError, get_stats_date},
logstream::error::StreamError,
modal::{NodeMetadata, NodeType},
},
hottier::HotTierManager,
Expand Down Expand Up @@ -154,22 +153,18 @@ pub async fn get_stats(
}

if !date_value.is_empty() {
let querier_stats = get_stats_date(&stream_name, date_value).await?;

// this function requires all the ingestor stream jsons
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
let obs = PARSEABLE
.storage
.get_object_store()
.get_objects(
Some(&path),
Box::new(|file_name| {
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
}),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await?;

let mut ingestor_stream_jsons = Vec::new();
let mut stream_jsons = Vec::new();
for ob in obs {
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
Ok(d) => d,
Expand All @@ -178,16 +173,15 @@ pub async fn get_stats(
continue;
}
};
ingestor_stream_jsons.push(stream_metadata);
stream_jsons.push(stream_metadata);
}

let ingestor_stats =
fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
let stats = fetch_daily_stats(date_value, &stream_jsons)?;

let total_stats = Stats {
events: querier_stats.events + ingestor_stats.events,
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
storage: querier_stats.storage + ingestor_stats.storage,
events: stats.events,
ingestion: stats.ingestion,
storage: stats.storage,
};
let stats = serde_json::to_value(total_stats)?;

Expand Down
23 changes: 7 additions & 16 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use crate::{
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
correlation::{CorrelationError, CORRELATIONS},
event::format::LogSource,
handlers::http::{
cluster::fetch_daily_stats_from_ingestors,
logstream::{error::StreamError, get_stats_date},
},
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
Expand Down Expand Up @@ -221,9 +218,9 @@ async fn stats_for_date(
};

// Process each stream concurrently
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
});
let stream_stats_futures = stream_wise_meta
.values()
.map(|meta| get_stream_stats_for_date(date.clone(), meta.clone()));

let stream_stats_results = futures::future::join_all(stream_stats_futures).await;

Expand All @@ -246,18 +243,12 @@ async fn stats_for_date(
}

async fn get_stream_stats_for_date(
stream: String,
date: String,
meta: Vec<ObjectStoreFormat>,
) -> Result<(u64, u64, u64), PrismHomeError> {
let querier_stats = get_stats_date(&stream, &date).await?;
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;

Ok((
querier_stats.events + ingestor_stats.events,
querier_stats.ingestion + ingestor_stats.ingestion,
querier_stats.storage + ingestor_stats.storage,
))
let stats = fetch_daily_stats(&date, &meta)?;

Ok((stats.events, stats.ingestion, stats.storage))
}

pub async fn generate_home_search_response(
Expand Down
Loading