Skip to content

Commit 2d59640

Browse files
committed
add cache endpoints
1 parent 9e9b4c7 commit 2d59640

File tree

7 files changed

+55
-1
lines changed

7 files changed

+55
-1
lines changed

server/src/handlers/http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::option::CONFIG;
2626
use self::{cluster::get_ingestor_info, query::Query};
2727

2828
pub(crate) mod about;
29+
mod cache;
2930
pub mod cluster;
3031
pub(crate) mod health_check;
3132
pub(crate) mod ingest;
@@ -40,7 +41,6 @@ pub(crate) mod query;
4041
pub(crate) mod rbac;
4142
pub(crate) mod role;
4243
pub mod users;
43-
4444
pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760;
4545
pub const API_BASE_PATH: &str = "api";
4646
pub const API_VERSION: &str = "v1";

server/src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::handlers::{
3030
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
3131
STREAM_NAME_HEADER_KEY,
3232
};
33+
use crate::localcache::CacheError;
3334
use crate::metadata::{self, STREAM_INFO};
3435
use crate::option::{Mode, CONFIG};
3536
use crate::storage::{LogStream, ObjectStorageError};
@@ -445,6 +446,8 @@ pub enum PostError {
445446
FiltersError(#[from] FiltersError),
446447
#[error("Error: {0}")]
447448
DashboardError(#[from] DashboardError),
449+
#[error("Error: {0}")]
450+
CacheError(#[from] CacheError),
448451
}
449452

450453
impl actix_web::ResponseError for PostError {
@@ -465,6 +468,7 @@ impl actix_web::ResponseError for PostError {
465468
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
466469
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
467470
PostError::Error(_) => StatusCode::INTERNAL_SERVER_ERROR,
471+
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
468472
}
469473
}
470474

server/src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ impl QueryServer {
121121
web::scope(&base_path())
122122
// POST "/query" ==> Get results of the SQL query passed in request body
123123
.service(Server::get_query_factory())
124+
.service(Server::get_cache_webscope())
124125
.service(Server::get_liveness_factory())
125126
.service(Server::get_readiness_factory())
126127
.service(Server::get_about_factory())

server/src/handlers/http/modal/server.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::banner;
2121
use crate::handlers;
2222
use crate::handlers::http::about;
2323
use crate::handlers::http::base_path;
24+
use crate::handlers::http::cache;
2425
use crate::handlers::http::health_check;
2526
use crate::handlers::http::query;
2627
use crate::handlers::http::users::dashboards;
@@ -137,6 +138,7 @@ impl Server {
137138
web::scope(&base_path())
138139
// POST "/query" ==> Get results of the SQL query passed in request body
139140
.service(Self::get_query_factory())
141+
.service(Self::get_cache_webscope())
140142
.service(Self::get_ingest_factory())
141143
.service(Self::get_liveness_factory())
142144
.service(Self::get_readiness_factory())
@@ -218,6 +220,18 @@ impl Server {
218220
web::resource("/query").route(web::post().to(query::query).authorize(Action::Query))
219221
}
220222

223+
pub fn get_cache_webscope() -> Scope {
224+
web::scope("/cache").service(
225+
web::scope("/{user_id}").service(
226+
web::scope("/{stream}").service(
227+
web::resource("")
228+
.route(web::get().to(cache::list).authorize(Action::ListCache))
229+
.route(web::post().to(cache::remove).authorize(Action::RemoveCache)),
230+
),
231+
),
232+
)
233+
}
234+
221235
// get the logstream web scope
222236
pub fn get_logstream_webscope() -> Scope {
223237
web::scope("/logstream")

server/src/localcache.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,4 +261,6 @@ pub enum CacheError {
261261
ParquetError(#[from] ParquetError),
262262
#[error("{0}")]
263263
MetadataError(#[from] MetadataError),
264+
#[error("Error: Cache File Does Not Exist")]
265+
DoesNotExist,
264266
}

server/src/querycache.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,18 @@ impl QueryCache {
8080
self.files.get(&key).cloned()
8181
}
8282

83+
pub fn current_size(&self) -> u64 {
84+
self.current_size
85+
}
86+
87+
pub fn remove(&mut self, key: CacheMetadata) -> Option<PathBuf> {
88+
self.files.remove(&key)
89+
}
90+
91+
pub fn queries(&self) -> Vec<&CacheMetadata> {
92+
self.files.keys().collect_vec()
93+
}
94+
8395
// read the parquet
8496
// return the recordbatches
8597
pub async fn get_cached_records(
@@ -238,6 +250,23 @@ impl QueryCacheManager {
238250
Ok(cache)
239251
}
240252

253+
pub async fn remove_from_cache(
254+
&self,
255+
key: CacheMetadata,
256+
stream: &str,
257+
user_id: &str,
258+
) -> Result<(), CacheError> {
259+
let mut cache = self.get_cache(stream, user_id).await?;
260+
261+
if let Some(remove_result) = cache.remove(key) {
262+
self.put_cache(stream, &cache, user_id).await?;
263+
tokio::spawn(fs::remove_file(remove_result));
264+
Ok(())
265+
} else {
266+
Err(CacheError::DoesNotExist)
267+
}
268+
}
269+
241270
pub async fn put_cache(
242271
&self,
243272
stream: &str,

server/src/rbac/role.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub enum Action {
5858
GetFilter,
5959
CreateFilter,
6060
DeleteFilter,
61+
ListCache,
62+
RemoveCache,
6163
}
6264

6365
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -123,6 +125,8 @@ impl RoleBuilder {
123125
| Action::ListFilter
124126
| Action::CreateFilter
125127
| Action::DeleteFilter
128+
| Action::ListCache
129+
| Action::RemoveCache
126130
| Action::GetAnalytics => Permission::Unit(action),
127131
Action::Ingest
128132
| Action::GetSchema

0 commit comments

Comments
 (0)