Skip to content

Commit 52980c4

Browse files
streaming response
1 parent 749a16f commit 52980c4

File tree

4 files changed

+65
-21
lines changed

4 files changed

+65
-21
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ static-files = "0.2"
121121
thiserror = "2.0"
122122
ulid = { version = "1.0", features = ["serde"] }
123123
xxhash-rust = { version = "0.8", features = ["xxh3"] }
124+
futures-core = "0.3.31"
124125

125126
[build-dependencies]
126127
cargo_toml = "0.21"

src/handlers/http/query.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
2121
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
22+
use bytes::Bytes;
2223
use chrono::{DateTime, Utc};
2324
use datafusion::common::tree_node::TreeNode;
2425
use datafusion::error::DataFusionError;
2526
use datafusion::execution::context::SessionState;
27+
use futures::StreamExt;
2628
use futures_util::Future;
2729
use http::StatusCode;
2830
use serde::{Deserialize, Serialize};
@@ -41,15 +43,16 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4143
use crate::option::Mode;
4244
use crate::parseable::{StreamNotFound, PARSEABLE};
4345
use crate::query::error::ExecuteError;
44-
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
46+
use crate::query::{execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
4547
use crate::query::{TableScanVisitor, QUERY_SESSION};
4648
use crate::rbac::Users;
47-
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
49+
use crate::response::TIME_ELAPSED_HEADER;
4850
use crate::storage::ObjectStorageError;
4951
use crate::utils::actix::extract_session_key_from_req;
52+
use crate::utils::arrow::record_batches_to_json;
5053
use crate::utils::time::{TimeParseError, TimeRange};
5154
use crate::utils::user_auth_for_datasets;
52-
55+
use futures_core::Stream as CoreStream;
5356
/// Query Request through http endpoint.
5457
#[derive(Debug, Deserialize, Serialize, Clone)]
5558
#[serde(rename_all = "camelCase")]
@@ -132,25 +135,33 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
132135
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
133136
.json(response));
134137
}
138+
let (records_stream, fields) = execute_stream(query, &table_name).await?;
139+
let fields = fields.clone();
140+
let stream = records_stream.map(move |batch_result| {
141+
match batch_result {
142+
Ok(batch) => {
143+
// convert record batch to JSON
144+
let json = record_batches_to_json(&[batch])
145+
.map_err(actix_web::error::ErrorInternalServerError)?;
146+
// // Serialize to JSON string
147+
// let json = serde_json::to_value(&json)
148+
// .map_err(actix_web::error::ErrorInternalServerError)?;
149+
let response = json!({
150+
"fields": fields,
151+
"records": json,
152+
});
153+
Ok(Bytes::from(response.to_string()))
154+
}
155+
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
156+
}
157+
});
135158

136-
let (records, fields) = execute(query, &table_name).await?;
137-
let total_time = format!("{:?}", time.elapsed());
138-
let response = QueryResponse {
139-
records,
140-
fields,
141-
fill_null: query_request.send_null,
142-
with_fields: query_request.fields,
143-
total_time,
144-
}
145-
.to_http()?;
146-
147-
let time = time.elapsed().as_secs_f64();
148-
149-
QUERY_EXECUTE_TIME
150-
.with_label_values(&[&table_name])
151-
.observe(time);
159+
let boxed_stream =
160+
Box::pin(stream) as Pin<Box<dyn CoreStream<Item = Result<Bytes, actix_web::Error>> + Send>>;
152161

153-
Ok(response)
162+
Ok(HttpResponse::Ok()
163+
.content_type("application/json")
164+
.streaming(boxed_stream))
154165
}
155166

156167
pub async fn get_counts(

src/query/mod.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
2727
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
2828
use datafusion::error::DataFusionError;
2929
use datafusion::execution::disk_manager::DiskManagerConfig;
30-
use datafusion::execution::SessionStateBuilder;
30+
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
3131
use datafusion::logical_expr::expr::Alias;
3232
use datafusion::logical_expr::{
3333
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -78,6 +78,17 @@ pub async fn execute(
7878
.expect("The Join should have been successful")
7979
}
8080

81+
pub async fn execute_stream(
82+
query: Query,
83+
stream_name: &str,
84+
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
85+
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
86+
QUERY_RUNTIME
87+
.spawn(async move { query.execute_stream(time_partition.as_ref()).await })
88+
.await
89+
.expect("The Join should have been successful")
90+
}
91+
8192
// A query request by client
8293
#[derive(Debug)]
8394
pub struct Query {
@@ -182,6 +193,26 @@ impl Query {
182193
Ok((results, fields))
183194
}
184195

196+
// execute stream
197+
pub async fn execute_stream(
198+
&self,
199+
time_partition: Option<&String>,
200+
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
201+
let df = QUERY_SESSION
202+
.execute_logical_plan(self.final_logical_plan(time_partition))
203+
.await?;
204+
let fields = df
205+
.schema()
206+
.fields()
207+
.iter()
208+
.map(|f| f.name())
209+
.cloned()
210+
.collect_vec();
211+
let stream = df.execute_stream().await?;
212+
213+
Ok((stream, fields))
214+
}
215+
185216
pub async fn get_dataframe(
186217
&self,
187218
time_partition: Option<&String>,

0 commit comments

Comments
 (0)