Skip to content

Commit ba5b44e

Browse files
committed
impl query caching
1 parent 5f15ea0 commit ba5b44e

File tree

3 files changed

+108
-135
lines changed

3 files changed

+108
-135
lines changed

server/src/handlers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const PREFIX_META: &str = "x-p-meta-";
2525
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
2626
const CACHE_RESULTS_HEADER_KEY: &str = "x-p-cache-results";
2727
const CACHE_VIEW_HEADER_KEY: &str = "x-p-show-cached";
28+
const USER_ID_HEADER_KEY: &str = "x-p-user-id";
2829
const LOG_SOURCE_KEY: &str = "x-p-log-source";
2930
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
3031
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";

server/src/handlers/http/query.rs

Lines changed: 52 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,24 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, Responder};
22+
use anyhow::anyhow;
2223
use chrono::{DateTime, Utc};
2324
use datafusion::common::tree_node::TreeNode;
2425
use datafusion::error::DataFusionError;
2526
use datafusion::execution::context::SessionState;
2627
use futures_util::Future;
27-
use http::StatusCode;
28+
use http::{HeaderValue, StatusCode};
2829
use std::collections::HashMap;
2930
use std::pin::Pin;
3031
use std::sync::Arc;
3132
use std::time::Instant;
3233

3334
use crate::event::error::EventError;
3435
use crate::handlers::http::fetch_schema;
36+
use arrow_array::RecordBatch;
3537

3638
use crate::event::commit_schema;
37-
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY};
39+
use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
3840
use crate::localcache::CacheError;
3941
use crate::metrics::QUERY_EXECUTE_TIME;
4042
use crate::option::{Mode, CONFIG};
@@ -83,100 +85,59 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8385
.await
8486
.unwrap_or(None);
8587

86-
let cache_results = req
88+
let cache_results = req.headers().get(CACHE_RESULTS_HEADER_KEY);
89+
let show_cached = req.headers().get(CACHE_VIEW_HEADER_KEY);
90+
let user_id = req
8791
.headers()
88-
.iter()
89-
.find(|&(key, _)| key == CACHE_RESULTS_HEADER_KEY);
90-
91-
let show_cached = req
92-
.headers()
93-
.iter()
94-
.find(|&(key, _)| key == CACHE_VIEW_HEADER_KEY);
95-
96-
match (show_cached, query_cache_manager) {
97-
(None, None) => {}
98-
(None, Some(_)) => {}
99-
(Some(_), None) => {
100-
log::warn!(
101-
"Instructed to show cached results but Query Caching is not Enabledon Server"
102-
);
103-
}
104-
(Some(_), Some(query_cache_manager)) => {
105-
let mut query_cache = query_cache_manager.get_cache(stream).await?;
106-
107-
let (start, end) =
108-
parse_human_time(&query_request.start_time, &query_request.end_time)?;
109-
let key = format!(
110-
"{}-{}-{}",
111-
start.to_rfc3339(),
112-
end.to_rfc3339(),
113-
query_request.query.clone()
114-
);
115-
116-
let file_path = query_cache.get_file(key);
117-
if let Some(file_path) = file_path {
118-
let (records, fields) = query_cache.get_cached_records(&file_path).await?;
119-
let response = QueryResponse {
120-
records,
121-
fields,
122-
fill_null: query_request.send_null,
123-
with_fields: query_request.fields,
124-
}
125-
.to_http()?;
126-
127-
return Ok(response);
128-
}
129-
}
130-
}
92+
.get(USER_ID_HEADER_KEY)
93+
.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
94+
.to_str()
95+
.map_err(|err| anyhow!(err))?;
96+
97+
// deal with cached data
98+
if let Ok(results) = get_results_from_cache(
99+
show_cached,
100+
query_cache_manager,
101+
stream,
102+
user_id,
103+
&query_request.start_time,
104+
&query_request.end_time,
105+
&query_request.query,
106+
query_request.send_null,
107+
query_request.fields,
108+
)
109+
.await
110+
{
111+
return results.to_http();
112+
};
131113

132114
let tables = visitor.into_inner();
133-
134-
if CONFIG.parseable.mode == Mode::Query {
135-
for table in tables {
136-
if let Ok(new_schema) = fetch_schema(&table).await {
137-
// commit schema merges the schema internally and updates the schema in storage.
138-
commit_schema_to_storage(&table, new_schema.clone())
139-
.await
140-
.map_err(QueryError::ObjectStorage)?;
141-
commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?;
142-
}
143-
}
144-
}
115+
update_schema_when_distributed(tables).await?;
145116
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
146117

147-
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
148-
let permissions: Vec<Permission> = Users.get_permissions(&creds);
118+
let creds = extract_session_key_from_req(&req)?;
119+
let permissions = Users.get_permissions(&creds);
149120

150121
let table_name = query
151122
.first_table_name()
152123
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
124+
153125
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
154126

155127
let time = Instant::now();
156-
157128
let (records, fields) = query.execute(table_name.clone()).await?;
158-
159-
match (cache_results, query_cache_manager) {
160-
(None, None) => {}
161-
(None, Some(_)) => {}
162-
(Some(_), None) => {
163-
log::warn!(
164-
"Instructed to cache query results but Query Caching is not Enabled in Server"
165-
);
166-
}
167-
// do cache
168-
(Some(_), Some(query_cache_manager)) => {
169-
query_cache_manager
170-
.create_parquet_cache(
171-
&table_name,
172-
&records,
173-
query.start.to_rfc3339(),
174-
query.end.to_rfc3339(),
175-
query_request.query,
176-
)
177-
.await?
178-
}
179-
}
129+
// deal with cache saving
130+
put_results_in_cache(
131+
cache_results,
132+
user_id,
133+
query_cache_manager,
134+
&table_name,
135+
&records,
136+
query.start.to_rfc3339(),
137+
query.end.to_rfc3339(),
138+
query_request.query,
139+
)
140+
.await;
180141

181142
let response = QueryResponse {
182143
records,
@@ -195,7 +156,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
195156
Ok(response)
196157
}
197158

198-
pub fn authorize_and_set_filter_tags(
159+
fn authorize_and_set_filter_tags(
199160
query: &mut LogicalQuery,
200161
permissions: Vec<Permission>,
201162
table_name: &str,
@@ -379,15 +340,22 @@ pub enum QueryError {
379340
ObjectStorage(#[from] ObjectStorageError),
380341
#[error("Cache Error: {0}")]
381342
CacheError(#[from] CacheError),
343+
#[error("")]
344+
CacheMiss,
382345
#[error("Evern Error: {0}")]
383346
EventError(#[from] EventError),
384347
#[error("Error: {0}")]
385348
MalformedQuery(&'static str),
349+
#[allow(unused)]
386350
#[error(
387351
r#"Error: Failed to Parse Record Batch into Json
388352
Description: {0}"#
389353
)]
390354
JsonParse(String),
355+
#[error("Error: {0}")]
356+
ActixError(#[from] actix_web::Error),
357+
#[error("Error: {0}")]
358+
Anyhow(#[from] anyhow::Error),
391359
}
392360

393361
impl actix_web::ResponseError for QueryError {

0 commit comments

Comments
 (0)