Skip to content

Commit ed4ac71

Browse files
authored
chore: updates for counts API (#1347)
counts API will also work with applied filters
1 parent 5de56e5 commit ed4ac71

File tree

5 files changed

+231
-57
lines changed

5 files changed

+231
-57
lines changed

src/alerts/alerts_utils.rs

Lines changed: 98 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use std::fmt::Display;
20+
1921
use arrow_array::{Float64Array, Int64Array, RecordBatch};
2022
use datafusion::{
2123
functions_aggregate::{
@@ -339,46 +341,97 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
339341
}
340342
}
341343

344+
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
345+
match &where_clause.operator {
346+
Some(op) => match op {
347+
LogicalOperator::And => {
348+
let mut exprs = vec![];
349+
for condition in &where_clause.condition_config {
350+
if let Some(value) = &condition.value {
351+
// ad-hoc error check in case value is some and operator is either `is null` or `is not null`
352+
if condition.operator.eq(&WhereConfigOperator::IsNull)
353+
|| condition.operator.eq(&WhereConfigOperator::IsNotNull)
354+
{
355+
return Err("value must be null when operator is either `is null` or `is not null`"
356+
.into());
357+
}
358+
let value = NumberOrString::from_string(value.to_owned());
359+
match value {
360+
NumberOrString::Number(val) => exprs.push(format!(
361+
"\"{}\" {} {}",
362+
condition.column, condition.operator, val
363+
)),
364+
NumberOrString::String(val) => exprs.push(format!(
365+
"\"{}\" {} '{}'",
366+
condition.column,
367+
condition.operator,
368+
val.replace('\'', "''")
369+
)),
370+
}
371+
} else {
372+
exprs.push(format!("\"{}\" {}", condition.column, condition.operator))
373+
}
374+
}
375+
376+
Ok(exprs.join(" AND "))
377+
}
378+
_ => Err(String::from("Invalid option 'or', only 'and' is supported")),
379+
},
380+
_ => Err(String::from(
381+
"Invalid option 'null', only 'and' is supported",
382+
)),
383+
}
384+
}
385+
342386
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
343387
// the form accepts value as a string
344388
// if it can be parsed as a number, then parse it
345389
// else keep it as a string
346-
let value = NumberOrString::from_string(expr.value.clone());
347-
348-
// for maintaining column case
349-
let column = format!(r#""{}""#, expr.column);
350-
match expr.operator {
351-
WhereConfigOperator::Equal => col(column).eq(lit(value)),
352-
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
353-
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
354-
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
355-
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
356-
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
357-
WhereConfigOperator::IsNull => col(column).is_null(),
358-
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
359-
WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)),
360-
WhereConfigOperator::Contains => col(column).like(lit(&expr.value)),
361-
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
362-
Box::new(col(column)),
363-
Operator::RegexIMatch,
364-
Box::new(lit(format!("^{}", expr.value))),
365-
)),
366-
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
367-
Box::new(col(column)),
368-
Operator::RegexIMatch,
369-
Box::new(lit(format!("{}$", expr.value))),
370-
)),
371-
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)),
372-
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
373-
Box::new(col(column)),
374-
Operator::RegexNotIMatch,
375-
Box::new(lit(format!("^{}", expr.value))),
376-
)),
377-
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
378-
Box::new(col(column)),
379-
Operator::RegexNotIMatch,
380-
Box::new(lit(format!("{}$", expr.value))),
381-
)),
390+
if let Some(value) = &expr.value {
391+
let value = NumberOrString::from_string(value.clone());
392+
393+
// for maintaining column case
394+
let column = format!(r#""{}""#, expr.column);
395+
match expr.operator {
396+
WhereConfigOperator::Equal => col(column).eq(lit(value)),
397+
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
398+
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
399+
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
400+
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
401+
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
402+
WhereConfigOperator::ILike => col(column).ilike(lit(value)),
403+
WhereConfigOperator::Contains => col(column).like(lit(value)),
404+
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
405+
Box::new(col(column)),
406+
Operator::RegexIMatch,
407+
Box::new(lit(format!("^{}", value))),
408+
)),
409+
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
410+
Box::new(col(column)),
411+
Operator::RegexIMatch,
412+
Box::new(lit(format!("{}$", value))),
413+
)),
414+
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)),
415+
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
416+
Box::new(col(column)),
417+
Operator::RegexNotIMatch,
418+
Box::new(lit(format!("^{}", value))),
419+
)),
420+
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
421+
Box::new(col(column)),
422+
Operator::RegexNotIMatch,
423+
Box::new(lit(format!("{}$", value))),
424+
)),
425+
_ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation")
426+
}
427+
} else {
428+
// for maintaining column case
429+
let column = format!(r#""{}""#, expr.column);
430+
match expr.operator {
431+
WhereConfigOperator::IsNull => col(column).is_null(),
432+
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
433+
_ => unreachable!("value must be null for `is null` and `is not null`. Should've been caught in validation")
434+
}
382435
}
383436
}
384437

