Skip to content

Commit 4731d40

Browse files
refactor, add comments
1 parent e60a183 commit 4731d40

File tree

3 files changed

+109
-44
lines changed

3 files changed

+109
-44
lines changed

src/handlers/airplane.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,19 @@ impl FlightService for AirServiceImpl {
219219
})?;
220220
let time = Instant::now();
221221

222-
let (records, _) = execute(query, &stream_name)
222+
let (records, _) = execute(query, &stream_name, false)
223223
.await
224224
.map_err(|err| Status::internal(err.to_string()))?;
225225

226+
let records = match records {
227+
actix_web::Either::Left(rbs) => rbs,
228+
actix_web::Either::Right(_) => {
229+
return Err(Status::failed_precondition(
230+
"Expected batch results, got stream",
231+
))
232+
}
233+
};
234+
226235
/*
227236
* INFO: Not returning the schema with the data.
228237
* kept it in case it needs to be sent in the future.

src/handlers/http/query.rs

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::event::error::EventError;
2020
use crate::handlers::http::fetch_schema;
2121
use actix_web::http::header::ContentType;
2222
use actix_web::web::{self, Json};
23-
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
23+
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
2424
use bytes::Bytes;
2525
use chrono::{DateTime, Utc};
2626
use datafusion::common::tree_node::TreeNode;
@@ -43,7 +43,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4343
use crate::option::Mode;
4444
use crate::parseable::{StreamNotFound, PARSEABLE};
4545
use crate::query::error::ExecuteError;
46-
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
46+
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
4747
use crate::query::{TableScanVisitor, QUERY_SESSION};
4848
use crate::rbac::Users;
4949
use crate::response::QueryResponse;
@@ -104,17 +104,38 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
104104

105105
let time = Instant::now();
106106

107+
// if the query is `select count(*) from <dataset>`
108+
// we use the `get_bin_density` method to get the count of records in the dataset
109+
// instead of executing the query using datafusion
107110
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
108111
return handle_count_query(&query_request, &table_name, column_name, time).await;
109112
}
110113

114+
// if the query request has streaming = false (default)
115+
// we use datafusion's `execute` method to get the records
111116
if !query_request.streaming {
112117
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
113118
}
114119

120+
// if the query request has streaming = true
121+
// we use datafusion's `execute_stream` method to get the records
115122
handle_streaming_query(query, &table_name, &query_request, time).await
116123
}
117124

125+
/// Handles count queries (e.g., `SELECT COUNT(*) FROM <dataset-name>`)
126+
///
127+
/// Instead of executing the query through DataFusion, this function uses the
128+
/// `CountsRequest::get_bin_density` method to quickly retrieve the count of records
129+
/// in the specified dataset and time range.
130+
///
131+
/// # Arguments
132+
/// - `query_request`: The original query request from the client.
133+
/// - `table_name`: The name of the table/dataset to count records in.
134+
/// - `column_name`: The column being counted (usually `*`).
135+
/// - `time`: The timer for measuring query execution time.
136+
///
137+
/// # Returns
138+
/// - `HttpResponse` with the count result as JSON, including fields if requested.
118139
async fn handle_count_query(
119140
query_request: &Query,
120141
table_name: &str,
@@ -150,13 +171,35 @@ async fn handle_count_query(
150171
.json(response))
151172
}
152173

174+
/// Handles standard (non-streaming) queries, returning all results in a single JSON response.
175+
///
176+
/// Executes the logical query using DataFusion's batch execution, collects all results,
177+
/// and serializes them into a single JSON object. The response includes the records,
178+
/// field names, and other metadata as specified in the query request.
179+
///
180+
/// # Arguments
181+
/// - `query`: The logical query to execute.
182+
/// - `table_name`: The name of the table/dataset being queried.
183+
/// - `query_request`: The original query request from the client.
184+
/// - `time`: The timer for measuring query execution time.
185+
///
186+
/// # Returns
187+
/// - `HttpResponse` with the full query result as a JSON object.
153188
async fn handle_non_streaming_query(
154189
query: LogicalQuery,
155190
table_name: &str,
156191
query_request: &Query,
157192
time: Instant,
158193
) -> Result<HttpResponse, QueryError> {
159-
let (records, fields) = execute(query, table_name).await?;
194+
let (records, fields) = execute(query, table_name, query_request.streaming).await?;
195+
let records = match records {
196+
Either::Left(rbs) => rbs,
197+
Either::Right(_) => {
198+
return Err(QueryError::MalformedQuery(
199+
"Expected batch results, got stream",
200+
))
201+
}
202+
};
160203
let total_time = format!("{:?}", time.elapsed());
161204
let time = time.elapsed().as_secs_f64();
162205

@@ -175,13 +218,36 @@ async fn handle_non_streaming_query(
175218
.json(response))
176219
}
177220

221+
/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
222+
///
223+
/// Executes the logical query using DataFusion's streaming execution. If the `fields`
224+
/// flag is set, the first chunk of the response contains the field names as a JSON object.
225+
/// Each subsequent chunk contains a record batch as a JSON object, separated by newlines.
226+
/// This allows clients to start processing results before the entire query completes.
227+
///
228+
/// # Arguments
229+
/// - `query`: The logical query to execute.
230+
/// - `table_name`: The name of the table/dataset being queried.
231+
/// - `query_request`: The original query request from the client.
232+
/// - `time`: The timer for measuring query execution time.
233+
///
234+
/// # Returns
235+
/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
178236
async fn handle_streaming_query(
179237
query: LogicalQuery,
180238
table_name: &str,
181239
query_request: &Query,
182240
time: Instant,
183241
) -> Result<HttpResponse, QueryError> {
184-
let (records_stream, fields) = execute_stream(query, table_name).await?;
242+
let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?;
243+
let records_stream = match records_stream {
244+
Either::Left(_) => {
245+
return Err(QueryError::MalformedQuery(
246+
"Expected stream results, got batch",
247+
))
248+
}
249+
Either::Right(stream) => stream,
250+
};
185251
let fields = fields.clone();
186252
let total_time = format!("{:?}", time.elapsed());
187253
let time = time.elapsed().as_secs_f64();
@@ -193,7 +259,7 @@ async fn handle_streaming_query(
193259
let with_fields = query_request.fields;
194260

195261
let stream = if with_fields {
196-
// send the fields as an initial chunk
262+
// send the fields JSON as an initial chunk
197263
let fields_json = serde_json::json!({
198264
"fields": fields
199265
})

src/query/mod.rs

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ mod filter_optimizer;
2020
mod listing_table_builder;
2121
pub mod stream_schema_provider;
2222

23+
use actix_web::Either;
2324
use chrono::NaiveDateTime;
2425
use chrono::{DateTime, Duration, Utc};
2526
use datafusion::arrow::record_batch::RecordBatch;
26-
2727
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
2828
use datafusion::error::DataFusionError;
2929
use datafusion::execution::disk_manager::DiskManagerConfig;
@@ -70,21 +70,17 @@ pub static QUERY_RUNTIME: Lazy<Runtime> =
7070
pub async fn execute(
7171
query: Query,
7272
stream_name: &str,
73-
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
73+
is_streaming: bool,
74+
) -> Result<
75+
(
76+
Either<Vec<RecordBatch>, SendableRecordBatchStream>,
77+
Vec<String>,
78+
),
79+
ExecuteError,
80+
> {
7481
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
7582
QUERY_RUNTIME
76-
.spawn(async move { query.execute(time_partition.as_ref()).await })
77-
.await
78-
.expect("The Join should have been successful")
79-
}
80-
81-
pub async fn execute_stream(
82-
query: Query,
83-
stream_name: &str,
84-
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
85-
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
86-
QUERY_RUNTIME
87-
.spawn(async move { query.execute_stream(time_partition.as_ref()).await })
83+
.spawn(async move { query.execute(time_partition.as_ref(), is_streaming).await })
8884
.await
8985
.expect("The Join should have been successful")
9086
}
@@ -168,10 +164,20 @@ impl Query {
168164
SessionContext::new_with_state(state)
169165
}
170166

167+
/// This function returns the result of the query:
168+
/// if streaming is true, it returns a stream
169+
/// if streaming is false, it returns a vector of record batches
171170
pub async fn execute(
172171
&self,
173172
time_partition: Option<&String>,
174-
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
173+
is_streaming: bool,
174+
) -> Result<
175+
(
176+
Either<Vec<RecordBatch>, SendableRecordBatchStream>,
177+
Vec<String>,
178+
),
179+
ExecuteError,
180+
> {
175181
let df = QUERY_SESSION
176182
.execute_logical_plan(self.final_logical_plan(time_partition))
177183
.await?;
@@ -184,35 +190,19 @@ impl Query {
184190
.cloned()
185191
.collect_vec();
186192

187-
if fields.is_empty() {
188-
return Ok((vec![], fields));
193+
if fields.is_empty() && !is_streaming {
194+
return Ok((Either::Left(vec![]), fields));
189195
}
190196

191-
let results = df.collect().await?;
197+
let results = if !is_streaming {
198+
Either::Left(df.collect().await?)
199+
} else {
200+
Either::Right(df.execute_stream().await?)
201+
};
192202

193203
Ok((results, fields))
194204
}
195205

196-
// execute stream
197-
pub async fn execute_stream(
198-
&self,
199-
time_partition: Option<&String>,
200-
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
201-
let df = QUERY_SESSION
202-
.execute_logical_plan(self.final_logical_plan(time_partition))
203-
.await?;
204-
let fields = df
205-
.schema()
206-
.fields()
207-
.iter()
208-
.map(|f| f.name())
209-
.cloned()
210-
.collect_vec();
211-
let stream = df.execute_stream().await?;
212-
213-
Ok((stream, fields))
214-
}
215-
216206
pub async fn get_dataframe(
217207
&self,
218208
time_partition: Option<&String>,

0 commit comments

Comments
 (0)