
Commit e168cdd
update arrow flight server to perform query cache
1 parent 419c5e2

File tree: 7 files changed (+133, -73 lines)

server/src/handlers/airplane.rs (71 additions, 45 deletions)
@@ -17,15 +17,13 @@
  */
 
 use arrow_array::RecordBatch;
-use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::flight_service_server::FlightServiceServer;
 use arrow_flight::PollInfo;
 use arrow_schema::ArrowError;
 
 use datafusion::common::tree_node::TreeNode;
 use serde_json::json;
 use std::net::SocketAddr;
-use std::sync::Arc;
 use std::time::Instant;
 use tonic::codec::CompressionEncoding;
 
@@ -34,34 +32,38 @@ use futures_util::{Future, TryFutureExt};
 use tonic::transport::{Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;
 
-use crate::event::commit_schema;
 use crate::handlers::http::cluster::get_ingestor_info;
-use crate::handlers::http::fetch_schema;
 
+use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
 use crate::metrics::QUERY_EXECUTE_TIME;
-use crate::option::{Mode, CONFIG};
+use crate::option::CONFIG;
 
 use crate::handlers::livetail::cross_origin_config;
 
-use crate::handlers::http::query::{authorize_and_set_filter_tags, into_query};
+use crate::handlers::http::query::{
+    authorize_and_set_filter_tags, into_query, put_results_in_cache, update_schema_when_distributed,
+};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
-use crate::storage::object_storage::commit_schema_to_storage;
+use crate::querycache::QueryCacheManager;
 use crate::utils::arrow::flight::{
-    append_temporary_events, get_query_from_ticket, run_do_get_rpc, send_to_ingester,
+    append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
+    send_to_ingester,
 };
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
     FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
     SchemaResult, Ticket,
 };
 use arrow_ipc::writer::IpcWriteOptions;
-use futures::{stream, TryStreamExt};
+use futures::stream;
 use tonic::{Request, Response, Status, Streaming};
 
 use crate::handlers::livetail::extract_session_key;
 use crate::metadata::STREAM_INFO;
 use crate::rbac::Users;
 
+use super::http::query::get_results_from_cache;
+
 #[derive(Clone, Debug)]
 pub struct AirServiceImpl {}
 
@@ -130,7 +132,7 @@ impl FlightService for AirServiceImpl {
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
         let key = extract_session_key(req.metadata())?;
 
-        let ticket = get_query_from_ticket(req)?;
+        let ticket = get_query_from_ticket(&req)?;
 
         log::info!("query requested to airplane: {:?}", ticket);
 
@@ -150,32 +152,57 @@ impl FlightService for AirServiceImpl {
         let mut visitor = TableScanVisitor::default();
         let _ = raw_logical_plan.visit(&mut visitor);
 
-        let tables = visitor.into_inner();
-
-        if CONFIG.parseable.mode == Mode::Query {
-            // using http to get the schema. may update to use flight later
-            for table in tables {
-                if let Ok(new_schema) = fetch_schema(&table).await {
-                    // commit schema merges the schema internally and updates the schema in storage.
-                    commit_schema_to_storage(&table, new_schema.clone())
-                        .await
-                        .map_err(|err| Status::internal(err.to_string()))?;
-                    commit_schema(&table, Arc::new(new_schema))
-                        .map_err(|err| Status::internal(err.to_string()))?;
-                }
-            }
+        let streams = visitor.into_inner();
+
+        let query_cache_manager = QueryCacheManager::global(CONFIG.parseable.query_cache_size)
+            .await
+            .unwrap_or(None);
+
+        let cache_results = req
+            .metadata()
+            .get(CACHE_RESULTS_HEADER_KEY)
+            .and_then(|value| value.to_str().ok()); // I dont think we need to own this.
+
+        let show_cached = req
+            .metadata()
+            .get(CACHE_VIEW_HEADER_KEY)
+            .and_then(|value| value.to_str().ok());
+
+        let user_id = req
+            .metadata()
+            .get(USER_ID_HEADER_KEY)
+            .and_then(|value| value.to_str().ok());
+        let stream_name = streams
+            .first()
+            .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
+            .to_owned();
+
+        // send the cached results
+        if let Ok(cache_results) = get_results_from_cache(
+            show_cached,
+            query_cache_manager,
+            &stream_name,
+            user_id,
+            &ticket.start_time,
+            &ticket.end_time,
+            &ticket.query,
+            ticket.send_null,
+            ticket.fields,
+        )
+        .await
+        {
+            return cache_results.into_flight();
         }
 
+        update_schema_when_distributed(streams)
+            .await
+            .map_err(|err| Status::internal(err.to_string()))?;
+
         // map payload to query
         let mut query = into_query(&ticket, &session_state)
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
 
-        // if table name is not present it is a Malformed Query
-        let stream_name = query
-            .first_table_name()
-            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
-
         let event =
             if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
                 let sql = format!("select * from {}", &stream_name);
@@ -210,11 +237,23 @@ impl FlightService for AirServiceImpl {
                 Status::permission_denied("User Does not have permission to access this")
             })?;
         let time = Instant::now();
-        let (results, _) = query
+        let (records, _) = query
            .execute(stream_name.clone())
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
 
+        put_results_in_cache(
+            cache_results,
+            user_id,
+            query_cache_manager,
+            &stream_name,
+            &records,
+            query.start.to_rfc3339(),
+            query.end.to_rfc3339(),
+            ticket.query,
+        )
+        .await;
+
         /*
          * INFO: No returning the schema with the data.
          * kept it in case it needs to be sent in the future.
@@ -226,18 +265,7 @@ impl FlightService for AirServiceImpl {
            .collect::<Vec<_>>();
        let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
        */
-        let input_stream = futures::stream::iter(results.into_iter().map(Ok));
-        let write_options = IpcWriteOptions::default()
-            .try_with_compression(Some(arrow_ipc::CompressionType(1)))
-            .map_err(|err| Status::failed_precondition(err.to_string()))?;
-
-        let flight_data_stream = FlightDataEncoderBuilder::new()
-            .with_max_flight_data_size(usize::MAX)
-            .with_options(write_options)
-            // .with_schema(schema.into())
-            .build(input_stream);
-
-        let flight_data_stream = flight_data_stream.map_err(|err| Status::unknown(err.to_string()));
+        let out = into_flight_data(records);
 
         if let Some(event) = event {
            event.clear(&stream_name);
@@ -248,9 +276,7 @@ impl FlightService for AirServiceImpl {
            .with_label_values(&[&format!("flight-query-{}", stream_name)])
            .observe(time);
 
-        Ok(Response::new(
-            Box::pin(flight_data_stream) as Self::DoGetStream
-        ))
+        out
    }
 
    async fn do_put(
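
The net effect of these changes to `do_get` is a read-through cache: the handler first asks `get_results_from_cache` for a hit keyed on stream, user, time range, and query, and returns it via `into_flight()` if present; only on a miss does it execute the query and write the batches back with `put_results_in_cache`. Below is a minimal, self-contained sketch of that check-then-populate shape; the types and the cache key are illustrative stand-ins, not Parseable's actual `QueryCacheManager` API.

```rust
use std::collections::HashMap;

// Stand-ins for Parseable's QueryCacheManager and RecordBatch results.
#[derive(Clone, Debug, PartialEq)]
struct CachedResult(Vec<String>);

#[derive(Default)]
struct QueryCache {
    entries: HashMap<String, CachedResult>,
}

impl QueryCache {
    fn get(&self, key: &str) -> Option<CachedResult> {
        self.entries.get(key).cloned()
    }
    fn put(&mut self, key: String, value: CachedResult) {
        self.entries.insert(key, value);
    }
}

// Placeholder for the expensive DataFusion execution step.
fn run_query(sql: &str) -> CachedResult {
    CachedResult(vec![format!("rows for `{sql}`")])
}

/// Check the cache first; on a miss, execute and write the result back,
/// mirroring the get_results_from_cache / put_results_in_cache calls above.
fn query_with_cache(cache: &mut QueryCache, user_id: &str, sql: &str) -> CachedResult {
    // Hypothetical key; the real cache also keys on stream and time range.
    let key = format!("{user_id}:{sql}");
    if let Some(hit) = cache.get(&key) {
        return hit; // cache hit: skip query execution entirely
    }
    let fresh = run_query(sql);
    cache.put(key, fresh.clone());
    fresh
}

fn main() {
    let mut cache = QueryCache::default();
    let first = query_with_cache(&mut cache, "alice", "select * from logs");
    let second = query_with_cache(&mut cache, "alice", "select * from logs");
    assert_eq!(first, second); // second call is served from the cache
}
```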

server/src/handlers/http/modal/query_server.rs (0 additions, 1 deletion)
@@ -180,7 +180,6 @@ impl QueryServer {
         FILTERS.load().await?;
         DASHBOARDS.load().await?;
 
-
         // load data from stats back to prometheus metrics
         metrics::fetch_stats_from_storage().await;
         metrics::reset_daily_metric_from_global();

server/src/handlers/http/modal/server.rs (0 additions, 1 deletion)
@@ -475,7 +475,6 @@ impl Server {
         FILTERS.load().await?;
         DASHBOARDS.load().await?;
 
-
         metrics::fetch_stats_from_storage().await;
         metrics::reset_daily_metric_from_global();
         storage::retention::load_retention_from_global();

server/src/handlers/http/query.rs (23 additions, 19 deletions)
@@ -25,7 +25,7 @@ use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
 use futures_util::Future;
-use http::{HeaderValue, StatusCode};
+use http::StatusCode;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -85,9 +85,18 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
         .await
         .unwrap_or(None);
 
-    let cache_results = req.headers().get(CACHE_RESULTS_HEADER_KEY);
-    let show_cached = req.headers().get(CACHE_VIEW_HEADER_KEY);
-    let user_id = req.headers().get(USER_ID_HEADER_KEY);
+    let cache_results = req
+        .headers()
+        .get(CACHE_RESULTS_HEADER_KEY)
+        .and_then(|value| value.to_str().ok());
+    let show_cached = req
+        .headers()
+        .get(CACHE_VIEW_HEADER_KEY)
+        .and_then(|value| value.to_str().ok());
+    let user_id = req
+        .headers()
+        .get(USER_ID_HEADER_KEY)
+        .and_then(|value| value.to_str().ok());
 
     // deal with cached data
     if let Ok(results) = get_results_from_cache(
@@ -151,7 +160,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
     Ok(response)
 }
 
-async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
+pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
     if CONFIG.parseable.mode == Mode::Query {
         for table in tables {
             if let Ok(new_schema) = fetch_schema(&table).await {
@@ -167,9 +176,9 @@ async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), Query
 }
 
 #[allow(clippy::too_many_arguments)]
-async fn put_results_in_cache(
-    cache_results: Option<&HeaderValue>,
-    user_id: Option<&HeaderValue>,
+pub async fn put_results_in_cache(
+    cache_results: Option<&str>,
+    user_id: Option<&str>,
     query_cache_manager: Option<&QueryCacheManager>,
     stream: &str,
     records: &[RecordBatch],
@@ -185,10 +194,7 @@ async fn put_results_in_cache(
         }
         // do cache
         (Some(_), Some(query_cache_manager)) => {
-            let user_id = user_id
-                .expect("User Id was provided")
-                .to_str()
-                .expect("is proper ASCII");
+            let user_id = user_id.expect("User Id was provided");
 
             if let Err(err) = query_cache_manager
                 .create_parquet_cache(stream, records, user_id, start, end, query)
@@ -209,11 +215,11 @@ async fn put_results_in_cache(
 }
 
 #[allow(clippy::too_many_arguments)]
-async fn get_results_from_cache(
-    show_cached: Option<&HeaderValue>,
+pub async fn get_results_from_cache(
+    show_cached: Option<&str>,
     query_cache_manager: Option<&QueryCacheManager>,
     stream: &str,
-    user_id: Option<&HeaderValue>,
+    user_id: Option<&str>,
     start_time: &str,
     end_time: &str,
     query: &str,
@@ -228,10 +234,8 @@ async fn get_results_from_cache(
             None
         }
         (Some(_), Some(query_cache_manager)) => {
-            let user_id = user_id
-                .ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?
-                .to_str()
-                .map_err(|err| anyhow!(err))?;
+            let user_id =
+                user_id.ok_or_else(|| QueryError::Anyhow(anyhow!("User Id not provided")))?;
 
             let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;
server/src/response.rs (12 additions, 1 deletion)
@@ -16,11 +16,18 @@
  *
  */
 
-use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
+use crate::{
+    handlers::http::query::QueryError,
+    utils::arrow::{
+        flight::{into_flight_data, DoGetStream},
+        record_batches_to_json,
+    },
+};
 use actix_web::{web, Responder};
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
+use tonic::{Response, Status};
 
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
@@ -57,4 +64,8 @@ impl QueryResponse {
 
         Ok(web::Json(response))
     }
+
+    pub fn into_flight(self) -> Result<Response<DoGetStream>, Status> {
+        into_flight_data(self.records)
+    }
 }
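
`into_flight` forwards the record batches to the new `into_flight_data` helper in `utils::arrow::flight`, so a cache hit in the Flight handler can short-circuit with `return cache_results.into_flight();` instead of repeating the encoder setup this commit deletes from `do_get`. A sketch of what such a helper might look like, assuming it simply relocates the removed logic; the exact signature and the `DoGetStream` alias are assumptions.

```rust
use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::FlightData;
use arrow_ipc::writer::IpcWriteOptions;
use futures::{stream::BoxStream, TryStreamExt};
use tonic::{Response, Status};

// Assumed alias: the tonic do_get response is a boxed stream of FlightData.
pub type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;

pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Status> {
    let input_stream = futures::stream::iter(records.into_iter().map(Ok));

    // Same options the old inline code used: IPC compression, no size cap.
    let write_options = IpcWriteOptions::default()
        .try_with_compression(Some(arrow_ipc::CompressionType(1)))
        .map_err(|err| Status::failed_precondition(err.to_string()))?;

    let flight_data_stream = FlightDataEncoderBuilder::new()
        .with_max_flight_data_size(usize::MAX)
        .with_options(write_options)
        .build(input_stream)
        .map_err(|err| Status::unknown(err.to_string()));

    Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
}
```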

server/src/storage/object_storage.rs (1 addition, 1 deletion)
@@ -26,8 +26,8 @@ use super::{
 };
 
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
-use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
+use crate::metrics::{LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE_TODAY};
 use crate::option::Mode;
 use crate::{
     alerts::Alerts,
