Skip to content

Commit c2c0b60

Browse files
committed
Updates for counts API
counts API will also work with applied filters
1 parent 970a5a5 commit c2c0b60

File tree

4 files changed

+118
-9
lines changed

4 files changed

+118
-9
lines changed

src/alerts/alerts_utils.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,33 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
339339
}
340340
}
341341

342+
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
343+
match &where_clause.operator {
344+
Some(op) => match op {
345+
LogicalOperator::And => {
346+
let mut exprs = vec![];
347+
for condition in &where_clause.condition_config {
348+
let value = NumberOrString::from_string(condition.value.to_string());
349+
match value {
350+
NumberOrString::Number(val) => exprs.push(format!(
351+
"{} {} {}",
352+
condition.column, condition.operator, val
353+
)),
354+
NumberOrString::String(val) => exprs.push(format!(
355+
"{} {} '{}'",
356+
condition.column, condition.operator, val
357+
)),
358+
}
359+
}
360+
361+
Ok(exprs.join(" AND "))
362+
}
363+
_ => Err(String::from("Invalid option 'or'")),
364+
},
365+
_ => Err(String::from("Invalid option 'null'")),
366+
}
367+
}
368+
342369
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
343370
// the form accepts value as a string
344371
// if it can be parsed as a number, then parse it

src/handlers/http/query.rs

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::event::error::EventError;
2020
use crate::handlers::http::fetch_schema;
21+
use crate::utils::arrow::record_batches_to_json;
2122
use actix_web::http::header::ContentType;
2223
use actix_web::web::{self, Json};
2324
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
@@ -31,8 +32,9 @@ use futures::stream::once;
3132
use futures::{future, Stream, StreamExt};
3233
use futures_util::Future;
3334
use http::StatusCode;
35+
use itertools::Itertools;
3436
use serde::{Deserialize, Serialize};
35-
use serde_json::json;
37+
use serde_json::{json, Value};
3638
use std::collections::HashMap;
3739
use std::pin::Pin;
3840
use std::sync::Arc;
@@ -44,7 +46,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4446
use crate::option::Mode;
4547
use crate::parseable::{StreamNotFound, PARSEABLE};
4648
use crate::query::error::ExecuteError;
47-
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
49+
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
4850
use crate::query::{TableScanVisitor, QUERY_SESSION};
4951
use crate::rbac::Users;
5052
use crate::response::QueryResponse;
@@ -193,6 +195,7 @@ async fn handle_count_query(
193195
start_time: query_request.start_time.clone(),
194196
end_time: query_request.end_time.clone(),
195197
num_bins: 1,
198+
conditions: None,
196199
};
197200
let count_records = counts_req.get_bin_density().await?;
198201
let count = count_records[0].count;
@@ -358,22 +361,59 @@ fn create_batch_processor(
358361
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
359362
}
360363
}
364+
361365
pub async fn get_counts(
362366
req: HttpRequest,
363367
counts_request: Json<CountsRequest>,
364368
) -> Result<impl Responder, QueryError> {
365369
let creds = extract_session_key_from_req(&req)?;
366370
let permissions = Users.get_permissions(&creds);
367371

372+
let body = counts_request.into_inner();
373+
368374
// does user have access to table?
369-
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
375+
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;
376+
377+
// if the user has given a sql query (counts call with filters applied), then use this flow
378+
// this could include filters or group by
379+
if body.conditions.is_some() {
380+
let sql = body.get_df_sql().await?;
381+
382+
let query_request = Query {
383+
query: sql,
384+
start_time: body.start_time,
385+
end_time: body.end_time,
386+
send_null: true,
387+
fields: true,
388+
streaming: false,
389+
filter_tags: None,
390+
};
370391

371-
let records = counts_request.get_bin_density().await?;
392+
let (records, _) = get_records_and_fields(&query_request, &req).await?;
372393

373-
Ok(web::Json(CountsResponse {
374-
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
375-
records,
376-
}))
394+
if let Some(records) = records {
395+
let json_records = record_batches_to_json(&records)?;
396+
let records = json_records.into_iter().map(Value::Object).collect_vec();
397+
398+
let res = json!({
399+
"fields": vec!["start_time", "end_time", "count"],
400+
"records": records,
401+
});
402+
403+
return Ok(web::Json(res));
404+
} else {
405+
return Err(QueryError::CustomError(
406+
"No data returned for counts SQL".into(),
407+
));
408+
}
409+
}
410+
411+
let records = body.get_bin_density().await?;
412+
let res = json!({
413+
"fields": vec!["start_time", "end_time", "count"],
414+
"records": records,
415+
});
416+
Ok(web::Json(res))
377417
}
378418

