Skip to content

Commit 7cc7bcd

Browse files
committed
impl hot tier v1
1 parent b77658a commit 7cc7bcd

File tree

3 files changed

+68
-2
lines changed

3 files changed

+68
-2
lines changed

server/src/handlers/airplane.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ use crate::handlers::http::query::{
4646
use crate::query::{TableScanVisitor, QUERY_SESSION};
4747
use crate::querycache::QueryCacheManager;
4848
use crate::utils::arrow::flight::{
49-
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
49+
append_temporary_events, get_from_ingester_cache, get_query_from_ticket, into_flight_data, run_do_get_rpc,
50+
5051
send_to_ingester,
5152
};
5253
use arrow_flight::{
@@ -204,6 +205,13 @@ impl FlightService for AirServiceImpl {
204205
.map_err(|_| Status::internal("Failed to parse query"))?;
205206

206207

208+
// deal with ingester local cache
209+
if let Some(early) =
210+
get_from_ingester_cache(&query.start, &query.end, &stream_name, ticket.clone()).await
211+
{
212+
return into_flight_data(early);
213+
}
214+
207215
let event =
208216
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
209217
let sql = format!("select * from {}", &stream_name);

server/src/handlers/http/query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::storage::ObjectStorageError;
5252
use crate::utils::actix::extract_session_key_from_req;
5353

5454
/// Query Request through http endpoint.
55-
#[derive(Debug, serde::Deserialize, serde::Serialize)]
55+
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
5656
#[serde(rename_all = "camelCase")]
5757
pub struct Query {
5858
pub query: String,

server/src/utils/arrow/flight.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use crate::event::Event;
2020
use crate::handlers::http::ingest::push_logs_unchecked;
2121
use crate::handlers::http::query::Query as QueryJson;
22+
use crate::localcache::LocalCacheManager;
2223
use crate::metadata::STREAM_INFO;
2324
use crate::query::stream_schema_provider::include_now;
2425
use crate::{
@@ -31,10 +32,12 @@ use arrow_flight::encode::FlightDataEncoderBuilder;
3132
use arrow_flight::{FlightData, Ticket};
3233
use arrow_ipc::writer::IpcWriteOptions;
3334
use arrow_select::concat::concat_batches;
35+
use chrono::{DateTime, Utc};
3436
use datafusion::logical_expr::BinaryExpr;
3537
use datafusion::prelude::Expr;
3638
use datafusion::scalar::ScalarValue;
3739
use futures::{stream, TryStreamExt};
40+
use serde_json::json;
3841

3942
use tonic::{Request, Response, Status};
4043

@@ -158,3 +161,58 @@ pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStrea
158161

159162
Ok(Response::new(Box::pin(flight_data_stream) as DoGetStream))
160163
}
164+
165+
pub async fn get_from_ingester_cache(
166+
start: &DateTime<Utc>,
167+
end: &DateTime<Utc>,
168+
stream_name: &str,
169+
ticket: QueryJson,
170+
) -> Option<Vec<RecordBatch>> {
171+
LocalCacheManager::global()?;
172+
173+
let time_delta = *end - *start;
174+
let goto_ingester = time_delta.num_days() < CONFIG.parseable.local_cache_time_range;
175+
let goto_ingester = goto_ingester
176+
&& STREAM_INFO
177+
.read()
178+
.expect("lock should not be poisoned")
179+
.get(stream_name)?
180+
.cache_enabled;
181+
182+
if CONFIG.parseable.mode == Mode::Query && goto_ingester {
183+
// send the grpc call to then ingesters, if fails continue with normal flow
184+
let start_time = ticket.start_time;
185+
let end_time = ticket.end_time;
186+
let sql = ticket.query;
187+
let out_ticket = json!({
188+
"query": sql,
189+
"startTime": start_time,
190+
"endTime": end_time
191+
})
192+
.to_string();
193+
194+
// todo: cleanup the namespace
195+
let ingester_metadatas = crate::handlers::http::cluster::get_ingestor_info()
196+
.await
197+
.ok()?;
198+
let mut result_from_ingester: Vec<RecordBatch> = vec![];
199+
200+
let mut error = false;
201+
for im in ingester_metadatas {
202+
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
203+
result_from_ingester.append(&mut batches);
204+
} else {
205+
error = true;
206+
break;
207+
}
208+
}
209+
210+
if error {
211+
None
212+
} else {
213+
Some(result_from_ingester)
214+
}
215+
} else {
216+
None
217+
}
218+
}

0 commit comments

Comments
 (0)