feat: streaming response #1317

Merged: 8 commits, May 17, 2025
Changes from 5 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -121,6 +121,7 @@ static-files = "0.2"
thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
futures-core = "0.3.31"

[build-dependencies]
cargo_toml = "0.21"
189 changes: 147 additions & 42 deletions src/handlers/http/query.rs
@@ -16,40 +16,43 @@
*
*/

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use serde_json::json;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
use crate::response::QueryResponse;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
use crate::utils::user_auth_for_datasets;

const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
/// Query Request through http endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
@@ -62,6 +65,8 @@ pub struct Query {
#[serde(skip)]
pub fields: bool,
#[serde(skip)]
pub streaming: bool,
#[serde(skip)]
pub filter_tags: Option<Vec<String>>,
}

@@ -73,7 +78,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
{
Ok(raw_logical_plan) => raw_logical_plan,
Err(_) => {
//if logical plan creation fails, create streams and try again
create_streams_for_querier().await;
session_state
.create_logical_plan(&query_request.query)
@@ -83,10 +87,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
@@ -101,56 +103,154 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_datasets(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)`` queries and use the counts API

if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let counts_req = CountsRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
// NOTE: this should not panic, since there is atleast one bin, always
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [&column_name],
"records": [json!({column_name: count})]
})
} else {
Value::Array(vec![json!({column_name: count})])
};
return handle_count_query(&query_request, &table_name, column_name, time).await;
}

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();
if !query_request.streaming {
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
}

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
handle_streaming_query(query, &table_name, &query_request, time).await
}

return Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response));
}
async fn handle_count_query(
query_request: &Query,
table_name: &str,
column_name: &str,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let counts_req = CountsRequest {
stream: table_name.to_string(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [column_name],
"records": [json!({column_name: count})]
})
} else {
serde_json::Value::Array(vec![json!({column_name: count})])
};

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

let (records, fields) = execute(query, &table_name).await?;
QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records, fields) = execute(query, table_name).await?;
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
total_time,
}
.to_http()?;
.to_json()?;
Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records_stream, fields) = execute_stream(query, table_name).await?;
let fields = fields.clone();
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.with_label_values(&[table_name])
.observe(time);

Ok(response)
let send_null = query_request.send_null;
let with_fields = query_request.fields;

let stream = if with_fields {
// send the fields as an initial chunk
let fields_json = serde_json::json!({
"fields": fields
})
.to_string();

// stream the records without fields
let records_stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

// Combine the initial fields chunk with the records stream
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
"{}\n",
fields_json
))));
Box::pin(fields_chunk.chain(records_stream))
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
} else {
let stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: fields.clone(),
fill_null: send_null,
with_fields,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
};

Ok(HttpResponse::Ok()
.content_type("application/x-ndjson")
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.streaming(stream))
}

pub async fn get_counts(
@@ -222,6 +322,10 @@ impl FromRequest for Query {
query.send_null = params.get("sendNull").cloned().unwrap_or(false);
}

if !query.streaming {
query.streaming = params.get("streaming").cloned().unwrap_or(false);
}

Ok(query)
};

Expand Down Expand Up @@ -285,6 +389,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
streaming: query.streaming,
};

Some(q)
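For reference, a minimal client-side sketch for the new streaming path: when `streaming=true` is passed as a query parameter, the handler responds with `application/x-ndjson`, emitting a `{"fields": [...]}` line first (when `fields=true`) and one JSON-encoded record batch per subsequent line. The endpoint path, port, credentials, request body, and crate choices below (reqwest with the `json` and `stream` features, tokio, futures-util, serde_json) are assumptions, not part of this PR.

// Hypothetical client sketch (not part of this PR): read the NDJSON stream
// line by line. Endpoint, port, credentials and body are assumptions.
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body = serde_json::json!({
        "query": "select * from demo",
        "startTime": "10m",
        "endTime": "now"
    });

    let resp = reqwest::Client::new()
        .post("http://localhost:8000/api/v1/query?streaming=true&fields=true")
        .basic_auth("admin", Some("admin"))
        .json(&body)
        .send()
        .await?;

    let mut buf: Vec<u8> = Vec::new();
    let mut chunks = resp.bytes_stream();
    while let Some(chunk) = chunks.next().await {
        buf.extend_from_slice(&chunk?);
        // Each complete line is a standalone JSON value: the first line carries
        // the field list (fields=true), later lines carry record batches.
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = buf.drain(..=pos).collect();
            let value: serde_json::Value = serde_json::from_slice(&line[..line.len() - 1])?;
            println!("{value}");
        }
    }
    Ok(())
}
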
33 changes: 32 additions & 1 deletion src/query/mod.rs
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -78,6 +78,17 @@ pub async fn execute(
.expect("The Join should have been successful")
}

pub async fn execute_stream(
query: Query,
stream_name: &str,
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
QUERY_RUNTIME
.spawn(async move { query.execute_stream(time_partition.as_ref()).await })
.await
.expect("The Join should have been successful")
}

// A query request by client
#[derive(Debug)]
pub struct Query {
@@ -182,6 +193,26 @@ impl Query {
Ok((results, fields))
}

/// Execute the query lazily, returning a record batch stream plus the output field names.
pub async fn execute_stream(
&self,
time_partition: Option<&String>,
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
let df = QUERY_SESSION
.execute_logical_plan(self.final_logical_plan(time_partition))
.await?;
let fields = df
.schema()
.fields()
.iter()
.map(|f| f.name())
.cloned()
.collect_vec();
let stream = df.execute_stream().await?;

Ok((stream, fields))
}

pub async fn get_dataframe(
&self,
time_partition: Option<&String>,
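The distinction that execute() and the new execute_stream() build on is DataFusion's DataFrame::collect (materialize every batch before returning) versus DataFrame::execute_stream (yield batches as the plan produces them). Below is a standalone sketch of that difference, assuming a hypothetical demo.csv file; it is not Parseable code.

// Minimal illustration of collect vs. execute_stream; demo.csv is hypothetical.
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("demo", "demo.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT * FROM demo").await?;

    // Buffered: all record batches are materialized before anything is returned.
    // let batches: Vec<RecordBatch> = df.clone().collect().await?;

    // Streaming: batches are pulled one at a time as the plan produces them,
    // which is what lets the HTTP handler forward them as NDJSON chunks.
    let mut stream = df.execute_stream().await?;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("got {} rows", batch.num_rows());
    }
    Ok(())
}
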
10 changes: 2 additions & 8 deletions src/response.rs
@@ -17,24 +17,20 @@
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tracing::info;

pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
pub fill_null: bool,
pub with_fields: bool,
pub total_time: String,
}

impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
pub fn to_json(&self) -> Result<Value, QueryError> {
info!("{}", "Returning query results");
let mut json_records = record_batches_to_json(&self.records)?;

@@ -58,8 +54,6 @@ impl QueryResponse {
Value::Array(values)
};

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, self.total_time.as_str()))
.json(response))
Ok(response)
}
}
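
Since to_json() now returns a plain serde_json::Value, the same struct serves both the buffered response (wrapped into a single HttpResponse) and the per-batch NDJSON lines of the streaming path. A small sketch of the two shapes it produces, using hypothetical field names:

// Illustration only; the real values come from record_batches_to_json().
use serde_json::{json, Value};

fn main() {
    // with_fields == true: an object carrying the field list plus the records
    let with_fields: Value = json!({
        "fields": ["level", "message"],
        "records": [{ "level": "info", "message": "hello" }]
    });

    // with_fields == false: a bare array of record objects
    let without_fields: Value = json!([{ "level": "info", "message": "hello" }]);

    println!("{with_fields}\n{without_fields}");
}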