379419
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {

src/prism/logstream/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ impl PrismDatasetRequest {
349349
start_time: "1h".to_owned(),
350350
end_time: "now".to_owned(),
351351
num_bins: 10,
352+
conditions: None,
352353
};
353354

354355
let records = count_request.get_bin_density().await?;

src/query/mod.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ use tokio::runtime::Runtime;
4747
use self::error::ExecuteError;
4848
use self::stream_schema_provider::GlobalSchemaProvider;
4949
pub use self::stream_schema_provider::PartialTimeFilter;
50+
use crate::alerts::alerts_utils::get_filter_string;
51+
use crate::alerts::Conditions;
5052
use crate::catalog::column::{Int64Type, TypedStatistics};
5153
use crate::catalog::manifest::Manifest;
5254
use crate::catalog::snapshot::Snapshot;
@@ -297,7 +299,7 @@ impl Query {
297299
}
298300

299301
/// Record of counts for a given time bin.
300-
#[derive(Debug, Serialize, Clone)]
302+
#[derive(Debug, Serialize, Clone, Deserialize)]
301303
pub struct CountsRecord {
302304
/// Start time of the bin
303305
pub start_time: String,
@@ -312,6 +314,15 @@ struct TimeBounds {
312314
end: DateTime<Utc>,
313315
}
314316

317+
/// Optional filter and grouping settings attached to a counts request.
///
/// Deserialized with camelCase field names, so `group_by` is read from `groupBy`.
#[derive(Debug, Deserialize, Clone)]
318+
#[serde(rename_all = "camelCase")]
319+
pub struct CountConditions {
320+
/// Filter conditions for the count; `None` means no filter was supplied.
321+
pub conditions: Option<Conditions>,
322+
/// Columns to group by.
/// NOTE(review): nothing in the visible code reads this field — confirm whether
/// grouping is implemented elsewhere.
323+
pub group_by: Option<Vec<String>>,
324+
}
325+
315326
/// Request for counts, received from API/SQL query.
316327
#[derive(Debug, Deserialize, Clone)]
317328
#[serde(rename_all = "camelCase")]
@@ -324,6 +335,8 @@ pub struct CountsRequest {
324335
pub end_time: String,
325336
/// Number of bins to divide the time range into
326337
pub num_bins: u64,
338+
/// Conditions
339+
pub conditions: Option<CountConditions>,
327340
}
328341

329342
impl CountsRequest {
@@ -429,6 +442,34 @@ impl CountsRequest {
429442

430443
bounds
431444
}
445+
446+
/// This function will get executed only if self.conditions is some
447+
pub async fn get_df_sql(&self) -> Result<String, QueryError> {
448+
let count_conditions = self.conditions.as_ref().unwrap();
449+
450+
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
451+
452+
let dur = time_range.end.signed_duration_since(time_range.start);
453+
454+
let date_bin = if dur.num_minutes() <= 60 * 10 {
455+
// date_bin 1 minute
456+
format!("CAST(DATE_BIN('1 minute', {}.p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 minute' as end_time", self.stream)
457+
} else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
458+
// date_bin 1 hour
459+
String::from("CAST(DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 hour' as end_time")
460+
} else {
461+
// date_bin 1 day
462+
String::from("CAST(DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '2025-01-01T00:00:00.000Z') + INTERVAL '1 day' as end_time")
463+
};
464+
465+
let query = if let Some(conditions) = &count_conditions.conditions {
466+
let f = get_filter_string(conditions).map_err(QueryError::CustomError)?;
467+
format!("SELECT {date_bin}, COUNT(*) as count FROM {} WHERE {} GROUP BY end_time,start_time ORDER BY end_time",self.stream,f)
468+
} else {
469+
format!("SELECT {date_bin}, COUNT(*) as count FROM {} GROUP BY end_time,start_time ORDER BY end_time",self.stream)
470+
};
471+
Ok(query)
472+
}
432473
}
433474

434475
/// Response for the counts API

0 commit comments

Comments
 (0)