@@ -417,3 +470,12 @@ impl NumberOrString {
417470
}
418471
}
419472
}
473+
474+
impl Display for NumberOrString {
475+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476+
match self {
477+
NumberOrString::Number(v) => write!(f, "{v}"),
478+
NumberOrString::String(v) => write!(f, "{v}"),
479+
}
480+
}
481+
}

src/alerts/mod.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl Display for AlertOperator {
220220
}
221221
}
222222

223-
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr)]
223+
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)]
224224
#[serde(rename_all = "camelCase")]
225225
pub enum WhereConfigOperator {
226226
#[serde(rename = "=")]
@@ -326,7 +326,7 @@ pub struct FilterConfig {
326326
pub struct ConditionConfig {
327327
pub column: String,
328328
pub operator: WhereConfigOperator,
329-
pub value: String,
329+
pub value: Option<String>,
330330
}
331331

332332
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
@@ -343,20 +343,28 @@ impl Conditions {
343343
LogicalOperator::And | LogicalOperator::Or => {
344344
let expr1 = &self.condition_config[0];
345345
let expr2 = &self.condition_config[1];
346-
format!(
347-
"[{} {} {} {op} {} {} {}]",
348-
expr1.column,
349-
expr1.operator,
350-
expr1.value,
351-
expr2.column,
352-
expr2.operator,
353-
expr2.value
354-
)
346+
let expr1_msg = if let Some(val) = &expr1.value {
347+
format!("{} {} {}", expr1.column, expr1.operator, val)
348+
} else {
349+
format!("{} {}", expr1.column, expr1.operator)
350+
};
351+
352+
let expr2_msg = if let Some(val) = &expr2.value {
353+
format!("{} {} {}", expr2.column, expr2.operator, val)
354+
} else {
355+
format!("{} {}", expr2.column, expr2.operator)
356+
};
357+
358+
format!("[{} {op} {}]", expr1_msg, expr2_msg)
355359
}
356360
},
357361
None => {
358362
let expr = &self.condition_config[0];
359-
format!("[{} {} {}]", expr.column, expr.operator, expr.value)
363+
if let Some(val) = &expr.value {
364+
format!("{} {} {}", expr.column, expr.operator, val)
365+
} else {
366+
format!("{} {}", expr.column, expr.operator)
367+
}
360368
}
361369
}
362370
}
@@ -645,6 +653,27 @@ impl AlertConfig {
645653
}
646654
}
647655
}
656+
657+
// validate that the value should be None in case of `is null` and `is not null`
658+
for condition in config.condition_config.iter() {
659+
let needs_no_value = matches!(
660+
condition.operator,
661+
WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull
662+
);
663+
664+
if needs_no_value && condition.value.is_some() {
665+
return Err(AlertError::CustomError(
666+
"value must be null when operator is either `is null` or `is not null`"
667+
.into(),
668+
));
669+
}
670+
if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.trim().is_empty()) {
671+
return Err(AlertError::CustomError(
672+
"value must not be null when operator is neither `is null` nor `is not null`"
673+
.into(),
674+
));
675+
}
676+
}
648677
Ok(())
649678
}
650679

