Skip to content

Commit ad7f67e

Browse files
fix: stats enhancement (#799)
Until now, the ingested event count and the ingested size were fixed for the lifetime of a stream. Even when older data was deleted by retention, the stats were not updated at all, leading to confusion. This PR adds a new way to track stats, with three different qualifiers for each stat type. We now have lifetime, current, and deleted qualifiers for events ingested, ingested size, and stored size, so that users have a much better view of their data, its compression, the storage used, etc. This change will be enhanced further by making these updated metrics visible in the console. Fixes #788
1 parent 4023b56 commit ad7f67e

19 files changed

+904
-168
lines changed

server/src/analytics.rs

Lines changed: 101 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::option::{Mode, CONFIG};
2424
use crate::storage;
2525
use crate::{metadata, stats};
2626

27+
use crate::stats::Stats;
2728
use actix_web::{web, HttpRequest, Responder};
2829
use chrono::{DateTime, Utc};
2930
use clokwerk::{AsyncScheduler, Interval};
@@ -70,6 +71,12 @@ pub struct Report {
7071
total_events_count: u64,
7172
total_json_bytes: u64,
7273
total_parquet_bytes: u64,
74+
current_events_count: u64,
75+
current_json_bytes: u64,
76+
current_parquet_bytes: u64,
77+
deleted_events_count: u64,
78+
deleted_json_bytes: u64,
79+
deleted_parquet_bytes: u64,
7380
metrics: HashMap<String, Value>,
7481
}
7582

@@ -112,6 +119,12 @@ impl Report {
112119
total_events_count: ingestor_metrics.3,
113120
total_json_bytes: ingestor_metrics.4,
114121
total_parquet_bytes: ingestor_metrics.5,
122+
current_events_count: ingestor_metrics.6,
123+
current_json_bytes: ingestor_metrics.7,
124+
current_parquet_bytes: ingestor_metrics.8,
125+
deleted_events_count: ingestor_metrics.9,
126+
deleted_json_bytes: ingestor_metrics.10,
127+
deleted_parquet_bytes: ingestor_metrics.11,
115128
metrics: build_metrics().await,
116129
})
117130
}
@@ -132,26 +145,70 @@ fn total_streams() -> usize {
132145
metadata::STREAM_INFO.list_streams().len()
133146
}
134147

135-
fn total_event_stats() -> (u64, u64, u64) {
148+
fn total_event_stats() -> (Stats, Stats, Stats) {
136149
let mut total_events: u64 = 0;
137150
let mut total_parquet_bytes: u64 = 0;
138151
let mut total_json_bytes: u64 = 0;
139152

153+
let mut current_events: u64 = 0;
154+
let mut current_parquet_bytes: u64 = 0;
155+
let mut current_json_bytes: u64 = 0;
156+
157+
let mut deleted_events: u64 = 0;
158+
let mut deleted_parquet_bytes: u64 = 0;
159+
let mut deleted_json_bytes: u64 = 0;
160+
140161
for stream in metadata::STREAM_INFO.list_streams() {
141162
let Some(stats) = stats::get_current_stats(&stream, "json") else {
142163
continue;
143164
};
144-
total_events += stats.events;
145-
total_parquet_bytes += stats.storage;
146-
total_json_bytes += stats.ingestion;
165+
total_events += stats.lifetime_stats.events;
166+
total_parquet_bytes += stats.lifetime_stats.storage;
167+
total_json_bytes += stats.lifetime_stats.ingestion;
168+
169+
current_events += stats.current_stats.events;
170+
current_parquet_bytes += stats.current_stats.storage;
171+
current_json_bytes += stats.current_stats.ingestion;
172+
173+
deleted_events += stats.deleted_stats.events;
174+
deleted_parquet_bytes += stats.deleted_stats.storage;
175+
deleted_json_bytes += stats.deleted_stats.ingestion;
147176
}
148-
(total_events, total_json_bytes, total_parquet_bytes)
177+
178+
(
179+
Stats {
180+
events: total_events,
181+
ingestion: total_json_bytes,
182+
storage: total_parquet_bytes,
183+
},
184+
Stats {
185+
events: current_events,
186+
ingestion: current_json_bytes,
187+
storage: current_parquet_bytes,
188+
},
189+
Stats {
190+
events: deleted_events,
191+
ingestion: deleted_json_bytes,
192+
storage: deleted_parquet_bytes,
193+
},
194+
)
149195
}
150196

151-
async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64, u64)> {
197+
async fn fetch_ingestors_metrics(
198+
) -> anyhow::Result<(u64, u64, usize, u64, u64, u64, u64, u64, u64, u64, u64, u64)> {
152199
let event_stats = total_event_stats();
153-
let mut node_metrics =
154-
NodeMetrics::new(total_streams(), event_stats.0, event_stats.1, event_stats.2);
200+
let mut node_metrics = NodeMetrics::new(
201+
total_streams(),
202+
event_stats.0.events,
203+
event_stats.0.ingestion,
204+
event_stats.0.storage,
205+
event_stats.1.events,
206+
event_stats.1.ingestion,
207+
event_stats.1.storage,
208+
event_stats.2.events,
209+
event_stats.2.ingestion,
210+
event_stats.2.storage,
211+
);
155212

156213
let mut vec = vec![];
157214
let mut active_ingestors = 0u64;
@@ -198,6 +255,12 @@ async fn fetch_ingestors_metrics() -> anyhow::Result<(u64, u64, usize, u64, u64,
198255
node_metrics.total_events_count,
199256
node_metrics.total_json_bytes,
200257
node_metrics.total_parquet_bytes,
258+
node_metrics.current_events_count,
259+
node_metrics.current_json_bytes,
260+
node_metrics.current_parquet_bytes,
261+
node_metrics.deleted_events_count,
262+
node_metrics.deleted_json_bytes,
263+
node_metrics.deleted_parquet_bytes,
201264
))
202265
}
203266

