Skip to content

Updates for counts API #1347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 98 additions & 36 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*
*/

use std::fmt::Display;

use arrow_array::{Float64Array, Int64Array, RecordBatch};
use datafusion::{
functions_aggregate::{
Expand Down Expand Up @@ -339,46 +341,97 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
}
}

pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
match &where_clause.operator {
Some(op) => match op {
LogicalOperator::And => {
let mut exprs = vec![];
for condition in &where_clause.condition_config {
if let Some(value) = &condition.value {
// ad-hoc error check in case value is some and operator is either `is null` or `is not null`
if condition.operator.eq(&WhereConfigOperator::IsNull)
|| condition.operator.eq(&WhereConfigOperator::IsNotNull)
{
return Err("value must be null when operator is either `is null` or `is not null`"
.into());
}
let value = NumberOrString::from_string(value.to_owned());
match value {
NumberOrString::Number(val) => exprs.push(format!(
"\"{}\" {} {}",
condition.column, condition.operator, val
)),
NumberOrString::String(val) => exprs.push(format!(
"\"{}\" {} '{}'",
condition.column,
condition.operator,
val.replace('\'', "''")
)),
}
} else {
exprs.push(format!("\"{}\" {}", condition.column, condition.operator))
}
}

Ok(exprs.join(" AND "))
}
_ => Err(String::from("Invalid option 'or', only 'and' is supported")),
},
_ => Err(String::from(
"Invalid option 'null', only 'and' is supported",
)),
}
}

fn match_alert_operator(expr: &ConditionConfig) -> Expr {
// the form accepts value as a string
// if it can be parsed as a number, then parse it
// else keep it as a string
let value = NumberOrString::from_string(expr.value.clone());

// for maintaining column case
let column = format!(r#""{}""#, expr.column);
match expr.operator {
WhereConfigOperator::Equal => col(column).eq(lit(value)),
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
WhereConfigOperator::IsNull => col(column).is_null(),
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)),
WhereConfigOperator::Contains => col(column).like(lit(&expr.value)),
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("^{}", expr.value))),
)),
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("{}$", expr.value))),
)),
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)),
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("^{}", expr.value))),
)),
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("{}$", expr.value))),
)),
if let Some(value) = &expr.value {
let value = NumberOrString::from_string(value.clone());

// for maintaining column case
let column = format!(r#""{}""#, expr.column);
match expr.operator {
WhereConfigOperator::Equal => col(column).eq(lit(value)),
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
WhereConfigOperator::ILike => col(column).ilike(lit(value)),
WhereConfigOperator::Contains => col(column).like(lit(value)),
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("^{}", value))),
)),
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexIMatch,
Box::new(lit(format!("{}$", value))),
)),
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)),
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("^{}", value))),
)),
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(column)),
Operator::RegexNotIMatch,
Box::new(lit(format!("{}$", value))),
)),
_ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation")
}
} else {
// for maintaining column case
let column = format!(r#""{}""#, expr.column);
match expr.operator {
WhereConfigOperator::IsNull => col(column).is_null(),
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
_ => unreachable!("value must be null for `is null` and `is not null`. Should've been caught in validation")
}
}
}

Expand Down Expand Up @@ -417,3 +470,12 @@ impl NumberOrString {
}
}
}

