diff --git a/Cargo.lock b/Cargo.lock
index 279dbf16a..bee3513fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3478,6 +3478,7 @@ dependencies = [
  "derive_more 1.0.0",
  "fs_extra",
  "futures",
+ "futures-core",
  "futures-util",
  "hex",
  "hostname",
diff --git a/Cargo.toml b/Cargo.toml
index a8763aa91..b73c83470 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -121,6 +121,7 @@ static-files = "0.2"
 thiserror = "2.0"
 ulid = { version = "1.0", features = ["serde"] }
 xxhash-rust = { version = "0.8", features = ["xxh3"] }
+futures-core = "0.3.31"
 
 [build-dependencies]
 cargo_toml = "0.21"
diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index 8db1a49b4..f8113545a 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -16,7 +16,7 @@
  *
  */
 
-use std::{io::ErrorKind, sync::Arc};
+use std::sync::Arc;
 
 use chrono::{DateTime, Local, NaiveTime, Utc};
 use column::Column;
@@ -259,10 +259,7 @@ async fn create_manifest(
        .date_naive()
        .and_time(
            NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
-                .ok_or(IOError::new(
-                    ErrorKind::Other,
-                    "Failed to create upper bound for manifest",
-                ))?,
+                .ok_or(IOError::other("Failed to create upper bound for manifest"))?,
        )
        .and_utc();
 
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index 8a5551c2f..d2528d3a8 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl {
     }
 
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
-        let key = extract_session_key(req.metadata())?;
+        let key = extract_session_key(req.metadata())
+            .map_err(|e| Status::unauthenticated(e.to_string()))?;
 
-        let ticket = get_query_from_ticket(&req)?;
+        let ticket =
+            get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;
 
         info!("query requested to airplane: {:?}", ticket);
 
@@ -217,10 +219,19 @@
        })?;
 
        let time = Instant::now();
-        let (records, _) = execute(query, &stream_name)
+        let (records, _) = execute(query, &stream_name, false)
            .await
            .map_err(|err| Status::internal(err.to_string()))?;
 
+        let records = match records {
+            actix_web::Either::Left(rbs) => rbs,
+            actix_web::Either::Right(_) => {
+                return Err(Status::failed_precondition(
+                    "Expected batch results, got stream",
+                ))
+            }
+        };
+
        /*
         * INFO: No returning the schema with the data.
         * kept it in case it needs to be sent in the future.
@@ -246,7 +257,7 @@ impl FlightService for AirServiceImpl {
            .observe(time);
 
        // Airplane takes off 🛫
-        out
+        out.map_err(|e| *e)
    }
 
    async fn do_put(
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 368fe8b9c..9312ba77d 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -16,26 +16,29 @@
  *
  */
 
+use crate::event::error::EventError;
+use crate::handlers::http::fetch_schema;
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
-use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_array::RecordBatch;
+use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::SessionState;
+use futures::stream::once;
+use futures::{future, Stream, StreamExt};
 use futures_util::Future;
 use http::StatusCode;
 use serde::{Deserialize, Serialize};
-use serde_json::{json, Value};
+use serde_json::json;
 use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
 use tracing::error;
 
-use crate::event::error::EventError;
-use crate::handlers::http::fetch_schema;
-
 use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
@@ -44,12 +47,13 @@ use crate::query::error::ExecuteError;
 use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
 use crate::rbac::Users;
-use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
+use crate::response::QueryResponse;
 use crate::storage::ObjectStorageError;
 use crate::utils::actix::extract_session_key_from_req;
 use crate::utils::time::{TimeParseError, TimeRange};
 use crate::utils::user_auth_for_datasets;
 
+const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
 /// Query Request through http endpoint.
 #[derive(Debug, Deserialize, Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
@@ -62,6 +66,8 @@ pub struct Query {
     #[serde(skip)]
     pub fields: bool,
     #[serde(skip)]
+    pub streaming: bool,
+    #[serde(skip)]
     pub filter_tags: Option<Vec<String>>,
 }
 
@@ -73,7 +79,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
     {
         Ok(raw_logical_plan) => raw_logical_plan,
         Err(_) => {
-            //if logical plan creation fails, create streams and try again
             create_streams_for_querier().await;
             session_state
                 .create_logical_plan(&query_request.query)
@@ -83,10 +88,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
+    // if the query is `select count(*) from <dataset>`
+    // we use the `get_bin_density` method to get the count of records in the dataset
+    // instead of executing the query using datafusion
     if let Some(column_name) = query.is_logical_plan_count_without_filters() {
-        let counts_req = CountsRequest {
-            stream: table_name.clone(),
-            start_time: query_request.start_time.clone(),
-            end_time: query_request.end_time.clone(),
-            num_bins: 1,
-        };
-        let count_records = counts_req.get_bin_density().await?;
-        // NOTE: this should not panic, since there is atleast one bin, always
-        let count = count_records[0].count;
-        let response = if query_request.fields {
-            json!({
-                "fields": [&column_name],
-                "records": [json!({column_name: count})]
-            })
-        } else {
-            Value::Array(vec![json!({column_name: count})])
-        };
+        return handle_count_query(&query_request, &table_name, column_name, time).await;
+    }
 
-        let total_time = format!("{:?}", time.elapsed());
-        let time = time.elapsed().as_secs_f64();
+    // if the query request has streaming = false (default)
+    // we use datafusion's `execute` method to get the records
+    if !query_request.streaming {
+        return handle_non_streaming_query(query, &table_name, &query_request, time).await;
+    }
 
-        QUERY_EXECUTE_TIME
-            .with_label_values(&[&table_name])
-            .observe(time);
+    // if the query request has streaming = true
+    // we use datafusion's `execute_stream` method to get the records
+    handle_streaming_query(query, &table_name, &query_request, time).await
+}
 
-        return Ok(HttpResponse::Ok()
-            .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
-            .json(response));
-    }
+/// Handles count queries (e.g., `SELECT COUNT(*) FROM <dataset>`)
+///
+/// Instead of executing the query through DataFusion, this function uses the
+/// `CountsRequest::get_bin_density` method to quickly retrieve the count of records
+/// in the specified dataset and time range.
+///
+/// # Arguments
+/// - `query_request`: The original query request from the client.
+/// - `table_name`: The name of the table/dataset to count records in.
+/// - `column_name`: The column being counted (usually `*`).
+/// - `time`: The timer for measuring query execution time.
+///
+/// # Returns
+/// - `HttpResponse` with the count result as JSON, including fields if requested.
+async fn handle_count_query(
+    query_request: &Query,
+    table_name: &str,
+    column_name: &str,
+    time: Instant,
+) -> Result<HttpResponse, QueryError> {
+    let counts_req = CountsRequest {
+        stream: table_name.to_string(),
+        start_time: query_request.start_time.clone(),
+        end_time: query_request.end_time.clone(),
+        num_bins: 1,
+    };
+    let count_records = counts_req.get_bin_density().await?;
+    let count = count_records[0].count;
+    let response = if query_request.fields {
+        json!({
+            "fields": [column_name],
+            "records": [json!({column_name: count})]
+        })
+    } else {
+        serde_json::Value::Array(vec![json!({column_name: count})])
+    };
 
-    let (records, fields) = execute(query, &table_name).await?;
     let total_time = format!("{:?}", time.elapsed());
+    let time = time.elapsed().as_secs_f64();
+
+    QUERY_EXECUTE_TIME
+        .with_label_values(&[table_name])
+        .observe(time);
+
+    Ok(HttpResponse::Ok()
+        .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
+        .json(response))
+}
+
+/// Handles standard (non-streaming) queries, returning all results in a single JSON response.
+///
+/// Executes the logical query using DataFusion's batch execution, collects all results,
+/// and serializes them into a single JSON object. The response includes the records,
+/// field names, and other metadata as specified in the query request.
+///
+/// # Arguments
+/// - `query`: The logical query to execute.
+/// - `table_name`: The name of the table/dataset being queried.
+/// - `query_request`: The original query request from the client.
+/// - `time`: The timer for measuring query execution time.
+///
+/// # Returns
+/// - `HttpResponse` with the full query result as a JSON object.
+async fn handle_non_streaming_query(
+    query: LogicalQuery,
+    table_name: &str,
+    query_request: &Query,
+    time: Instant,
+) -> Result<HttpResponse, QueryError> {
+    let (records, fields) = execute(query, table_name, query_request.streaming).await?;
+    let records = match records {
+        Either::Left(rbs) => rbs,
+        Either::Right(_) => {
+            return Err(QueryError::MalformedQuery(
+                "Expected batch results, got stream",
+            ))
+        }
+    };
+    let total_time = format!("{:?}", time.elapsed());
+    let time = time.elapsed().as_secs_f64();
+
+    QUERY_EXECUTE_TIME
+        .with_label_values(&[table_name])
+        .observe(time);
 
     let response = QueryResponse {
         records,
         fields,
         fill_null: query_request.send_null,
         with_fields: query_request.fields,
-        total_time,
     }
-    .to_http()?;
+    .to_json()?;
+    Ok(HttpResponse::Ok()
+        .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
+        .json(response))
+}
 
+/// Handles streaming queries, returning results as newline-delimited JSON (NDJSON).
+///
+/// Executes the logical query using DataFusion's streaming execution. If the `fields`
+/// flag is set, the first chunk of the response contains the field names as a JSON object.
+/// Each subsequent chunk contains a record batch as a JSON object, separated by newlines.
+/// This allows clients to start processing results before the entire query completes.
+///
+/// # Arguments
+/// - `query`: The logical query to execute.
+/// - `table_name`: The name of the table/dataset being queried.
+/// - `query_request`: The original query request from the client.
+/// - `time`: The timer for measuring query execution time.
+///
+/// # Returns
+/// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array.
+async fn handle_streaming_query(
+    query: LogicalQuery,
+    table_name: &str,
+    query_request: &Query,
+    time: Instant,
+) -> Result<HttpResponse, QueryError> {
+    let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?;
+    let records_stream = match records_stream {
+        Either::Left(_) => {
+            return Err(QueryError::MalformedQuery(
+                "Expected stream results, got batch",
+            ))
+        }
+        Either::Right(stream) => stream,
+    };
+    let total_time = format!("{:?}", time.elapsed());
     let time = time.elapsed().as_secs_f64();
-
     QUERY_EXECUTE_TIME
-        .with_label_values(&[&table_name])
+        .with_label_values(&[table_name])
         .observe(time);
 
-    Ok(response)
+    let send_null = query_request.send_null;
+    let with_fields = query_request.fields;
+
+    let stream = if with_fields {
+        // send the fields json as an initial chunk
+        let fields_json = serde_json::json!({
+            "fields": fields
+        })
+        .to_string();
+
+        // stream the records without fields
+        let mut batch_processor = create_batch_processor(send_null);
+        let records_stream = records_stream.map(move |batch_result| {
+            let batch_result = batch_result.map_err(QueryError::from);
+            batch_processor(batch_result)
+        });
+
+        // Combine the initial fields chunk with the records stream
+        let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
+            "{}\n",
+            fields_json
+        ))));
+        Box::pin(fields_chunk.chain(records_stream))
+            as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
+    } else {
+        let mut batch_processor = create_batch_processor(send_null);
+        let stream = records_stream
+            .map(move |batch_result| batch_processor(batch_result.map_err(QueryError::from)));
+        Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
+    };
+
+    Ok(HttpResponse::Ok()
+        .content_type("application/x-ndjson")
+        .insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
+        .streaming(stream))
 }
 
+fn create_batch_processor(
+    send_null: bool,
+) -> impl FnMut(Result<RecordBatch, QueryError>) -> Result<Bytes, actix_web::Error> {
+    move |batch_result| match batch_result {
+        Ok(batch) => {
+            let response = QueryResponse {
+                records: vec![batch],
+                fields: Vec::new(),
+                fill_null: send_null,
+                with_fields: false,
+            }
+            .to_json()
+            .map_err(|e| {
+                error!("Failed to parse record batch into JSON: {}", e);
+                actix_web::error::ErrorInternalServerError(e)
+            })?;
+            Ok(Bytes::from(format!("{}\n", response)))
+        }
+        Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
+    }
+}
+
 pub async fn get_counts(
     req: HttpRequest,
     counts_request: Json<CountsRequest>,
@@ -222,6 +382,10 @@ impl FromRequest for Query {
             query.send_null = params.get("sendNull").cloned().unwrap_or(false);
         }
 
+        if !query.streaming {
+            query.streaming = params.get("streaming").cloned().unwrap_or(false);
+        }
+
         Ok(query)
     };
 
@@ -285,6 +449,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
         send_null: query.send_null,
         start_time: start_time.to_rfc3339(),
         end_time: end_time.to_rfc3339(),
+        streaming: query.streaming,
     };
 
     Some(q)
diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs
index bfaa7205e..57630c93c 100644
--- a/src/handlers/livetail.rs
+++ b/src/handlers/livetail.rs
@@ -98,10 +98,10 @@ impl FlightService for FlightServiceImpl {
     }
 
     async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
-        let key = extract_session_key(req.metadata())?;
+        let key = extract_session_key(req.metadata()).map_err(|e| *e)?;
         let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket)
             .map_err(|err| Status::internal(err.to_string()))?;
-        let stream = extract_stream(&ticket)?;
+        let stream = extract_stream(&ticket).map_err(|e| *e)?;
         info!("livetail requested for stream {}", stream);
         match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) {
             rbac::Response::Authorized => (),
@@ -232,16 +232,16 @@ pub fn server() -> impl Future
-pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
+pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Box<Status>> {
     body.as_object()
-        .ok_or(Status::invalid_argument("expected object in request body"))?
+        .ok_or_else(|| Box::new(Status::invalid_argument("expected object in request body")))?
         .get("stream")
-        .ok_or(Status::invalid_argument("stream key value is not provided"))?
+        .ok_or_else(|| Box::new(Status::invalid_argument("stream key value is not provided")))?
         .as_str()
-        .ok_or(Status::invalid_argument("stream key value is invalid"))
+        .ok_or_else(|| Box::new(Status::invalid_argument("stream key value is invalid")))
 }
 
-pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
+pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Box<Status>> {
     // Extract username and password from the request using basic auth extractor.
     let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
         username: creds.user_id,
@@ -261,7 +261,9 @@ pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
         return Ok(SessionKey::SessionId(session));
     }
 
-    Err(Status::unauthenticated("No authentication method supplied"))
+    Err(Box::new(Status::unauthenticated(
+        "No authentication method supplied",
+    )))
 }
 
 fn extract_basic_auth(header: &MetadataMap) -> Option<Credentials> {
diff --git a/src/query/mod.rs b/src/query/mod.rs
index 0608a9459..e9c5632e7 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -20,14 +20,14 @@ mod filter_optimizer;
 mod listing_table_builder;
 pub mod stream_schema_provider;
 
+use actix_web::Either;
 use chrono::NaiveDateTime;
 use chrono::{DateTime, Duration, Utc};
 use datafusion::arrow::record_batch::RecordBatch;
-
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::error::DataFusionError;
 use datafusion::execution::disk_manager::DiskManagerConfig;
-use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
 use datafusion::logical_expr::expr::Alias;
 use datafusion::logical_expr::{
     Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -70,10 +70,17 @@ pub static QUERY_RUNTIME: Lazy<Runtime> =
 pub async fn execute(
     query: Query,
     stream_name: &str,
-) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+    is_streaming: bool,
+) -> Result<
+    (
+        Either<Vec<RecordBatch>, SendableRecordBatchStream>,
+        Vec<String>,
+    ),
+    ExecuteError,
+> {
     let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
     QUERY_RUNTIME
-        .spawn(async move { query.execute(time_partition.as_ref()).await })
+        .spawn(async move { query.execute(time_partition.as_ref(), is_streaming).await })
         .await
         .expect("The Join should have been successful")
 }
@@ -157,10 +164,20 @@ impl Query {
         SessionContext::new_with_state(state)
     }
 
+    /// this function returns the result of the query
+    /// if streaming is true, it returns a stream
+    /// if streaming is false, it returns a vector of record batches
     pub async fn execute(
         &self,
         time_partition: Option<&String>,
-    ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
+        is_streaming: bool,
+    ) -> Result<
+        (
+            Either<Vec<RecordBatch>, SendableRecordBatchStream>,
+            Vec<String>,
+        ),
+        ExecuteError,
+    > {
         let df = QUERY_SESSION
             .execute_logical_plan(self.final_logical_plan(time_partition))
             .await?;
@@ -173,11 +190,15 @@ impl Query {
             .cloned()
             .collect_vec();
 
-        if fields.is_empty() {
-            return Ok((vec![], fields));
+        if fields.is_empty() && !is_streaming {
+            return Ok((Either::Left(vec![]), fields));
         }
 
-        let results = df.collect().await?;
+        let results = if !is_streaming {
+            Either::Left(df.collect().await?)
+        } else {
+            Either::Right(df.execute_stream().await?)
+        };
 
         Ok((results, fields))
     }
diff --git a/src/response.rs b/src/response.rs
index 7aca5c44f..bd02ababe 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -17,24 +17,20 @@
  */
 
 use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
-use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
 use itertools::Itertools;
 use serde_json::{json, Value};
 use tracing::info;
 
-pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
-
 pub struct QueryResponse {
     pub records: Vec<RecordBatch>,
     pub fields: Vec<String>,
     pub fill_null: bool,
     pub with_fields: bool,
-    pub total_time: String,
 }
 
 impl QueryResponse {
-    pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
+    pub fn to_json(&self) -> Result<Value, QueryError> {
         info!("{}", "Returning query results");
 
         let mut json_records = record_batches_to_json(&self.records)?;
@@ -58,8 +54,6 @@ impl QueryResponse {
             Value::Array(values)
         };
 
-        Ok(HttpResponse::Ok()
-            .insert_header((TIME_ELAPSED_HEADER, self.total_time.as_str()))
-            .json(response))
+        Ok(response)
     }
 }
diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs
index 180d352d5..e310f51f4 100644
--- a/src/utils/arrow/flight.rs
+++ b/src/utils/arrow/flight.rs
@@ -41,9 +41,9 @@ use tonic::transport::{Channel, Uri};
 
 pub type DoGetStream = stream::BoxStream<'static, Result<FlightData, Status>>;
 
-pub fn get_query_from_ticket(req: &Request<Ticket>) -> Result<Query, Status> {
+pub fn get_query_from_ticket(req: &Request<Ticket>) -> Result<Query, Box<Status>> {
     serde_json::from_slice::<Query>(&req.get_ref().ticket)
-        .map_err(|err| Status::internal(err.to_string()))
+        .map_err(|err| Box::new(Status::internal(err.to_string())))
 }
 
 pub async fn run_do_get_rpc(
@@ -141,7 +141,7 @@ fn lit_timestamp_milli(time: i64) -> Expr {
     Expr::Literal(ScalarValue::TimestampMillisecond(Some(time), None))
 }
 
-pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Status> {
+pub fn into_flight_data(records: Vec<RecordBatch>) -> Result<Response<DoGetStream>, Box<Status>> {
     let input_stream = futures::stream::iter(records.into_iter().map(Ok));
     let write_options = IpcWriteOptions::default()
         .try_with_compression(Some(arrow_ipc::CompressionType(1)))
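
Note on the core change, with an illustrative sketch that is not part of the diff above: `execute` now returns `Either<Vec<RecordBatch>, SendableRecordBatchStream>` depending on the `streaming` flag. The snippet below reproduces that batch-vs-stream split against a plain DataFusion `SessionContext`; the in-memory table, the table name `t`, and the `main` wrapper are assumptions made for the example only.

// Sketch of the batch-vs-stream split introduced by this PR, shown with a
// self-contained DataFusion setup (workspace crates: datafusion, futures,
// actix-web, tokio). Not code from the PR itself.
use actix_web::Either;
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;
use futures::StreamExt;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Hypothetical in-memory table standing in for a Parseable stream.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    let ctx = SessionContext::new();
    let _ = ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;
    let df = ctx.sql("SELECT v FROM t").await?;

    let is_streaming = true; // corresponds to the new `streaming` query parameter

    // Same shape as the first tuple element returned by the reworked `execute`.
    let results: Either<Vec<RecordBatch>, SendableRecordBatchStream> = if !is_streaming {
        Either::Left(df.collect().await?)
    } else {
        Either::Right(df.execute_stream().await?)
    };

    match results {
        // Non-streaming path: all batches are already materialised in memory.
        Either::Left(batches) => println!("collected {} batches", batches.len()),
        // Streaming path: batches arrive one by one; the HTTP handler turns each
        // one into a single NDJSON line via `create_batch_processor`.
        Either::Right(mut stream) => {
            while let Some(batch) = stream.next().await {
                println!("streamed batch with {} rows", batch?.num_rows());
            }
        }
    }
    Ok(())
}

On the HTTP side, `handle_streaming_query` serializes each streamed batch as one newline-terminated JSON chunk (prefixed by a single `{"fields": [...]}` chunk when `fields=true`), which is why the streaming response is served as `application/x-ndjson`.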