Skip to content

Commit 89d2cda

Browse files
committed
add ability to store filters and dashboards in memory
1 parent ba5b44e commit 89d2cda

File tree

8 files changed

+185
-37
lines changed

8 files changed

+185
-37
lines changed

server/src/handlers/http/modal/query_server.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use crate::handlers::http::middleware::RouteExt;
2222
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
2323

2424
use crate::rbac::role::Action;
25+
use crate::users::dashboards::DASHBOARDS;
26+
use crate::users::filters::FILTERS;
2527
use crate::{analytics, banner, metadata, metrics, migration, rbac, storage};
2628
use actix_web::web;
2729
use actix_web::web::ServiceConfig;
@@ -175,6 +177,10 @@ impl QueryServer {
175177
log::warn!("could not populate local metadata. {:?}", e);
176178
}
177179

180+
FILTERS.load().await?;
181+
DASHBOARDS.load().await?;
182+
183+
178184
// load data from stats back to prometheus metrics
179185
metrics::fetch_stats_from_storage().await;
180186
metrics::reset_daily_metric_from_global();

server/src/handlers/http/modal/server.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ use crate::migration;
3434
use crate::rbac;
3535
use crate::storage;
3636
use crate::sync;
37+
use crate::users::dashboards::DASHBOARDS;
38+
use crate::users::filters::FILTERS;
3739
use std::sync::Arc;
3840

3941
use actix_web::web::resource;
@@ -470,6 +472,10 @@ impl Server {
470472
log::warn!("could not populate local metadata. {:?}", err);
471473
}
472474

475+
FILTERS.load().await?;
476+
DASHBOARDS.load().await?;
477+
478+
473479
metrics::fetch_stats_from_storage().await;
474480
metrics::reset_daily_metric_from_global();
475481
storage::retention::load_retention_from_global();

server/src/handlers/http/users/dashboards.rs

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
handlers::http::ingest::PostError,
33
option::CONFIG,
44
storage::{object_storage::dashboard_path, ObjectStorageError},
5+
users::dashboards::{Dashboard, DASHBOARDS},
56
};
67
use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder};
78
use bytes::Bytes;
@@ -44,13 +45,18 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, DashboardError> {
4445
.get("dashboard_id")
4546
.ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?;
4647

48+
if let Some(dashboard) = DASHBOARDS.find(dash_id) {
49+
return Ok((web::Json(dashboard), StatusCode::OK));
50+
}
51+
52+
//if dashboard is not in memory fetch from s3
4753
let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
4854
let resource = CONFIG
4955
.storage()
5056
.get_object_store()
5157
.get_object(&dash_file_path)
5258
.await?;
53-
let resource = serde_json::from_slice::<JsonValue>(&resource)?;
59+
let resource = serde_json::from_slice::<Dashboard>(&resource)?;
5460

5561
Ok((web::Json(resource), StatusCode::OK))
5662
}
@@ -68,6 +74,9 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostErr
6874

6975
let dash_file_path = dashboard_path(user_id, &format!("{}.json", dash_id));
7076

77+
let dashboard = serde_json::from_slice::<Dashboard>(&body)?;
78+
DASHBOARDS.update(dashboard);
79+
7180
let store = CONFIG.storage().get_object_store();
7281
store.put_object(&dash_file_path, body).await?;
7382

@@ -93,32 +102,6 @@ pub async fn delete(req: HttpRequest) -> Result<HttpResponse, PostError> {
93102
Ok(HttpResponse::Ok().finish())
94103
}
95104