impl Display for NumberOrString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NumberOrString::Number(v) => write!(f, "{v}"),
NumberOrString::String(v) => write!(f, "{v}"),
}
}
}
53 changes: 41 additions & 12 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Display for AlertOperator {
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr)]
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum WhereConfigOperator {
#[serde(rename = "=")]
Expand Down Expand Up @@ -326,7 +326,7 @@ pub struct FilterConfig {
pub struct ConditionConfig {
pub column: String,
pub operator: WhereConfigOperator,
pub value: String,
pub value: Option<String>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand All @@ -343,20 +343,28 @@ impl Conditions {
LogicalOperator::And | LogicalOperator::Or => {
let expr1 = &self.condition_config[0];
let expr2 = &self.condition_config[1];
format!(
"[{} {} {} {op} {} {} {}]",
expr1.column,
expr1.operator,
expr1.value,
expr2.column,
expr2.operator,
expr2.value
)
let expr1_msg = if let Some(val) = &expr1.value {
format!("{} {} {}", expr1.column, expr1.operator, val)
} else {
format!("{} {}", expr1.column, expr1.operator)
};

let expr2_msg = if let Some(val) = &expr2.value {
format!("{} {} {}", expr2.column, expr2.operator, val)
} else {
format!("{} {}", expr2.column, expr2.operator)
};

format!("[{} {op} {}]", expr1_msg, expr2_msg)
}
},
None => {
let expr = &self.condition_config[0];
format!("[{} {} {}]", expr.column, expr.operator, expr.value)
if let Some(val) = &expr.value {
format!("{} {} {}", expr.column, expr.operator, val)
} else {
format!("{} {}", expr.column, expr.operator)
}
}
}
}
Expand Down Expand Up @@ -645,6 +653,27 @@ impl AlertConfig {
}
}
}

// validate that the value should be None in case of `is null` and `is not null`
for condition in config.condition_config.iter() {
let needs_no_value = matches!(
condition.operator,
WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull
);

if needs_no_value && condition.value.is_some() {
return Err(AlertError::CustomError(
"value must be null when operator is either `is null` or `is not null`"
.into(),
));
}
if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.trim().is_empty()) {
return Err(AlertError::CustomError(
"value must not be null when operator is neither `is null` nor `is not null`"
.into(),
));
}
}
Ok(())
}

Expand Down
56 changes: 48 additions & 8 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::utils::arrow::record_batches_to_json;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{Either, FromRequest, HttpRequest, HttpResponse, Responder};
Expand All @@ -31,8 +32,9 @@ use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -44,7 +46,7 @@ use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, CountsRequest, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::QueryResponse;
Expand Down Expand Up @@ -193,6 +195,7 @@ async fn handle_count_query(
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
conditions: None,
};
let count_records = counts_req.get_bin_density().await?;
let count = count_records[0].count;
Expand Down Expand Up @@ -358,22 +361,59 @@ fn create_batch_processor(
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
}
}

pub async fn get_counts(
req: HttpRequest,
counts_request: Json<CountsRequest>,
) -> Result<impl Responder, QueryError> {
let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);

let body = counts_request.into_inner();

// does user have access to table?
user_auth_for_datasets(&permissions, &[counts_request.stream.clone()])?;
user_auth_for_datasets(&permissions, &[body.stream.clone()])?;

// if the user has given a sql query (counts call with filters applied), then use this flow
// this could include filters or group by
if body.conditions.is_some() {
let sql = body.get_df_sql().await?;

let query_request = Query {
query: sql,
start_time: body.start_time,
end_time: body.end_time,
send_null: true,
fields: true,
streaming: false,
filter_tags: None,
};

let records = counts_request.get_bin_density().await?;
let (records, _) = get_records_and_fields(&query_request, &req).await?;

Ok(web::Json(CountsResponse {
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
records,
}))
if let Some(records) = records {
let json_records = record_batches_to_json(&records)?;
let records = json_records.into_iter().map(Value::Object).collect_vec();

let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
});

return Ok(web::Json(res));
} else {
return Err(QueryError::CustomError(
"No data returned for counts SQL".into(),
));
}
}

let records = body.get_bin_density().await?;
let res = json!({
"fields": vec!["start_time", "endTime", "count"],
"records": records,
});
Ok(web::Json(res))
}

pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {
Expand Down
1 change: 1 addition & 0 deletions src/prism/logstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ impl PrismDatasetRequest {
start_time: "1h".to_owned(),
end_time: "now".to_owned(),
num_bins: 10,
conditions: None,
};

let records = count_request.get_bin_density().await?;
Expand Down
Loading
Loading