@@ -255,30 +318,56 @@ struct NodeMetrics {
255318
total_events_count: u64,
256319
total_json_bytes: u64,
257320
total_parquet_bytes: u64,
321+
current_events_count: u64,
322+
current_json_bytes: u64,
323+
current_parquet_bytes: u64,
324+
deleted_events_count: u64,
325+
deleted_json_bytes: u64,
326+
deleted_parquet_bytes: u64,
258327
}
259328

260329
impl NodeMetrics {
261330
fn build() -> Self {
262331
let event_stats = total_event_stats();
263332
Self {
264333
stream_count: total_streams(),
265-
total_events_count: event_stats.0,
266-
total_json_bytes: event_stats.1,
267-
total_parquet_bytes: event_stats.2,
334+
total_events_count: event_stats.0.events,
335+
total_json_bytes: event_stats.0.ingestion,
336+
total_parquet_bytes: event_stats.0.storage,
337+
338+
current_events_count: event_stats.1.events,
339+
current_json_bytes: event_stats.1.ingestion,
340+
current_parquet_bytes: event_stats.1.storage,
341+
342+
deleted_events_count: event_stats.2.events,
343+
deleted_json_bytes: event_stats.2.ingestion,
344+
deleted_parquet_bytes: event_stats.2.storage,
268345
}
269346
}
270-
347+
#[allow(clippy::too_many_arguments)]
271348
fn new(
272349
stream_count: usize,
273350
total_events_count: u64,
274351
total_json_bytes: u64,
275352
total_parquet_bytes: u64,
353+
current_events_count: u64,
354+
current_json_bytes: u64,
355+
current_parquet_bytes: u64,
356+
deleted_events_count: u64,
357+
deleted_json_bytes: u64,
358+
deleted_parquet_bytes: u64,
276359
) -> Self {
277360
Self {
278361
stream_count,
279362
total_events_count,
280363
total_json_bytes,
281364
total_parquet_bytes,
365+
current_events_count,
366+
current_json_bytes,
367+
current_parquet_bytes,
368+
deleted_events_count,
369+
deleted_json_bytes,
370+
deleted_parquet_bytes,
282371
}
283372
}
284373

server/src/catalog.rs

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use std::{io::ErrorKind, sync::Arc};
2020

2121
use self::{column::Column, snapshot::ManifestItem};
2222
use crate::handlers::http::base_path_without_preceding_slash;
23+
use crate::metrics::{EVENTS_INGESTED_SIZE_TODAY, EVENTS_INGESTED_TODAY, STORAGE_SIZE_TODAY};
2324
use crate::option::CONFIG;
25+
use crate::stats::{event_labels, storage_size_labels, update_deleted_stats};
2426
use crate::{
2527
catalog::manifest::Manifest,
2628
event::DEFAULT_TIMESTAMP_KEY,
@@ -101,13 +103,28 @@ pub async fn update_snapshot(
101103
change: manifest::File,
102104
) -> Result<(), ObjectStorageError> {
103105
// get current snapshot
106+
let event_labels = event_labels(stream_name, "json");
107+
let storage_size_labels = storage_size_labels(stream_name);
108+
let events_ingested = EVENTS_INGESTED_TODAY
109+
.get_metric_with_label_values(&event_labels)
110+
.unwrap()
111+
.get() as u64;
112+
let ingestion_size = EVENTS_INGESTED_SIZE_TODAY
113+
.get_metric_with_label_values(&event_labels)
114+
.unwrap()
115+
.get() as u64;
116+
let storage_size = STORAGE_SIZE_TODAY
117+
.get_metric_with_label_values(&storage_size_labels)
118+
.unwrap()
119+
.get() as u64;
120+
104121
let mut meta = storage.get_object_store_format(stream_name).await?;
105122
let meta_clone = meta.clone();
106123
let manifests = &mut meta.snapshot.manifest_list;
107-
let time_partition: Option<String> = meta_clone.time_partition;
124+
let time_partition = &meta_clone.time_partition;
108125
let lower_bound = match time_partition {
109126
Some(time_partition) => {
110-
let (lower_bound, _) = get_file_bounds(&change, time_partition);
127+
let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string());
111128
lower_bound
112129
}
113130
None => {
@@ -129,12 +146,18 @@ pub async fn update_snapshot(
129146
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
130147

131148
let mut ch = false;
132-
for m in manifests.iter() {
149+
for m in manifests.iter_mut() {
133150
let p = manifest_path("").to_string();
134151
if m.manifest_path.contains(&p) {
135152
ch = true;
153+
m.events_ingested = events_ingested;
154+
m.ingestion_size = ingestion_size;
155+
m.storage_size = storage_size;
136156
}
137157
}
158+
159+
meta.snapshot.manifest_list = manifests.to_vec();
160+
storage.put_snapshot(stream_name, meta.snapshot).await?;
138161
if ch {
139162
if let Some(mut manifest) = storage.get_manifest(&path).await? {
140163
manifest.apply_change(change);
@@ -148,7 +171,10 @@ pub async fn update_snapshot(
148171
storage.clone(),
149172
stream_name,
150173
false,
151-
meta,
174+
meta_clone,
175+
events_ingested,
176+
ingestion_size,
177+
storage_size,
152178
)
153179
.await?;
154180
}
@@ -159,7 +185,10 @@ pub async fn update_snapshot(
159185
storage.clone(),
160186
stream_name,
161187
true,
162-
meta,
188+
meta_clone,
189+
events_ingested,
190+
ingestion_size,
191+
storage_size,
163192
)
164193
.await?;
165194
}
@@ -170,21 +199,28 @@ pub async fn update_snapshot(
170199
storage.clone(),
171200
stream_name,
172201
true,
173-
meta,
202+
meta_clone,
203+
events_ingested,
204+
ingestion_size,
205+
storage_size,
174206
)
175207
.await?;
176208
}
177209

178210
Ok(())
179211
}
180212

213+
#[allow(clippy::too_many_arguments)]
181214
async fn create_manifest(
182215
lower_bound: DateTime<Utc>,
183216
change: manifest::File,
184217
storage: Arc<dyn ObjectStorage + Send>,
185218
stream_name: &str,
186219
update_snapshot: bool,
187220
mut meta: ObjectStoreFormat,
221+
events_ingested: u64,
222+
ingestion_size: u64,
223+
storage_size: u64,
188224
) -> Result<(), ObjectStorageError> {
189225
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
190226
let upper_bound = lower_bound
@@ -216,6 +252,9 @@ async fn create_manifest(
216252
manifest_path: path.to_string(),
217253
time_lower_bound: lower_bound,
218254
time_upper_bound: upper_bound,
255+
events_ingested,
256+
ingestion_size,
257+
storage_size,
219258
};
220259
manifests.push(new_snapshot_entry);
221260
meta.snapshot.manifest_list = manifests;
@@ -233,6 +272,8 @@ pub async fn remove_manifest_from_snapshot(
233272
if !dates.is_empty() {
234273
// get current snapshot
235274
let mut meta = storage.get_object_store_format(stream_name).await?;
275+
let meta_for_stats = meta.clone();
276+
update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?;
236277
let manifests = &mut meta.snapshot.manifest_list;
237278
// Filter out items whose manifest_path contains any of the dates_to_delete
238279
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
@@ -308,8 +349,6 @@ pub async fn get_first_event(
308349
);
309350
// Convert dates vector to Bytes object
310351
let dates_bytes = Bytes::from(serde_json::to_vec(&dates).unwrap());
311-
// delete the stream
312-
313352
let ingestor_first_event_at =
314353
handlers::http::cluster::send_retention_cleanup_request(
315354
&url,
@@ -333,7 +372,7 @@ pub async fn get_first_event(
333372

334373
/// Partition the path to which this manifest belongs.
335374
/// Useful when uploading the manifest file.
336-
fn partition_path(
375+
pub fn partition_path(
337376
stream: &str,
338377
lower_bound: DateTime<Utc>,
339378
upper_bound: DateTime<Utc>,

server/src/catalog/snapshot.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use chrono::{DateTime, Utc};
2222

2323
use crate::query::PartialTimeFilter;
2424

25-
pub const CURRENT_SNAPSHOT_VERSION: &str = "v1";
25+
pub const CURRENT_SNAPSHOT_VERSION: &str = "v2";
2626
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
2727
pub struct Snapshot {
2828
pub version: String,
@@ -76,4 +76,7 @@ pub struct ManifestItem {
7676
pub manifest_path: String,
7777
pub time_lower_bound: DateTime<Utc>,
7878
pub time_upper_bound: DateTime<Utc>,
79+
pub events_ingested: u64,
80+
pub ingestion_size: u64,
81+
pub storage_size: u64,
7982
}

0 commit comments

Comments
 (0)