Skip to content

Commit d572881

Browse files
fix: date level stats
read all stream.json files for a dataset get sum of stats for a given date from the manifest list in snapshot no need to calculate querier / ingestor stats separately
1 parent 3d844b3 commit d572881

File tree

3 files changed

+17
-32
lines changed

3 files changed

+17
-32
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ pub async fn sync_role_update_with_ingestors(
393393
.await
394394
}
395395

396-
pub fn fetch_daily_stats_from_ingestors(
396+
pub fn fetch_daily_stats(
397397
date: &str,
398398
stream_meta_list: &[ObjectStoreFormat],
399399
) -> Result<Stats, StreamError> {

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,10 @@ use crate::{
3636
handlers::http::{
3737
base_path_without_preceding_slash,
3838
cluster::{
39-
self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors,
40-
sync_streams_with_ingestors,
39+
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
4140
utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
4241
},
43-
logstream::{error::StreamError, get_stats_date},
42+
logstream::error::StreamError,
4443
modal::{NodeMetadata, NodeType},
4544
},
4645
hottier::HotTierManager,
@@ -154,22 +153,18 @@ pub async fn get_stats(
154153
}
155154

156155
if !date_value.is_empty() {
157-
let querier_stats = get_stats_date(&stream_name, date_value).await?;
158-
159156
// this function requires all the ingestor stream jsons
160157
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
161158
let obs = PARSEABLE
162159
.storage
163160
.get_object_store()
164161
.get_objects(
165162
Some(&path),
166-
Box::new(|file_name| {
167-
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
168-
}),
163+
Box::new(|file_name| file_name.ends_with("stream.json")),
169164
)
170165
.await?;
171166

172-
let mut ingestor_stream_jsons = Vec::new();
167+
let mut stream_jsons = Vec::new();
173168
for ob in obs {
174169
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
175170
Ok(d) => d,
@@ -178,16 +173,15 @@ pub async fn get_stats(
178173
continue;
179174
}
180175
};
181-
ingestor_stream_jsons.push(stream_metadata);
176+
stream_jsons.push(stream_metadata);
182177
}
183178

184-
let ingestor_stats =
185-
fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
179+
let stats = fetch_daily_stats(date_value, &stream_jsons)?;
186180

187181
let total_stats = Stats {
188-
events: querier_stats.events + ingestor_stats.events,
189-
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
190-
storage: querier_stats.storage + ingestor_stats.storage,
182+
events: stats.events,
183+
ingestion: stats.ingestion,
184+
storage: stats.storage,
191185
};
192186
let stats = serde_json::to_value(total_stats)?;
193187

src/prism/home/mod.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,7 @@ use crate::{
3030
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
3131
correlation::{CorrelationError, CORRELATIONS},
3232
event::format::LogSource,
33-
handlers::http::{
34-
cluster::fetch_daily_stats_from_ingestors,
35-
logstream::{error::StreamError, get_stats_date},
36-
},
33+
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3734
parseable::PARSEABLE,
3835
rbac::{map::SessionKey, role::Action, Users},
3936
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
@@ -221,9 +218,9 @@ async fn stats_for_date(
221218
};
222219

223220
// Process each stream concurrently
224-
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
225-
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
226-
});
221+
let stream_stats_futures = stream_wise_meta
222+
.values()
223+
.map(|meta| get_stream_stats_for_date(date.clone(), meta.clone()));
227224

228225
let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
229226

@@ -246,18 +243,12 @@ async fn stats_for_date(
246243
}
247244

248245
async fn get_stream_stats_for_date(
249-
stream: String,
250246
date: String,
251247
meta: Vec<ObjectStoreFormat>,
252248
) -> Result<(u64, u64, u64), PrismHomeError> {
253-
let querier_stats = get_stats_date(&stream, &date).await?;
254-
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
255-
256-
Ok((
257-
querier_stats.events + ingestor_stats.events,
258-
querier_stats.ingestion + ingestor_stats.ingestion,
259-
querier_stats.storage + ingestor_stats.storage,
260-
))
249+
let stats = fetch_daily_stats(&date, &meta)?;
250+
251+
Ok((stats.events, stats.ingestion, stats.storage))
261252
}
262253

263254
pub async fn generate_home_search_response(

0 commit comments

Comments
 (0)