From 52980c44620b9577b513f3598e282eb34f3aea41 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 05:08:00 -0400 Subject: [PATCH 1/8] streaming response --- Cargo.lock | 1 + Cargo.toml | 1 + src/handlers/http/query.rs | 51 +++++++++++++++++++++++--------------- src/query/mod.rs | 33 +++++++++++++++++++++++- 4 files changed, 65 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 279dbf16a..bee3513fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3478,6 +3478,7 @@ dependencies = [ "derive_more 1.0.0", "fs_extra", "futures", + "futures-core", "futures-util", "hex", "hostname", diff --git a/Cargo.toml b/Cargo.toml index a8763aa91..b73c83470 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,6 +121,7 @@ static-files = "0.2" thiserror = "2.0" ulid = { version = "1.0", features = ["serde"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } +futures-core = "0.3.31" [build-dependencies] cargo_toml = "0.21" diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 368fe8b9c..346d03343 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,10 +19,12 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; +use futures::StreamExt; use futures_util::Future; use http::StatusCode; use serde::{Deserialize, Serialize}; @@ -41,15 +43,16 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; -use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery}; +use crate::query::{execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; -use crate::response::{QueryResponse, TIME_ELAPSED_HEADER}; +use crate::response::TIME_ELAPSED_HEADER; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; +use crate::utils::arrow::record_batches_to_json; use crate::utils::time::{TimeParseError, TimeRange}; use crate::utils::user_auth_for_datasets; - +use futures_core::Stream as CoreStream; /// Query Request through http endpoint. 
#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] @@ -132,25 +135,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result { + // convert record batch to JSON + let json = record_batches_to_json(&[batch]) + .map_err(actix_web::error::ErrorInternalServerError)?; + // // Serialize to JSON string + // let json = serde_json::to_value(&json) + // .map_err(actix_web::error::ErrorInternalServerError)?; + let response = json!({ + "fields": fields, + "records": json, + }); + Ok(Bytes::from(response.to_string())) + } + Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + } + }); - let (records, fields) = execute(query, &table_name).await?; - let total_time = format!("{:?}", time.elapsed()); - let response = QueryResponse { - records, - fields, - fill_null: query_request.send_null, - with_fields: query_request.fields, - total_time, - } - .to_http()?; - - let time = time.elapsed().as_secs_f64(); - - QUERY_EXECUTE_TIME - .with_label_values(&[&table_name]) - .observe(time); + let boxed_stream = + Box::pin(stream) as Pin> + Send>>; - Ok(response) + Ok(HttpResponse::Ok() + .content_type("application/json") + .streaming(boxed_stream)) } pub async fn get_counts( diff --git a/src/query/mod.rs b/src/query/mod.rs index 0608a9459..e24aab2fc 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; -use datafusion::execution::SessionStateBuilder; +use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder}; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, @@ -78,6 +78,17 @@ pub async fn execute( .expect("The Join should have been successful") } +pub async fn execute_stream( + query: Query, + stream_name: &str, +) -> Result<(SendableRecordBatchStream, Vec), ExecuteError> { + let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); + QUERY_RUNTIME + .spawn(async move { query.execute_stream(time_partition.as_ref()).await }) + .await + .expect("The Join should have been successful") +} + // A query request by client #[derive(Debug)] pub struct Query { @@ -182,6 +193,26 @@ impl Query { Ok((results, fields)) } + // execute stream + pub async fn execute_stream( + &self, + time_partition: Option<&String>, + ) -> Result<(SendableRecordBatchStream, Vec), ExecuteError> { + let df = QUERY_SESSION + .execute_logical_plan(self.final_logical_plan(time_partition)) + .await?; + let fields = df + .schema() + .fields() + .iter() + .map(|f| f.name()) + .cloned() + .collect_vec(); + let stream = df.execute_stream().await?; + + Ok((stream, fields)) + } + pub async fn get_dataframe( &self, time_partition: Option<&String>, From e8c8e818eacc2da89a28017218932ecc1e9746e2 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 05:22:40 -0400 Subject: [PATCH 2/8] use fields --- src/handlers/http/query.rs | 21 +++++++++++---------- src/response.rs | 8 ++------ 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 346d03343..2f5c8383e 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -46,10 +46,9 @@ use crate::query::error::ExecuteError; use crate::query::{execute_stream, 
CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; -use crate::response::TIME_ELAPSED_HEADER; +use crate::response::{QueryResponse, TIME_ELAPSED_HEADER}; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; -use crate::utils::arrow::record_batches_to_json; use crate::utils::time::{TimeParseError, TimeRange}; use crate::utils::user_auth_for_datasets; use futures_core::Stream as CoreStream; @@ -141,14 +140,16 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result { // convert record batch to JSON - let json = record_batches_to_json(&[batch]) - .map_err(actix_web::error::ErrorInternalServerError)?; - // // Serialize to JSON string - // let json = serde_json::to_value(&json) - // .map_err(actix_web::error::ErrorInternalServerError)?; - let response = json!({ - "fields": fields, - "records": json, + let response = QueryResponse { + records: vec![batch], + fields: fields.clone(), + fill_null: query_request.send_null, + with_fields: query_request.fields, + } + .to_http() + .unwrap_or_else(|e| { + error!("Failed to parse record batch into JSON: {}", e); + json!({}) }); Ok(Bytes::from(response.to_string())) } diff --git a/src/response.rs b/src/response.rs index 7aca5c44f..a5dc47ee4 100644 --- a/src/response.rs +++ b/src/response.rs @@ -17,7 +17,6 @@ */ use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; -use actix_web::HttpResponse; use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; @@ -30,11 +29,10 @@ pub struct QueryResponse { pub fields: Vec, pub fill_null: bool, pub with_fields: bool, - pub total_time: String, } impl QueryResponse { - pub fn to_http(&self) -> Result { + pub fn to_http(&self) -> Result { info!("{}", "Returning query results"); let mut json_records = record_batches_to_json(&self.records)?; @@ -58,8 +56,6 @@ impl QueryResponse { Value::Array(values) }; - Ok(HttpResponse::Ok() - .insert_header((TIME_ELAPSED_HEADER, self.total_time.as_str())) - .json(response)) + Ok(response) } } From 6dacb946b63ba6a98445dad2c840f2aa4807c8cb Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 06:31:06 -0400 Subject: [PATCH 3/8] separate streams by new line --- src/handlers/http/query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 2f5c8383e..f792969cf 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -151,7 +151,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Err(actix_web::error::ErrorInternalServerError(e)), } From 2f00ad5ed3a4ac65f18155d91a9bb0fcf58eb40d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 07:44:52 -0400 Subject: [PATCH 4/8] query param for streaming, default false --- src/handlers/http/query.rs | 160 +++++++++++++++++++++++++------------ 1 file changed, 108 insertions(+), 52 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index f792969cf..61698c8c3 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -28,7 +28,7 @@ use futures::StreamExt; use futures_util::Future; use http::StatusCode; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::json; use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; @@ -43,7 +43,7 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use 
crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; -use crate::query::{execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery}; +use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::{QueryResponse, TIME_ELAPSED_HEADER}; @@ -51,7 +51,6 @@ use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; use crate::utils::time::{TimeParseError, TimeRange}; use crate::utils::user_auth_for_datasets; -use futures_core::Stream as CoreStream; /// Query Request through http endpoint. #[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] @@ -64,6 +63,8 @@ pub struct Query { #[serde(skip)] pub fields: bool, #[serde(skip)] + pub streaming: bool, + #[serde(skip)] pub filter_tags: Option>, } @@ -75,7 +76,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result raw_logical_plan, Err(_) => { - //if logical plan creation fails, create streams and try again create_streams_for_querier().await; session_state .create_logical_plan(&query_request.query) @@ -85,10 +85,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result { + let counts_req = CountsRequest { + stream: table_name.to_string(), + start_time: query_request.start_time.clone(), + end_time: query_request.end_time.clone(), + num_bins: 1, + }; + let count_records = counts_req.get_bin_density().await?; + let count = count_records[0].count; + let response = if query_request.fields { + json!({ + "fields": [column_name], + "records": [json!({column_name: count})] + }) + } else { + serde_json::Value::Array(vec![json!({column_name: count})]) + }; - return Ok(HttpResponse::Ok() - .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) - .json(response)); + let total_time = format!("{:?}", time.elapsed()); + let time = time.elapsed().as_secs_f64(); + + QUERY_EXECUTE_TIME + .with_label_values(&[table_name]) + .observe(time); + + Ok(HttpResponse::Ok() + .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) + .json(response)) +} + +async fn handle_non_streaming_query( + query: LogicalQuery, + table_name: &str, + query_request: &Query, + time: Instant, +) -> Result { + let (records, fields) = execute(query, table_name).await?; + let total_time = format!("{:?}", time.elapsed()); + let time = time.elapsed().as_secs_f64(); + + QUERY_EXECUTE_TIME + .with_label_values(&[table_name]) + .observe(time); + let response = QueryResponse { + records, + fields, + fill_null: query_request.send_null, + with_fields: query_request.fields, } - let (records_stream, fields) = execute_stream(query, &table_name).await?; + .to_http()?; + Ok(HttpResponse::Ok() + .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) + .json(response)) +} + +async fn handle_streaming_query( + query: LogicalQuery, + table_name: &str, + query_request: &Query, + time: Instant, +) -> Result { + let (records_stream, fields) = execute_stream(query, table_name).await?; let fields = fields.clone(); - let stream = records_stream.map(move |batch_result| { - match batch_result { - Ok(batch) => { - // convert record batch to JSON - let response = QueryResponse { - records: vec![batch], - fields: fields.clone(), - fill_null: query_request.send_null, - with_fields: query_request.fields, - } - .to_http() - .unwrap_or_else(|e| { - error!("Failed to parse record batch into JSON: {}", e); - json!({}) - }); - 
Ok(Bytes::from(format!("{}\n", response.to_string()))) + let total_time = format!("{:?}", time.elapsed()); + let time = time.elapsed().as_secs_f64(); + QUERY_EXECUTE_TIME + .with_label_values(&[table_name]) + .observe(time); + + let send_null = query_request.send_null; + let with_fields = query_request.fields; + + let stream = records_stream.map(move |batch_result| match batch_result { + Ok(batch) => { + let response = QueryResponse { + records: vec![batch], + fields: fields.clone(), + fill_null: send_null, + with_fields, } - Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + .to_http() + .unwrap_or_else(|e| { + error!("Failed to parse record batch into JSON: {}", e); + json!({}) + }); + Ok(Bytes::from(format!("{}\n", response))) } + Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), }); - let boxed_stream = - Box::pin(stream) as Pin> + Send>>; + let boxed_stream = Box::pin(stream); Ok(HttpResponse::Ok() .content_type("application/json") + .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) .streaming(boxed_stream)) } @@ -234,6 +285,10 @@ impl FromRequest for Query { query.send_null = params.get("sendNull").cloned().unwrap_or(false); } + if !query.streaming { + query.streaming = params.get("streaming").cloned().unwrap_or(false); + } + Ok(query) }; @@ -297,6 +352,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option { send_null: query.send_null, start_time: start_time.to_rfc3339(), end_time: end_time.to_rfc3339(), + streaming: query.streaming, }; Some(q) From 0d0dae0c9d2c583b9428fa2db09896dfc6fd4e66 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 08:51:56 -0400 Subject: [PATCH 5/8] send fields as first chunk for streaming response --- src/handlers/http/query.rs | 87 +++++++++++++++++++++++++++----------- src/response.rs | 4 +- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 61698c8c3..2bdc5476a 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -16,6 +16,8 @@ * */ +use crate::event::error::EventError; +use crate::handlers::http::fetch_schema; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; @@ -24,7 +26,8 @@ use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; -use futures::StreamExt; +use futures::stream::once; +use futures::{future, Stream, StreamExt}; use futures_util::Future; use http::StatusCode; use serde::{Deserialize, Serialize}; @@ -35,9 +38,6 @@ use std::sync::Arc; use std::time::Instant; use tracing::error; -use crate::event::error::EventError; -use crate::handlers::http::fetch_schema; - use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; @@ -46,11 +46,13 @@ use crate::query::error::ExecuteError; use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; -use crate::response::{QueryResponse, TIME_ELAPSED_HEADER}; +use crate::response::QueryResponse; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; use crate::utils::time::{TimeParseError, TimeRange}; use crate::utils::user_auth_for_datasets; + +const TIME_ELAPSED_HEADER: &str = "p-time-elapsed"; /// Query Request through http endpoint. 
#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] @@ -167,7 +169,7 @@ async fn handle_non_streaming_query( fill_null: query_request.send_null, with_fields: query_request.fields, } - .to_http()?; + .to_json()?; Ok(HttpResponse::Ok() .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) .json(response)) @@ -190,30 +192,65 @@ async fn handle_streaming_query( let send_null = query_request.send_null; let with_fields = query_request.fields; - let stream = records_stream.map(move |batch_result| match batch_result { - Ok(batch) => { - let response = QueryResponse { - records: vec![batch], - fields: fields.clone(), - fill_null: send_null, - with_fields, + let stream = if with_fields { + // send the fields as an initial chunk + let fields_json = serde_json::json!({ + "fields": fields + }) + .to_string(); + + // stream the records without fields + let records_stream = records_stream.map(move |batch_result| match batch_result { + Ok(batch) => { + let response = QueryResponse { + records: vec![batch], + fields: Vec::new(), + fill_null: send_null, + with_fields: false, + } + .to_json() + .unwrap_or_else(|e| { + error!("Failed to parse record batch into JSON: {}", e); + json!({}) + }); + Ok(Bytes::from(format!("{}\n", response))) } - .to_http() - .unwrap_or_else(|e| { - error!("Failed to parse record batch into JSON: {}", e); - json!({}) - }); - Ok(Bytes::from(format!("{}\n", response))) - } - Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), - }); + Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + }); + + // Combine the initial fields chunk with the records stream + let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!( + "{}\n", + fields_json + )))); + Box::pin(fields_chunk.chain(records_stream)) + as Pin>>> + } else { + let stream = records_stream.map(move |batch_result| match batch_result { + Ok(batch) => { + let response = QueryResponse { + records: vec![batch], + fields: fields.clone(), + fill_null: send_null, + with_fields, + } + .to_json() + .unwrap_or_else(|e| { + error!("Failed to parse record batch into JSON: {}", e); + json!({}) + }); + Ok(Bytes::from(format!("{}\n", response))) + } + Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + }); - let boxed_stream = Box::pin(stream); + Box::pin(stream) as Pin>>> + }; Ok(HttpResponse::Ok() - .content_type("application/json") + .content_type("application/x-ndjson") .insert_header((TIME_ELAPSED_HEADER, total_time.as_str())) - .streaming(boxed_stream)) + .streaming(stream)) } pub async fn get_counts( diff --git a/src/response.rs b/src/response.rs index a5dc47ee4..bd02ababe 100644 --- a/src/response.rs +++ b/src/response.rs @@ -22,8 +22,6 @@ use itertools::Itertools; use serde_json::{json, Value}; use tracing::info; -pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed"; - pub struct QueryResponse { pub records: Vec, pub fields: Vec, @@ -32,7 +30,7 @@ pub struct QueryResponse { } impl QueryResponse { - pub fn to_http(&self) -> Result { + pub fn to_json(&self) -> Result { info!("{}", "Returning query results"); let mut json_records = record_batches_to_json(&self.records)?; From e60a183db596415226a0d44c48ad53968fc4df11 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 09:10:04 -0400 Subject: [PATCH 6/8] clippy suggestions --- src/catalog/mod.rs | 7 ++----- src/handlers/airplane.rs | 8 +++++--- src/handlers/livetail.rs | 18 ++++++++++-------- src/utils/arrow/flight.rs | 6 +++--- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git 
a/src/catalog/mod.rs b/src/catalog/mod.rs index 8db1a49b4..f8113545a 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -16,7 +16,7 @@ * */ -use std::{io::ErrorKind, sync::Arc}; +use std::sync::Arc; use chrono::{DateTime, Local, NaiveTime, Utc}; use column::Column; @@ -259,10 +259,7 @@ async fn create_manifest( .date_naive() .and_time( NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999) - .ok_or(IOError::new( - ErrorKind::Other, - "Failed to create upper bound for manifest", - ))?, + .ok_or(IOError::other("Failed to create upper bound for manifest"))?, ) .and_utc(); diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 8a5551c2f..7e0c83bd0 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl { } async fn do_get(&self, req: Request) -> Result, Status> { - let key = extract_session_key(req.metadata())?; + let key = extract_session_key(req.metadata()) + .map_err(|e| Status::unauthenticated(e.to_string()))?; - let ticket = get_query_from_ticket(&req)?; + let ticket = + get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?; info!("query requested to airplane: {:?}", ticket); @@ -246,7 +248,7 @@ impl FlightService for AirServiceImpl { .observe(time); // Airplane takes off 🛫 - out + out.map_err(|e| *e) } async fn do_put( diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs index bfaa7205e..57630c93c 100644 --- a/src/handlers/livetail.rs +++ b/src/handlers/livetail.rs @@ -98,10 +98,10 @@ impl FlightService for FlightServiceImpl { } async fn do_get(&self, req: Request) -> Result, Status> { - let key = extract_session_key(req.metadata())?; + let key = extract_session_key(req.metadata()).map_err(|e| *e)?; let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket) .map_err(|err| Status::internal(err.to_string()))?; - let stream = extract_stream(&ticket)?; + let stream = extract_stream(&ticket).map_err(|e| *e)?; info!("livetail requested for stream {}", stream); match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) { rbac::Response::Authorized => (), @@ -232,16 +232,16 @@ pub fn server() -> impl Future Result<&str, Status> { +pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Box> { body.as_object() - .ok_or(Status::invalid_argument("expected object in request body"))? + .ok_or_else(|| Box::new(Status::invalid_argument("expected object in request body")))? .get("stream") - .ok_or(Status::invalid_argument("stream key value is not provided"))? + .ok_or_else(|| Box::new(Status::invalid_argument("stream key value is not provided")))? .as_str() - .ok_or(Status::invalid_argument("stream key value is invalid")) + .ok_or_else(|| Box::new(Status::invalid_argument("stream key value is invalid"))) } -pub fn extract_session_key(headers: &MetadataMap) -> Result { +pub fn extract_session_key(headers: &MetadataMap) -> Result> { // Extract username and password from the request using basic auth extractor. 
let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth { username: creds.user_id, @@ -261,7 +261,9 @@ pub fn extract_session_key(headers: &MetadataMap) -> Result return Ok(SessionKey::SessionId(session)); } - Err(Status::unauthenticated("No authentication method supplied")) + Err(Box::new(Status::unauthenticated( + "No authentication method supplied", + ))) } fn extract_basic_auth(header: &MetadataMap) -> Option { diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 180d352d5..e310f51f4 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -41,9 +41,9 @@ use tonic::transport::{Channel, Uri}; pub type DoGetStream = stream::BoxStream<'static, Result>; -pub fn get_query_from_ticket(req: &Request) -> Result { +pub fn get_query_from_ticket(req: &Request) -> Result> { serde_json::from_slice::(&req.get_ref().ticket) - .map_err(|err| Status::internal(err.to_string())) + .map_err(|err| Box::new(Status::internal(err.to_string()))) } pub async fn run_do_get_rpc( @@ -141,7 +141,7 @@ fn lit_timestamp_milli(time: i64) -> Expr { Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None)) } -pub fn into_flight_data(records: Vec) -> Result, Status> { +pub fn into_flight_data(records: Vec) -> Result, Box> { let input_stream = futures::stream::iter(records.into_iter().map(Ok)); let write_options = IpcWriteOptions::default() .try_with_compression(Some(arrow_ipc::CompressionType(1))) From 4731d405419943795dde8d92e4b50533dfde7902 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 16 May 2025 23:39:08 -0400 Subject: [PATCH 7/8] refactor, add comments --- src/handlers/airplane.rs | 11 +++++- src/handlers/http/query.rs | 76 +++++++++++++++++++++++++++++++++++--- src/query/mod.rs | 66 ++++++++++++++------------------- 3 files changed, 109 insertions(+), 44 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 7e0c83bd0..d2528d3a8 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -219,10 +219,19 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let (records, _) = execute(query, &stream_name) + let (records, _) = execute(query, &stream_name, false) .await .map_err(|err| Status::internal(err.to_string()))?; + let records = match records { + actix_web::Either::Left(rbs) => rbs, + actix_web::Either::Right(_) => { + return Err(Status::failed_precondition( + "Expected batch results, got stream", + )) + } + }; + /* * INFO: No returning the schema with the data. * kept it in case it needs to be sent in the future. 
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 2bdc5476a..b1c647ccc 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -20,7 +20,7 @@ use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; -use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; +use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder}; use bytes::Bytes; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; @@ -43,7 +43,7 @@ use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; -use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery}; +use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::rbac::Users; use crate::response::QueryResponse; @@ -104,17 +104,38 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result` + // we use the `get_bin_density` method to get the count of records in the dataset + // instead of executing the query using datafusion if let Some(column_name) = query.is_logical_plan_count_without_filters() { return handle_count_query(&query_request, &table_name, column_name, time).await; } + // if the query request has streaming = false (default) + // we use datafusion's `execute` method to get the records if !query_request.streaming { return handle_non_streaming_query(query, &table_name, &query_request, time).await; } + // if the query request has streaming = true + // we use datafusion's `execute_stream` method to get the records handle_streaming_query(query, &table_name, &query_request, time).await } +/// Handles count queries (e.g., `SELECT COUNT(*) FROM `) +/// +/// Instead of executing the query through DataFusion, this function uses the +/// `CountsRequest::get_bin_density` method to quickly retrieve the count of records +/// in the specified dataset and time range. +/// +/// # Arguments +/// - `query_request`: The original query request from the client. +/// - `table_name`: The name of the table/dataset to count records in. +/// - `column_name`: The column being counted (usually `*`). +/// - `time`: The timer for measuring query execution time. +/// +/// # Returns +/// - `HttpResponse` with the count result as JSON, including fields if requested. async fn handle_count_query( query_request: &Query, table_name: &str, @@ -150,13 +171,35 @@ async fn handle_count_query( .json(response)) } +/// Handles standard (non-streaming) queries, returning all results in a single JSON response. +/// +/// Executes the logical query using DataFusion's batch execution, collects all results, +/// and serializes them into a single JSON object. The response includes the records, +/// field names, and other metadata as specified in the query request. +/// +/// # Arguments +/// - `query`: The logical query to execute. +/// - `table_name`: The name of the table/dataset being queried. +/// - `query_request`: The original query request from the client. +/// - `time`: The timer for measuring query execution time. +/// +/// # Returns +/// - `HttpResponse` with the full query result as a JSON object. 
async fn handle_non_streaming_query( query: LogicalQuery, table_name: &str, query_request: &Query, time: Instant, ) -> Result { - let (records, fields) = execute(query, table_name).await?; + let (records, fields) = execute(query, table_name, query_request.streaming).await?; + let records = match records { + Either::Left(rbs) => rbs, + Either::Right(_) => { + return Err(QueryError::MalformedQuery( + "Expected batch results, got stream", + )) + } + }; let total_time = format!("{:?}", time.elapsed()); let time = time.elapsed().as_secs_f64(); @@ -175,13 +218,36 @@ async fn handle_non_streaming_query( .json(response)) } +/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON). +/// +/// Executes the logical query using DataFusion's streaming execution. If the `fields` +/// flag is set, the first chunk of the response contains the field names as a JSON object. +/// Each subsequent chunk contains a record batch as a JSON object, separated by newlines. +/// This allows clients to start processing results before the entire query completes. +/// +/// # Arguments +/// - `query`: The logical query to execute. +/// - `table_name`: The name of the table/dataset being queried. +/// - `query_request`: The original query request from the client. +/// - `time`: The timer for measuring query execution time. +/// +/// # Returns +/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array. async fn handle_streaming_query( query: LogicalQuery, table_name: &str, query_request: &Query, time: Instant, ) -> Result { - let (records_stream, fields) = execute_stream(query, table_name).await?; + let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?; + let records_stream = match records_stream { + Either::Left(_) => { + return Err(QueryError::MalformedQuery( + "Expected stream results, got batch", + )) + } + Either::Right(stream) => stream, + }; let fields = fields.clone(); let total_time = format!("{:?}", time.elapsed()); let time = time.elapsed().as_secs_f64(); @@ -193,7 +259,7 @@ async fn handle_streaming_query( let with_fields = query_request.fields; let stream = if with_fields { - // send the fields as an initial chunk + // send the fields json as an initial chunk let fields_json = serde_json::json!({ "fields": fields }) diff --git a/src/query/mod.rs b/src/query/mod.rs index e24aab2fc..e9c5632e7 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -20,10 +20,10 @@ mod filter_optimizer; mod listing_table_builder; pub mod stream_schema_provider; +use actix_web::Either; use chrono::NaiveDateTime; use chrono::{DateTime, Duration, Utc}; use datafusion::arrow::record_batch::RecordBatch; - use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; @@ -70,21 +70,17 @@ pub static QUERY_RUNTIME: Lazy = pub async fn execute( query: Query, stream_name: &str, -) -> Result<(Vec, Vec), ExecuteError> { + is_streaming: bool, +) -> Result< + ( + Either, SendableRecordBatchStream>, + Vec, + ), + ExecuteError, +> { let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); QUERY_RUNTIME - .spawn(async move { query.execute(time_partition.as_ref()).await }) - .await - .expect("The Join should have been successful") -} - -pub async fn execute_stream( - query: Query, - stream_name: &str, -) -> Result<(SendableRecordBatchStream, Vec), ExecuteError> { - let time_partition = 
PARSEABLE.get_stream(stream_name)?.get_time_partition(); - QUERY_RUNTIME - .spawn(async move { query.execute_stream(time_partition.as_ref()).await }) + .spawn(async move { query.execute(time_partition.as_ref(), is_streaming).await }) .await .expect("The Join should have been successful") } @@ -168,10 +164,20 @@ impl Query { SessionContext::new_with_state(state) } + /// this function returns the result of the query + /// if streaming is true, it returns a stream + /// if streaming is false, it returns a vector of record batches pub async fn execute( &self, time_partition: Option<&String>, - ) -> Result<(Vec, Vec), ExecuteError> { + is_streaming: bool, + ) -> Result< + ( + Either, SendableRecordBatchStream>, + Vec, + ), + ExecuteError, + > { let df = QUERY_SESSION .execute_logical_plan(self.final_logical_plan(time_partition)) .await?; @@ -184,35 +190,19 @@ impl Query { .cloned() .collect_vec(); - if fields.is_empty() { - return Ok((vec![], fields)); + if fields.is_empty() && !is_streaming { + return Ok((Either::Left(vec![]), fields)); } - let results = df.collect().await?; + let results = if !is_streaming { + Either::Left(df.collect().await?) + } else { + Either::Right(df.execute_stream().await?) + }; Ok((results, fields)) } - // execute stream - pub async fn execute_stream( - &self, - time_partition: Option<&String>, - ) -> Result<(SendableRecordBatchStream, Vec), ExecuteError> { - let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(time_partition)) - .await?; - let fields = df - .schema() - .fields() - .iter() - .map(|f| f.name()) - .cloned() - .collect_vec(); - let stream = df.execute_stream().await?; - - Ok((stream, fields)) - } - pub async fn get_dataframe( &self, time_partition: Option<&String>, From ae927499c2feef6c2fae65ca064fa5c31d1dd8d4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 17 May 2025 00:08:04 -0400 Subject: [PATCH 8/8] optimise streaming --- src/handlers/http/query.rs | 64 +++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index b1c647ccc..9312ba77d 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -21,6 +21,7 @@ use crate::handlers::http::fetch_schema; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder}; +use arrow_array::RecordBatch; use bytes::Bytes; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; @@ -248,7 +249,6 @@ async fn handle_streaming_query( } Either::Right(stream) => stream, }; - let fields = fields.clone(); let total_time = format!("{:?}", time.elapsed()); let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME @@ -266,22 +266,10 @@ async fn handle_streaming_query( .to_string(); // stream the records without fields - let records_stream = records_stream.map(move |batch_result| match batch_result { - Ok(batch) => { - let response = QueryResponse { - records: vec![batch], - fields: Vec::new(), - fill_null: send_null, - with_fields: false, - } - .to_json() - .unwrap_or_else(|e| { - error!("Failed to parse record batch into JSON: {}", e); - json!({}) - }); - Ok(Bytes::from(format!("{}\n", response))) - } - Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + let mut batch_processor = create_batch_processor(send_null); + let records_stream = records_stream.map(move |batch_result| { + let batch_result = batch_result.map_err(QueryError::from); + 
batch_processor(batch_result) }); // Combine the initial fields chunk with the records stream @@ -292,24 +280,9 @@ async fn handle_streaming_query( Box::pin(fields_chunk.chain(records_stream)) as Pin>>> } else { - let stream = records_stream.map(move |batch_result| match batch_result { - Ok(batch) => { - let response = QueryResponse { - records: vec![batch], - fields: fields.clone(), - fill_null: send_null, - with_fields, - } - .to_json() - .unwrap_or_else(|e| { - error!("Failed to parse record batch into JSON: {}", e); - json!({}) - }); - Ok(Bytes::from(format!("{}\n", response))) - } - Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), - }); - + let mut batch_processor = create_batch_processor(send_null); + let stream = records_stream + .map(move |batch_result| batch_processor(batch_result.map_err(QueryError::from))); Box::pin(stream) as Pin>>> }; @@ -319,6 +292,27 @@ async fn handle_streaming_query( .streaming(stream)) } +fn create_batch_processor( + send_null: bool, +) -> impl FnMut(Result) -> Result { + move |batch_result| match batch_result { + Ok(batch) => { + let response = QueryResponse { + records: vec![batch], + fields: Vec::new(), + fill_null: send_null, + with_fields: false, + } + .to_json() + .map_err(|e| { + error!("Failed to parse record batch into JSON: {}", e); + actix_web::error::ErrorInternalServerError(e) + })?; + Ok(Bytes::from(format!("{}\n", response))) + } + Err(e) => Err(actix_web::error::ErrorInternalServerError(e)), + } +} pub async fn get_counts( req: HttpRequest, counts_request: Json,