96-
// #[derive(Debug, Serialize, Deserialize)]
97-
// pub struct Dashboard {
98-
// version: String,
99-
// name: String,
100-
// id: String,
101-
// time_filter: TimeFilter
102-
// refresh_interval: u64,
103-
// pannels: Vec<Pannel>,
104-
// }
105-
//
106-
// #[derive(Debug, Serialize, Deserialize)]
107-
// pub struct Pannel {
108-
// stream_name: String,
109-
// query: String,
110-
// chart_type: String,
111-
// columns: Vec<String>,
112-
// headers: Vec<String>,
113-
// dimensions: (u64, u64),
114-
// }
115-
//
116-
// #[derive(Debug, Serialize, Deserialize)]
117-
// pub struct TimeFilter {
118-
// to: String,
119-
// from: String
120-
// }
121-
122105
#[derive(Debug, thiserror::Error)]
123106
pub enum DashboardError {
124107
#[error("Failed to connect to storage: {0}")]

server/src/handlers/http/users/filters.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{
22
handlers::{http::ingest::PostError, STREAM_NAME_HEADER_KEY},
33
option::CONFIG,
44
storage::{object_storage::filter_path, ObjectStorageError},
5+
users::filters::{Filter, FILTERS},
56
};
67
use actix_web::{http::header::ContentType, web, HttpRequest, HttpResponse, Responder};
78
use bytes::Bytes;
@@ -62,14 +63,19 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, FiltersError> {
6263
.to_str()
6364
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
6465

66+
if let Some(filter) = FILTERS.find(filt_id) {
67+
return Ok((web::Json(filter), StatusCode::OK));
68+
}
69+
70+
// if it is not in memory go to s3
6571
let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
6672
let resource = CONFIG
6773
.storage()
6874
.get_object_store()
6975
.get_object(&path)
7076
.await?;
7177

72-
let resource = serde_json::from_slice::<JsonValue>(&resource)?;
78+
let resource = serde_json::from_slice::<Filter>(&resource)?;
7379

7480
Ok((web::Json(resource), StatusCode::OK))
7581
}
@@ -95,6 +101,8 @@ pub async fn post(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostErr
95101
.map_err(|_| FiltersError::Metadata("Non ASCII Stream Name Provided"))?;
96102

97103
let path = filter_path(user_id, stream_name, &format!("{}.json", filt_id));
104+
let filter: Filter = serde_json::from_slice(&body)?;
105+
FILTERS.update(filter);
98106

99107
let store = CONFIG.storage().get_object_store();
100108
store.put_object(&path, body).await?;
@@ -154,12 +162,3 @@ impl actix_web::ResponseError for FiltersError {
154162
.body(self.to_string())
155163
}
156164
}
157-
158-
// #[derive(Debug, Serialize, Deserialize)]
159-
// pub struct Filters {
160-
// version: String,
161-
// stream_name: String,
162-
// filter_name: String,
163-
// query: String,
164-
// time-filter: TimeFilter
165-
// }

server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod static_schema;
3939
mod stats;
4040
mod storage;
4141
mod sync;
42+
mod users;
4243
mod utils;
4344
mod validator;
4445

server/src/users/dashboards.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use std::sync::RwLock;
2+
3+
use once_cell::sync::Lazy;
4+
use relative_path::RelativePathBuf;
5+
use serde::{Deserialize, Serialize};
6+
7+
use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG};
8+
9+
use super::TimeFilter;
10+
11+
pub static DASHBOARDS: Lazy<Dashboards> = Lazy::new(Dashboards::default);
12+
13+
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
14+
pub struct Pannel {
15+
stream_name: String,
16+
query: String,
17+
chart_type: String,
18+
columns: Vec<String>,
19+
headers: Vec<String>,
20+
dimensions: (u64, u64),
21+
}
22+
23+
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
24+
pub struct Dashboard {
25+
version: String,
26+
name: String,
27+
id: String,
28+
time_filter: TimeFilter,
29+
refresh_interval: u64,
30+
pannels: Vec<Pannel>,
31+
}
32+
33+
impl Dashboard {
34+
pub fn dashboard_id(&self) -> &str {
35+
&self.id
36+
}
37+
}
38+
39+
#[derive(Default)]
40+
pub struct Dashboards(RwLock<Vec<Dashboard>>);
41+
42+
impl Dashboards {
43+
pub async fn load(&self) -> anyhow::Result<()> {
44+
let mut this = vec![];
45+
let path = RelativePathBuf::from(USERS_ROOT_DIR);
46+
let store = CONFIG.storage().get_object_store();
47+
let objs = store
48+
.get_objects(Some(&path), Box::new(|path| path.ends_with(".json")))
49+
.await?;
50+
51+
for obj in objs {
52+
if let Ok(filter) = serde_json::from_slice::<Dashboard>(&obj) {
53+
this.push(filter);
54+
}
55+
}
56+
57+
let mut s = self.0.write().expect(LOCK_EXPECT);
58+
s.append(&mut this);
59+
60+
Ok(())
61+
}
62+
63+
pub fn update(&self, dashboard: Dashboard) {
64+
let mut s = self.0.write().expect(LOCK_EXPECT);
65+
66+
s.push(dashboard);
67+
}
68+
69+
pub fn find(&self, dashboard_id: &str) -> Option<Dashboard> {
70+
self.0
71+
.read()
72+
.expect(LOCK_EXPECT)
73+
.iter()
74+
.find(|dashboard| dashboard.dashboard_id() == dashboard_id)
75+
.cloned()
76+
}
77+
}

server/src/users/filters.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use std::sync::RwLock;
2+
3+
use once_cell::sync::Lazy;
4+
use relative_path::RelativePathBuf;
5+
use serde::{Deserialize, Serialize};
6+
7+
use super::TimeFilter;
8+
use crate::{handlers::http::users::USERS_ROOT_DIR, metadata::LOCK_EXPECT, option::CONFIG};
9+
10+
pub static FILTERS: Lazy<Filters> = Lazy::new(Filters::default);
11+
12+
#[derive(Debug, Serialize, Deserialize, Clone)]
13+
pub struct Filter {
14+
version: String,
15+
stream_name: String,
16+
filter_name: String,
17+
filter_id: String,
18+
query: String,
19+
time_filter: Option<TimeFilter>,
20+
}
21+
22+
impl Filter {
23+
pub fn filter_id(&self) -> &str {
24+
&self.filter_id
25+
}
26+
}
27+
28+
#[derive(Debug, Default)]
29+
pub struct Filters(RwLock<Vec<Filter>>);
30+
31+
impl Filters {
32+
pub async fn load(&self) -> anyhow::Result<()> {
33+
let mut this = vec![];
34+
let path = RelativePathBuf::from(USERS_ROOT_DIR);
35+
let store = CONFIG.storage().get_object_store();
36+
let objs = store
37+
.get_objects(Some(&path), Box::new(|path| path.ends_with(".json")))
38+
.await?;
39+
40+
for obj in objs {
41+
if let Ok(filter) = serde_json::from_slice::<Filter>(&obj) {
42+
this.push(filter);
43+
}
44+
}
45+
46+
let mut s = self.0.write().expect(LOCK_EXPECT);
47+
s.append(&mut this);
48+
49+
Ok(())
50+
}
51+
52+
pub fn update(&self, filter: Filter) {
53+
let mut s = self.0.write().expect(LOCK_EXPECT);
54+
55+
s.push(filter);
56+
}
57+
58+
pub fn find(&self, filter_id: &str) -> Option<Filter> {
59+
self.0
60+
.read()
61+
.expect(LOCK_EXPECT)
62+
.iter()
63+
.find(|filter| filter.filter_id() == filter_id)
64+
.cloned()
65+
}
66+
}

server/src/users/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
pub mod dashboards;
2+
pub mod filters;
3+
4+
use serde::{Deserialize, Serialize};
5+
6+
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
7+
pub struct TimeFilter {
8+
to: String,
9+
from: String,
10+
}

0 commit comments

Comments
 (0)