Skip to content

feat: streaming response #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 17, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ static-files = "0.2"
thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
futures-core = "0.3.31"

[build-dependencies]
cargo_toml = "0.21"
Expand Down
7 changes: 2 additions & 5 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

use std::{io::ErrorKind, sync::Arc};
use std::sync::Arc;

use chrono::{DateTime, Local, NaiveTime, Utc};
use column::Column;
Expand Down Expand Up @@ -259,10 +259,7 @@ async fn create_manifest(
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))?,
.ok_or(IOError::other("Failed to create upper bound for manifest"))?,
)
.and_utc();

Expand Down
8 changes: 5 additions & 3 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl {
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let key = extract_session_key(req.metadata())
.map_err(|e| Status::unauthenticated(e.to_string()))?;

let ticket = get_query_from_ticket(&req)?;
let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

info!("query requested to airplane: {:?}", ticket);

Expand Down Expand Up @@ -246,7 +248,7 @@ impl FlightService for AirServiceImpl {
.observe(time);

// Airplane takes off 🛫
out
out.map_err(|e| *e)
}

async fn do_put(
Expand Down
189 changes: 147 additions & 42 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,43 @@
*
*/

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use serde_json::json;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
use crate::response::QueryResponse;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
use crate::utils::user_auth_for_datasets;

const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
/// Query Request through http endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
Expand All @@ -62,6 +65,8 @@ pub struct Query {
#[serde(skip)]
pub fields: bool,
#[serde(skip)]
pub streaming: bool,
#[serde(skip)]
pub filter_tags: Option<Vec<String>>,
}

Expand All @@ -73,7 +78,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
{
Ok(raw_logical_plan) => raw_logical_plan,
Err(_) => {
//if logical plan creation fails, create streams and try again
create_streams_for_querier().await;
session_state
.create_logical_plan(&query_request.query)
Expand All @@ -83,10 +87,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
Expand All @@ -101,56 +103,154 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_datasets(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)` queries and use the counts API

if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let counts_req = CountsRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
// NOTE: this should not panic, since there is at least one bin, always
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [&column_name],
"records": [json!({column_name: count})]
})
} else {
Value::Array(vec![json!({column_name: count})])
};
return handle_count_query(&query_request, &table_name, column_name, time).await;
}

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();
if !query_request.streaming {
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
}

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
handle_streaming_query(query, &table_name, &query_request, time).await
}

return Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response));
}
/// Answers a count-style query through the counts API instead of running the full query.
///
/// Builds a `CountsRequest` for `table_name` over the request's start/end time with a
/// single bin, reads the count of the first returned record, and returns it as JSON:
/// an object with `fields` and `records` when `query_request.fields` is set, otherwise
/// a one-element array. The elapsed time since `time` is observed on
/// `QUERY_EXECUTE_TIME` (labelled with `table_name`) and sent in the
/// `TIME_ELAPSED_HEADER` response header.
///
/// # Errors
/// Propagates any error from `get_bin_density` via `?`.
async fn handle_count_query(
query_request: &Query,
table_name: &str,
column_name: &str,
time: Instant,
) -> Result<HttpResponse, QueryError> {
// A single bin spanning the whole requested range.
let counts_req = CountsRequest {
stream: table_name.to_string(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
// Direct indexing: this panics if no record came back.
// NOTE(review): presumably one bin is always returned for `num_bins: 1` — confirm.
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [column_name],
"records": [json!({column_name: count})]
})
} else {
serde_json::Value::Array(vec![json!({column_name: count})])
};

// `elapsed()` is sampled twice, so the header string and the metric value are
// taken at slightly different instants.
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

// NOTE(review): the next line uses `query`, which is not a parameter of this
// function; it looks like a removed line carried over from the diff view — confirm.
let (records, fields) = execute(query, &table_name).await?;
QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

/// Runs `query` to completion with `execute` and returns the whole result as one JSON body.
///
/// The elapsed time since `time` is observed on `QUERY_EXECUTE_TIME` (labelled with
/// `table_name`) and sent in the `TIME_ELAPSED_HEADER` response header. Null filling and
/// field inclusion are taken from `query_request.send_null` and `query_request.fields`.
///
/// # Errors
/// Propagates errors from `execute` and from converting the `QueryResponse` via `?`.
async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records, fields) = execute(query, table_name).await?;
// Timing is captured after execution finishes, before the response is serialized.
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
// NOTE(review): `total_time` is moved here yet borrowed again for the header below,
// and both `.to_http()` and `.to_json()` follow; these look like old/new diff lines
// shown together — confirm which variant is live.
total_time,
}
.to_http()?;
.to_json()?;
Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

/// Runs `query` with `execute_stream` and returns the result as a streamed
/// `application/x-ndjson` body, one newline-terminated JSON value per record batch.
///
/// When `query_request.fields` is set, the first line is `{"fields": ...}` and the
/// following lines carry records only. A batch that arrives as an error becomes an
/// `ErrorInternalServerError` item in the stream. A batch that fails JSON conversion
/// is logged and replaced by `{}`.
///
/// The elapsed time (metric and `TIME_ELAPSED_HEADER`) is sampled right after
/// `execute_stream` returns, before the response body stream is built.
///
/// # Errors
/// Propagates any error from `execute_stream` via `?`.
async fn handle_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records_stream, fields) = execute_stream(query, table_name).await?;
let fields = fields.clone();
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

// NOTE(review): two `with_label_values` lines and a stray `Ok(response)` follow; they
// look like old/new diff lines shown together — confirm which variant is live.
QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.with_label_values(&[table_name])
.observe(time);

Ok(response)
// Copy the flags out of the request so the `move` closures below do not borrow it.
let send_null = query_request.send_null;
let with_fields = query_request.fields;

let stream = if with_fields {
// send the fields as an initial chunk
let fields_json = serde_json::json!({
"fields": fields
})
.to_string();

// stream the records without fields
let records_stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
// A conversion failure does not end the stream: it is logged and an empty
// object is emitted for that batch.
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

// Combine the initial fields chunk with the records stream
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
"{}\n",
fields_json
))));
// Both branches are boxed to the same trait-object type so `stream` has one type.
Box::pin(fields_chunk.chain(records_stream))
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
} else {
// `with_fields` is false on this branch; each batch is serialized on its own line.
let stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: fields.clone(),
fill_null: send_null,
with_fields,
}
.to_json()
// Same fallback as the other branch: log and emit `{}` for this batch.
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
};

Ok(HttpResponse::Ok()
.content_type("application/x-ndjson")
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.streaming(stream))
}

pub async fn get_counts(
Expand Down Expand Up @@ -222,6 +322,10 @@ impl FromRequest for Query {
query.send_null = params.get("sendNull").cloned().unwrap_or(false);
}

if !query.streaming {
query.streaming = params.get("streaming").cloned().unwrap_or(false);
}

Ok(query)
};

Expand Down Expand Up @@ -285,6 +389,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
streaming: query.streaming,
};

Some(q)
Expand Down
18 changes: 10 additions & 8 deletions src/handlers/livetail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ impl FlightService for FlightServiceImpl {
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let key = extract_session_key(req.metadata()).map_err(|e| *e)?;
let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?;
let stream = extract_stream(&ticket)?;
let stream = extract_stream(&ticket).map_err(|e| *e)?;
info!("livetail requested for stream {}", stream);
match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) {
rbac::Response::Authorized => (),
Expand Down Expand Up @@ -232,16 +232,16 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
}
}

/// Returns the string stored under the `"stream"` key of a JSON request body.
///
/// # Errors
/// Yields an `invalid_argument` `Status` when `body` is not a JSON object, when the
/// `"stream"` key is absent, or when its value is not a string.
///
// NOTE(review): two signatures and paired `ok_or` / `ok_or_else` lines appear below; they
// look like the old (`Status`) and new (`Box<Status>`) diff lines shown together — confirm
// which variant is live.
pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Box<Status>> {
body.as_object()
.ok_or(Status::invalid_argument("expected object in request body"))?
.ok_or_else(|| Box::new(Status::invalid_argument("expected object in request body")))?
.get("stream")
.ok_or(Status::invalid_argument("stream key value is not provided"))?
.ok_or_else(|| Box::new(Status::invalid_argument("stream key value is not provided")))?
.as_str()
.ok_or(Status::invalid_argument("stream key value is invalid"))
.ok_or_else(|| Box::new(Status::invalid_argument("stream key value is invalid")))
}

pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Box<Status>> {
// Extract username and password from the request using basic auth extractor.
let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
username: creds.user_id,
Expand All @@ -261,7 +261,9 @@ pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status>
return Ok(SessionKey::SessionId(session));
}

Err(Status::unauthenticated("No authentication method supplied"))
Err(Box::new(Status::unauthenticated(
"No authentication method supplied",
)))
}

fn extract_basic_auth(header: &MetadataMap) -> Option<Credentials> {
Expand Down
Loading
Loading