Skip to content

Commit 0d0dae0

Browse files
send fields as first chunk for streaming response
1 parent 2f00ad5 commit 0d0dae0

File tree

2 files changed

+63
-28
lines changed

2 files changed

+63
-28
lines changed

src/handlers/http/query.rs

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use crate::event::error::EventError;
20+
use crate::handlers::http::fetch_schema;
1921
use actix_web::http::header::ContentType;
2022
use actix_web::web::{self, Json};
2123
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
@@ -24,7 +26,8 @@ use chrono::{DateTime, Utc};
2426
use datafusion::common::tree_node::TreeNode;
2527
use datafusion::error::DataFusionError;
2628
use datafusion::execution::context::SessionState;
27-
use futures::StreamExt;
29+
use futures::stream::once;
30+
use futures::{future, Stream, StreamExt};
2831
use futures_util::Future;
2932
use http::StatusCode;
3033
use serde::{Deserialize, Serialize};
@@ -35,9 +38,6 @@ use std::sync::Arc;
3538
use std::time::Instant;
3639
use tracing::error;
3740

38-
use crate::event::error::EventError;
39-
use crate::handlers::http::fetch_schema;
40-
4141
use crate::event::commit_schema;
4242
use crate::metrics::QUERY_EXECUTE_TIME;
4343
use crate::option::Mode;
@@ -46,11 +46,13 @@ use crate::query::error::ExecuteError;
4646
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
4747
use crate::query::{TableScanVisitor, QUERY_SESSION};
4848
use crate::rbac::Users;
49-
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
49+
use crate::response::QueryResponse;
5050
use crate::storage::ObjectStorageError;
5151
use crate::utils::actix::extract_session_key_from_req;
5252
use crate::utils::time::{TimeParseError, TimeRange};
5353
use crate::utils::user_auth_for_datasets;
54+
55+
const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
5456
/// Query Request through http endpoint.
5557
#[derive(Debug, Deserialize, Serialize, Clone)]
5658
#[serde(rename_all = "camelCase")]
@@ -167,7 +169,7 @@ async fn handle_non_streaming_query(
167169
fill_null: query_request.send_null,
168170
with_fields: query_request.fields,
169171
}
170-
.to_http()?;
172+
.to_json()?;
171173
Ok(HttpResponse::Ok()
172174
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
173175
.json(response))
@@ -190,30 +192,65 @@ async fn handle_streaming_query(
190192
let send_null = query_request.send_null;
191193
let with_fields = query_request.fields;
192194

193-
let stream = records_stream.map(move |batch_result| match batch_result {
194-
Ok(batch) => {
195-
let response = QueryResponse {
196-
records: vec![batch],
197-
fields: fields.clone(),
198-
fill_null: send_null,
199-
with_fields,
195+
let stream = if with_fields {
196+
// send the fields as an initial chunk
197+
let fields_json = serde_json::json!({
198+
"fields": fields
199+
})
200+
.to_string();
201+
202+
// stream the records without fields
203+
let records_stream = records_stream.map(move |batch_result| match batch_result {
204+
Ok(batch) => {
205+
let response = QueryResponse {
206+
records: vec![batch],
207+
fields: Vec::new(),
208+
fill_null: send_null,
209+
with_fields: false,
210+
}
211+
.to_json()
212+
.unwrap_or_else(|e| {
213+
error!("Failed to parse record batch into JSON: {}", e);
214+
json!({})
215+
});
216+
Ok(Bytes::from(format!("{}\n", response)))
200217
}
201-
.to_http()
202-
.unwrap_or_else(|e| {
203-
error!("Failed to parse record batch into JSON: {}", e);
204-
json!({})
205-
});
206-
Ok(Bytes::from(format!("{}\n", response)))
207-
}
208-
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
209-
});
218+
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
219+
});
220+
221+
// Combine the initial fields chunk with the records stream
222+
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
223+
"{}\n",
224+
fields_json
225+
))));
226+
Box::pin(fields_chunk.chain(records_stream))
227+
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
228+
} else {
229+
let stream = records_stream.map(move |batch_result| match batch_result {
230+
Ok(batch) => {
231+
let response = QueryResponse {
232+
records: vec![batch],
233+
fields: fields.clone(),
234+
fill_null: send_null,
235+
with_fields,
236+
}
237+
.to_json()
238+
.unwrap_or_else(|e| {
239+
error!("Failed to parse record batch into JSON: {}", e);
240+
json!({})
241+
});
242+
Ok(Bytes::from(format!("{}\n", response)))
243+
}
244+
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
245+
});
210246

211-
let boxed_stream = Box::pin(stream);
247+
Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
248+
};
212249

213250
Ok(HttpResponse::Ok()
214-
.content_type("application/json")
251+
.content_type("application/x-ndjson")
215252
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
216-
.streaming(boxed_stream))
253+
.streaming(stream))
217254
}
218255

219256
pub async fn get_counts(

src/response.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ use itertools::Itertools;
2222
use serde_json::{json, Value};
2323
use tracing::info;
2424

25-
pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
26-
2725
pub struct QueryResponse {
2826
pub records: Vec<RecordBatch>,
2927
pub fields: Vec<String>,
@@ -32,7 +30,7 @@ pub struct QueryResponse {
3230
}
3331

3432
impl QueryResponse {
35-
pub fn to_http(&self) -> Result<Value, QueryError> {
33+
pub fn to_json(&self) -> Result<Value, QueryError> {
3634
info!("{}", "Returning query results");
3735
let mut json_records = record_batches_to_json(&self.records)?;
3836

0 commit comments

Comments (0)