Skip to content

Commit e1ee279

Browse files
committed
update querying with cache
1 parent 5d037e5 commit e1ee279

File tree

3 files changed

+75
-39
lines changed

3 files changed

+75
-39
lines changed

server/src/handlers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub mod livetail;
2323
const PREFIX_TAGS: &str = "x-p-tag-";
2424
const PREFIX_META: &str = "x-p-meta-";
2525
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
26+
const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
27+
const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
2628
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2729
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
2830
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";

server/src/handlers/http/query.rs

Lines changed: 71 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::event::error::EventError;
3434
use crate::handlers::http::fetch_schema;
3535

3636
use crate::event::commit_schema;
37+
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY};
3738
use crate::localcache::CacheError;
3839
use crate::metrics::QUERY_EXECUTE_TIME;
3940
use crate::option::{Mode, CONFIG};
@@ -74,36 +75,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
7475
// create a visitor to extract the table name
7576
let mut visitor = TableScanVisitor::default();
7677
let _ = raw_logical_plan.visit(&mut visitor);
77-
let stream = visitor.top();
78+
let stream = visitor
79+
.top()
80+
.ok_or_else(|| QueryError::MalformedQuery("Table Name not found in SQL"))?;
81+
7882
let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
7983
.await
8084
.unwrap_or(None);
8185

82-
if let Some(query_cache_manager) = query_cache_manager {
83-
let mut query_cache = query_cache_manager.get_cache(stream).await?;
84-
85-
let (start, end) = parse_human_time(&query_request.start_time, &query_request.end_time)?;
86-
let key = format!(
87-
"{}-{}-{}",
88-
start.to_rfc3339(),
89-
end.to_rfc3339(),
90-
query_request.query.clone()
91-
);
92-
93-
let file_path = query_cache.get_file(key);
94-
if let Some(file_path) = file_path {
95-
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
96-
let response = QueryResponse {
97-
records,
98-
fields,
99-
fill_null: query_request.send_null,
100-
with_fields: query_request.fields,
101-
}
102-
.to_http()?;
86+
let cache_results = req
87+
.headers()
88+
.iter()
89+
.find(|&(key, _)| key == CACHE_RESULTS_HEADER_KEY);
90+
91+
let show_cached = req
92+
.headers()
93+
.iter()
94+
.find(|&(key, _)| key == CACHE_VIEW_HEADER_KEY);
95+
96+
match (show_cached, query_cache_manager) {
97+
(None, None) => {}
98+
(None, Some(_)) => {}
99+
(Some(_), None) => {
100+
log::warn!(
101+
"Instructed to show cached results but Query Caching is not Enabledon Server"
102+
);
103+
}
104+
(Some(_), Some(query_cache_manager)) => {
105+
let mut query_cache = query_cache_manager.get_cache(stream).await?;
106+
107+
let (start, end) =
108+
parse_human_time(&query_request.start_time, &query_request.end_time)?;
109+
let key = format!(
110+
"{}-{}-{}",
111+
start.to_rfc3339(),
112+
end.to_rfc3339(),
113+
query_request.query.clone()
114+
);
115+
116+
let file_path = query_cache.get_file(key);
117+
if let Some(file_path) = file_path {
118+
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
119+
let response = QueryResponse {
120+
records,
121+
fields,
122+
fill_null: query_request.send_null,
123+
with_fields: query_request.fields,
124+
}
125+
.to_http()?;
103126

104-
return Ok(response);
127+
return Ok(response);
128+
}
105129
}
106-
};
130+
}
107131

108132
let tables = visitor.into_inner();
109133

@@ -125,23 +149,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
125149

126150
let table_name = query
127151
.first_table_name()
128-
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?;
152+
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
129153
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
130154

131155
let time = Instant::now();
132156

133157
let (records, fields) = query.execute(table_name.clone()).await?;
134-
// put the rbs to parquet
135-
if let Some(query_cache_manager) = query_cache_manager {
136-
query_cache_manager
137-
.create_parquet_cache(
138-
&table_name,
139-
&records,
140-
query.start.to_rfc3339(),
141-
query.end.to_rfc3339(),
142-
query_request.query,
143-
)
144-
.await?;
158+
159+
match (cache_results, query_cache_manager) {
160+
(None, None) => {}
161+
(None, Some(_)) => {}
162+
(Some(_), None) => {
163+
log::warn!(
164+
"Instructed to cache query results but Query Caching is not Enabled in Server"
165+
);
166+
}
167+
// do cache
168+
(Some(_), Some(query_cache_manager)) => {
169+
query_cache_manager
170+
.create_parquet_cache(
171+
&table_name,
172+
&records,
173+
query.start.to_rfc3339(),
174+
query.end.to_rfc3339(),
175+
query_request.query,
176+
)
177+
.await?
178+
}
145179
}
146180

147181
let response = QueryResponse {
@@ -348,7 +382,7 @@ pub enum QueryError {
348382
#[error("Evern Error: {0}")]
349383
EventError(#[from] EventError),
350384
#[error("Error: {0}")]
351-
MalformedQuery(String),
385+
MalformedQuery(&'static str),
352386
#[error(
353387
r#"Error: Failed to Parse Record Batch into Json
354388
Description: {0}"#

server/src/query.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ impl TableScanVisitor {
186186
pub fn into_inner(self) -> Vec<String> {
187187
self.tables
188188
}
189-
pub fn top(&self) -> &str {
190-
self.tables[0].as_ref()
189+
pub fn top(&self) -> Option<&str> {
190+
self.tables.first().map(|s| s.as_ref())
191191
}
192192
}
193193

0 commit comments

Comments
 (0)