Skip to content

Commit db59c28

Browse files
fix for stats for each date
update stats in manifest list in snapshot for each date load daily stats on server start update GET /stats endpoint to accept query param if given /stats/date={date in yyyy-mm-dd} format, date level stats is returned
1 parent 2b6739c commit db59c28

File tree

11 files changed

+205
-186
lines changed

11 files changed

+205
-186
lines changed

server/src/catalog.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::{io::ErrorKind, sync::Arc};
2020

2121
use self::{column::Column, snapshot::ManifestItem};
2222
use crate::handlers::http::base_path_without_preceding_slash;
23-
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
23+
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
2424
use crate::option::CONFIG;
25-
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
25+
use crate::stats::{event_labels_date, storage_size_labels_date, update_deleted_stats};
2626
use crate::{
2727
catalog::manifest::Manifest,
2828
event::DEFAULT_TIMESTAMP_KEY,
@@ -102,22 +102,6 @@ pub async fn update_snapshot(
102102
stream_name: &str,
103103
change: manifest::File,
104104
) -> Result<(), ObjectStorageError> {
105-
// get current snapshot
106-
let event_labels = event_labels(stream_name, "json");
107-
let storage_size_labels = storage_size_labels(stream_name);
108-
let events_ingested = EVENTS_INGESTED_TODAY
109-
.get_metric_with_label_values(&event_labels)
110-
.unwrap()
111-
.get() as u64;
112-
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
113-
.get_metric_with_label_values(&event_labels)
114-
.unwrap()
115-
.get() as u64;
116-
let storage_size = STORAGE_SIZE_TODAY
117-
.get_metric_with_label_values(&storage_size_labels)
118-
.unwrap()
119-
.get() as u64;
120-
121105
let mut meta = storage.get_object_store_format(stream_name).await?;
122106
let meta_clone = meta.clone();
123107
let manifests = &mut meta.snapshot.manifest_list;
@@ -132,6 +116,21 @@ pub async fn update_snapshot(
132116
lower_bound
133117
}
134118
};
119+
let date = lower_bound.date_naive().format("%Y-%m-%d").to_string();
120+
let event_labels = event_labels_date(stream_name, "json", &date);
121+
let storage_size_labels = storage_size_labels_date(stream_name, &date);
122+
let events_ingested = EVENTS_INGESTED_DATE
123+
.get_metric_with_label_values(&event_labels)
124+
.unwrap()
125+
.get() as u64;
126+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
127+
.get_metric_with_label_values(&event_labels)
128+
.unwrap()
129+
.get() as u64;
130+
let storage_size = EVENTS_STORAGE_SIZE_DATE
131+
.get_metric_with_label_values(&storage_size_labels)
132+
.unwrap()
133+
.get() as u64;
135134
let pos = manifests.iter().position(|item| {
136135
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
137136
});
@@ -149,6 +148,25 @@ pub async fn update_snapshot(
149148
for m in manifests.iter_mut() {
150149
let p = manifest_path("").to_string();
151150
if m.manifest_path.contains(&p) {
151+
let date = m
152+
.time_lower_bound
153+
.date_naive()
154+
.format("%Y-%m-%d")
155+
.to_string();
156+
let event_labels = event_labels_date(stream_name, "json", &date);
157+
let storage_size_labels = storage_size_labels_date(stream_name, &date);
158+
let events_ingested = EVENTS_INGESTED_DATE
159+
.get_metric_with_label_values(&event_labels)
160+
.unwrap()
161+
.get() as u64;
162+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
163+
.get_metric_with_label_values(&event_labels)
164+
.unwrap()
165+
.get() as u64;
166+
let storage_size = EVENTS_STORAGE_SIZE_DATE
167+
.get_metric_with_label_values(&storage_size_labels)
168+
.unwrap()
169+
.get() as u64;
152170
ch = true;
153171
m.events_ingested = events_ingested;
154172
m.ingestion_size = ingestion_size;

server/src/event.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl Event {
8282
self.origin_format,
8383
self.origin_size,
8484
num_rows,
85+
self.parsed_timestamp,
8586
)?;
8687

8788
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

server/src/handlers/http/logstream.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ use crate::handlers::{
2525
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
2626
};
2727
use crate::metadata::STREAM_INFO;
28+
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
2829
use crate::option::{Mode, CONFIG};
2930
use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
31+
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3032
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
3133
use crate::{
3234
catalog::{self, remove_manifest_from_snapshot},
@@ -482,6 +484,29 @@ pub async fn put_enable_cache(
482484
StatusCode::OK,
483485
))
484486
}
487+
pub async fn get_stats_date(stream_name: &str, date: &str) -> Result<Stats, StreamError> {
488+
let event_labels = event_labels_date(stream_name, "json", date);
489+
let storage_size_labels = storage_size_labels_date(stream_name, date);
490+
let events_ingested = EVENTS_INGESTED_DATE
491+
.get_metric_with_label_values(&event_labels)
492+
.unwrap()
493+
.get() as u64;
494+
let ingestion_size = EVENTS_INGESTED_SIZE_DATE
495+
.get_metric_with_label_values(&event_labels)
496+
.unwrap()
497+
.get() as u64;
498+
let storage_size = EVENTS_STORAGE_SIZE_DATE
499+
.get_metric_with_label_values(&storage_size_labels)
500+
.unwrap()
501+
.get() as u64;
502+
503+
let stats = Stats {
504+
events: events_ingested,
505+
ingestion: ingestion_size,
506+
storage: storage_size,
507+
};
508+
Ok(stats)
509+
}
485510

486511
pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError> {
487512
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -490,6 +515,25 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
490515
return Err(StreamError::StreamNotFound(stream_name));
491516
}
492517

518+
let query_string = req.query_string();
519+
if !query_string.is_empty() {
520+
let date_key = query_string.split('=').collect::<Vec<&str>>()[0];
521+
let date_value = query_string.split('=').collect::<Vec<&str>>()[1];
522+
if date_key != "date" {
523+
return Err(StreamError::Custom {
524+
msg: "Invalid query parameter".to_string(),
525+
status: StatusCode::BAD_REQUEST,
526+
});
527+
}
528+
529+
if !date_value.is_empty() {
530+
let stats = get_stats_date(&stream_name, date_value).await?;
531+
let stats = serde_json::to_value(stats)?;
532+
533+
return Ok((web::Json(stats), StatusCode::OK));
534+
}
535+
}
536+
493537
let stats = stats::get_current_stats(&stream_name, "json")
494538
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;
495539

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ impl IngestServer {
337337
}
338338

339339
metrics::fetch_stats_from_storage().await;
340-
metrics::reset_daily_metric_from_global();
341340

342341
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
343342
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =

server/src/handlers/http/modal/query_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ impl QueryServer {
184184

185185
// load data from stats back to prometheus metrics
186186
metrics::fetch_stats_from_storage().await;
187-
metrics::reset_daily_metric_from_global();
188187
// track all parquet files already in the data directory
189188
storage::retention::load_retention_from_global();
190189

server/src/handlers/http/modal/server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,6 @@ impl Server {
496496
DASHBOARDS.load().await?;
497497

498498
metrics::fetch_stats_from_storage().await;
499-
metrics::reset_daily_metric_from_global();
500499
storage::retention::load_retention_from_global();
501500

502501
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();

server/src/metadata.rs

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use arrow_array::RecordBatch;
2020
use arrow_schema::{Field, Fields, Schema};
21-
use chrono::Local;
21+
use chrono::{Local, NaiveDateTime};
2222
use itertools::Itertools;
2323
use once_cell::sync::Lazy;
2424
use std::collections::HashMap;
@@ -27,10 +27,10 @@ use std::sync::{Arc, RwLock};
2727
use self::error::stream_info::{CheckAlertError, LoadError, MetadataError};
2828
use crate::alerts::Alerts;
2929
use crate::metrics::{
30-
EVENTS_INGESTED, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY,
31-
LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
30+
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
31+
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
3232
};
33-
use crate::storage::{LogStream, ObjectStorage, StorageDir};
33+
use crate::storage::{LogStream, ObjectStorage, ObjectStoreFormat, StorageDir};
3434
use crate::utils::arrow::MergedRecordReader;
3535
use derive_more::{Deref, DerefMut};
3636

@@ -244,7 +244,8 @@ impl StreamInfo {
244244
let alerts = storage.get_alerts(&stream.name).await?;
245245
let schema = storage.get_schema_on_server_start(&stream.name).await?;
246246
let meta = storage.get_stream_metadata(&stream.name).await?;
247-
247+
let meta_clone = meta.clone();
248+
let stream_name = stream.name.clone();
248249
let schema = update_schema_from_staging(&stream.name, schema);
249250
let schema = HashMap::from_iter(
250251
schema
@@ -268,10 +269,30 @@ impl StreamInfo {
268269
let mut map = self.write().expect(LOCK_EXPECT);
269270

270271
map.insert(stream.name, metadata);
272+
Self::load_daily_metrics(meta_clone, &stream_name);
271273

272274
Ok(())
273275
}
274276

277+
fn load_daily_metrics(meta: ObjectStoreFormat, stream_name: &str) {
278+
let manifests = meta.snapshot.manifest_list;
279+
for manifest in manifests {
280+
let manifest_date = manifest.time_lower_bound.date_naive().to_string();
281+
let events_ingested = manifest.events_ingested;
282+
let ingestion_size = manifest.ingestion_size;
283+
let storage_size = manifest.storage_size;
284+
EVENTS_INGESTED_DATE
285+
.with_label_values(&[stream_name, "json", &manifest_date])
286+
.set(events_ingested as i64);
287+
EVENTS_INGESTED_SIZE_DATE
288+
.with_label_values(&[stream_name, "json", &manifest_date])
289+
.set(ingestion_size as i64);
290+
EVENTS_STORAGE_SIZE_DATE
291+
.with_label_values(&["data", stream_name, "parquet", &manifest_date])
292+
.set(storage_size as i64);
293+
}
294+
}
295+
275296
pub fn list_streams(&self) -> Vec<String> {
276297
self.read()
277298
.expect(LOCK_EXPECT)
@@ -286,18 +307,20 @@ impl StreamInfo {
286307
origin: &'static str,
287308
size: u64,
288309
num_rows: u64,
310+
parsed_timestamp: NaiveDateTime,
289311
) -> Result<(), MetadataError> {
312+
let parsed_date = parsed_timestamp.date().to_string();
290313
EVENTS_INGESTED
291314
.with_label_values(&[stream_name, origin])
292315
.add(num_rows as i64);
293-
EVENTS_INGESTED_TODAY
294-
.with_label_values(&[stream_name, origin])
316+
EVENTS_INGESTED_DATE
317+
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
295318
.add(num_rows as i64);
296319
EVENTS_INGESTED_SIZE
297320
.with_label_values(&[stream_name, origin])
298321
.add(size as i64);
299-
EVENTS_INGESTED_SIZE_TODAY
300-
.with_label_values(&[stream_name, origin])
322+
EVENTS_INGESTED_SIZE_DATE
323+
.with_label_values(&[stream_name, origin, parsed_date.as_str()])
301324
.add(size as i64);
302325
LIFETIME_EVENTS_INGESTED
303326
.with_label_values(&[stream_name, origin])

0 commit comments

Comments
 (0)