src/handlers/http/query.rs

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::event::error::EventError;
2020
use crate::handlers::http::fetch_schema;
21+
use crate::utils::arrow::record_batches_to_json;
2122
use actix_web::http::header::ContentType;
2223
use actix_web::web::{self, Json};
2324
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
@@ -31,8 +32,9 @@ use futures::stream::once;
3132
use futures::{future, Stream, StreamExt};
3233
use futures_util::Future;
3334
use http::StatusCode;
35+
use itertools::Itertools;
3436
use serde::{Deserialize, Serialize};
35-
use serde_json::json;
37+
use serde_json::{json, Value};
3638
use std::collections::HashMap;
3739
use std::pin::Pin;
3840
use std::sync::Arc;
@@ -44,7 +46,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
4446
use crate::option::Mode;
4547
use crate::parseable::{StreamNotFound, PARSEABLE};
4648
use crate::query::error::ExecuteError;
47-
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
49+
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
4850
use crate::query::{TableScanVisitor, QUERY_SESSION};
4951
use crate::rbac::Users;
5052
use crate::response::QueryResponse;
@@ -193,6 +195,7 @@ async fn handle_count_query(
193195
start_time: query_request.start_time.clone(),
194196
end_time: query_request.end_time.clone(),
195197
num_bins: 1,
198+
conditions: None,
196199
};
197200
let count_records = counts_req.get_bin_density().await?;
198201
let count = count_records[0].count;
@@ -358,22 +361,59 @@ fn create_batch_processor(
358361
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
359362
}
360363
}
364+
361365
pub async fn get_counts(
362366
req: HttpRequest,
363367
counts_request: Json<CountsRequest>,
364368
) -> Result<impl Responder, QueryError> {
365369
let creds = extract_session_key_from_req(&req)?;
366370
let permissions = Users.get_permissions(&creds);
367371

372+
let body = counts_request.into_inner();
373+
368374
// does user have access to table?
369-
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
375+
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;
376+
377+
// if the user has given a sql query (counts call with filters applied), then use this flow
378+
// this could include filters or group by
379+
if body.conditions.is_some() {
380+
let sql = body.get_df_sql().await?;
381+
382+
let query_request = Query {
383+
query: sql,
384+
start_time: body.start_time,
385+
end_time: body.end_time,
386+
send_null: true,
387+
fields: true,
388+
streaming: false,
389+
filter_tags: None,
390+
};
370391

371-
let records = counts_request.get_bin_density().await?;
392+
let (records, _) = get_records_and_fields(&query_request, &req).await?;
372393

373-
Ok(web::Json(CountsResponse {
374-
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
375-
records,
376-
}))
394+
if let Some(records) = records {
395+
let json_records = record_batches_to_json(&records)?;
396+
let records = json_records.into_iter().map(Value::Object).collect_vec();
397+
398+
let res = json!({
399+
"fields": vec!["start_time", "endTime", "count"],
400+
"records": records,
401+
});
402+
403+
return Ok(web::Json(res));
404+
} else {
405+
return Err(QueryError::CustomError(
406+
"No data returned for counts SQL".into(),
407+
));
408+
}
409+
}
410+
411+
let records = body.get_bin_density().await?;
412+
let res = json!({
413+
"fields": vec!["start_time", "endTime", "count"],
414+
"records": records,
415+
});
416+
Ok(web::Json(res))
377417
}
378418

379419
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {

src/prism/logstream/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ impl PrismDatasetRequest {
349349
start_time: "1h".to_owned(),
350350
end_time: "now".to_owned(),
351351
num_bins: 10,
352+
conditions: None,
352353
};
353354

354355
let records = count_request.get_bin_density().await?;

0 commit comments

Comments
 (0)