Skip to content

Commit b21f5a0

Browse files
Merge branch 'main' into new-dashboard
2 parents 914b662 + ed4ac71 commit b21f5a0

File tree

16 files changed

+1285
-122
lines changed

16 files changed

+1285
-122
lines changed

src/alerts/alerts_utils.rs

Lines changed: 98 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use std::fmt::Display;
20+
1921
use arrow_array::{Float64Array, Int64Array, RecordBatch};
2022
use datafusion::{
2123
functions_aggregate::{
@@ -339,46 +341,97 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
339341
}
340342
}
341343

344+
pub fn get_filter_string(where_clause: &Conditions) -> Result<String, String> {
345+
match &where_clause.operator {
346+
Some(op) => match op {
347+
LogicalOperator::And => {
348+
let mut exprs = vec![];
349+
for condition in &where_clause.condition_config {
350+
if let Some(value) = &condition.value {
351+
// ad-hoc error check in case value is some and operator is either `is null` or `is not null`
352+
if condition.operator.eq(&WhereConfigOperator::IsNull)
353+
|| condition.operator.eq(&WhereConfigOperator::IsNotNull)
354+
{
355+
return Err("value must be null when operator is either `is null` or `is not null`"
356+
.into());
357+
}
358+
let value = NumberOrString::from_string(value.to_owned());
359+
match value {
360+
NumberOrString::Number(val) => exprs.push(format!(
361+
"\"{}\" {} {}",
362+
condition.column, condition.operator, val
363+
)),
364+
NumberOrString::String(val) => exprs.push(format!(
365+
"\"{}\" {} '{}'",
366+
condition.column,
367+
condition.operator,
368+
val.replace('\'', "''")
369+
)),
370+
}
371+
} else {
372+
exprs.push(format!("\"{}\" {}", condition.column, condition.operator))
373+
}
374+
}
375+
376+
Ok(exprs.join(" AND "))
377+
}
378+
_ => Err(String::from("Invalid option 'or', only 'and' is supported")),
379+
},
380+
_ => Err(String::from(
381+
"Invalid option 'null', only 'and' is supported",
382+
)),
383+
}
384+
}
385+
342386
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
343387
// the form accepts value as a string
344388
// if it can be parsed as a number, then parse it
345389
// else keep it as a string
346-
let value = NumberOrString::from_string(expr.value.clone());
347-
348-
// for maintaining column case
349-
let column = format!(r#""{}""#, expr.column);
350-
match expr.operator {
351-
WhereConfigOperator::Equal => col(column).eq(lit(value)),
352-
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
353-
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
354-
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
355-
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
356-
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
357-
WhereConfigOperator::IsNull => col(column).is_null(),
358-
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
359-
WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)),
360-
WhereConfigOperator::Contains => col(column).like(lit(&expr.value)),
361-
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
362-
Box::new(col(column)),
363-
Operator::RegexIMatch,
364-
Box::new(lit(format!("^{}", expr.value))),
365-
)),
366-
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
367-
Box::new(col(column)),
368-
Operator::RegexIMatch,
369-
Box::new(lit(format!("{}$", expr.value))),
370-
)),
371-
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)),
372-
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
373-
Box::new(col(column)),
374-
Operator::RegexNotIMatch,
375-
Box::new(lit(format!("^{}", expr.value))),
376-
)),
377-
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
378-
Box::new(col(column)),
379-
Operator::RegexNotIMatch,
380-
Box::new(lit(format!("{}$", expr.value))),
381-
)),
390+
if let Some(value) = &expr.value {
391+
let value = NumberOrString::from_string(value.clone());
392+
393+
// for maintaining column case
394+
let column = format!(r#""{}""#, expr.column);
395+
match expr.operator {
396+
WhereConfigOperator::Equal => col(column).eq(lit(value)),
397+
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
398+
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
399+
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
400+
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
401+
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
402+
WhereConfigOperator::ILike => col(column).ilike(lit(value)),
403+
WhereConfigOperator::Contains => col(column).like(lit(value)),
404+
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
405+
Box::new(col(column)),
406+
Operator::RegexIMatch,
407+
Box::new(lit(format!("^{}", value))),
408+
)),
409+
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
410+
Box::new(col(column)),
411+
Operator::RegexIMatch,
412+
Box::new(lit(format!("{}$", value))),
413+
)),
414+
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(value)),
415+
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
416+
Box::new(col(column)),
417+
Operator::RegexNotIMatch,
418+
Box::new(lit(format!("^{}", value))),
419+
)),
420+
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
421+
Box::new(col(column)),
422+
Operator::RegexNotIMatch,
423+
Box::new(lit(format!("{}$", value))),
424+
)),
425+
_ => unreachable!("value must not be null for operators other than `is null` and `is not null`. Should've been caught in validation")
426+
}
427+
} else {
428+
// for maintaining column case
429+
let column = format!(r#""{}""#, expr.column);
430+
match expr.operator {
431+
WhereConfigOperator::IsNull => col(column).is_null(),
432+
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
433+
_ => unreachable!("value must be null for `is null` and `is not null`. Should've been caught in validation")
434+
}
382435
}
383436
}
384437

@@ -417,3 +470,12 @@ impl NumberOrString {
417470
}
418471
}
419472
}
473+
474+
impl Display for NumberOrString {
475+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
476+
match self {
477+
NumberOrString::Number(v) => write!(f, "{v}"),
478+
NumberOrString::String(v) => write!(f, "{v}"),
479+
}
480+
}
481+
}

src/alerts/mod.rs

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ impl Display for AlertOperator {
220220
}
221221
}
222222

223-
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr)]
223+
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)]
224224
#[serde(rename_all = "camelCase")]
225225
pub enum WhereConfigOperator {
226226
#[serde(rename = "=")]
@@ -326,7 +326,7 @@ pub struct FilterConfig {
326326
pub struct ConditionConfig {
327327
pub column: String,
328328
pub operator: WhereConfigOperator,
329-
pub value: String,
329+
pub value: Option<String>,
330330
}
331331

332332
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
@@ -343,20 +343,28 @@ impl Conditions {
343343
LogicalOperator::And | LogicalOperator::Or => {
344344
let expr1 = &self.condition_config[0];
345345
let expr2 = &self.condition_config[1];
346-
format!(
347-
"[{} {} {} {op} {} {} {}]",
348-
expr1.column,
349-
expr1.operator,
350-
expr1.value,
351-
expr2.column,
352-
expr2.operator,
353-
expr2.value
354-
)
346+
let expr1_msg = if let Some(val) = &expr1.value {
347+
format!("{} {} {}", expr1.column, expr1.operator, val)
348+
} else {
349+
format!("{} {}", expr1.column, expr1.operator)
350+
};
351+
352+
let expr2_msg = if let Some(val) = &expr2.value {
353+
format!("{} {} {}", expr2.column, expr2.operator, val)
354+
} else {
355+
format!("{} {}", expr2.column, expr2.operator)
356+
};
357+
358+
format!("[{} {op} {}]", expr1_msg, expr2_msg)
355359
}
356360
},
357361
None => {
358362
let expr = &self.condition_config[0];
359-
format!("[{} {} {}]", expr.column, expr.operator, expr.value)
363+
if let Some(val) = &expr.value {
364+
format!("{} {} {}", expr.column, expr.operator, val)
365+
} else {
366+
format!("{} {}", expr.column, expr.operator)
367+
}
360368
}
361369
}
362370
}
@@ -645,6 +653,27 @@ impl AlertConfig {
645653
}
646654
}
647655
}
656+
657+
// validate that the value should be None in case of `is null` and `is not null`
658+
for condition in config.condition_config.iter() {
659+
let needs_no_value = matches!(
660+
condition.operator,
661+
WhereConfigOperator::IsNull | WhereConfigOperator::IsNotNull
662+
);
663+
664+
if needs_no_value && condition.value.is_some() {
665+
return Err(AlertError::CustomError(
666+
"value must be null when operator is either `is null` or `is not null`"
667+
.into(),
668+
));
669+
}
670+
if !needs_no_value && condition.value.as_ref().is_none_or(|v| v.trim().is_empty()) {
671+
return Err(AlertError::CustomError(
672+
"value must not be null when operator is neither `is null` nor `is not null`"
673+
.into(),
674+
));
675+
}
676+
}
648677
Ok(())
649678
}
650679

src/cli.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,41 @@ pub struct Options {
388388
help = "Maximum level of flattening allowed for events"
389389
)]
390390
pub event_flatten_level: usize,
391+
392+
// maximum limit to store the statistics for a field
393+
#[arg(
394+
long,
395+
env = "P_MAX_FIELD_STATISTICS",
396+
default_value = "50",
397+
help = "Maximum number of field statistics to store"
398+
)]
399+
pub max_field_statistics: usize,
400+
401+
// collect dataset stats
402+
#[arg(
403+
long,
404+
env = "P_COLLECT_DATASET_STATS",
405+
default_value = "false",
406+
help = "Enable/Disable collecting dataset stats"
407+
)]
408+
pub collect_dataset_stats: bool,
409+
410+
// the duration during which local sync should be completed
411+
#[arg(
412+
long,
413+
env = "P_LOCAL_SYNC_THRESHOLD",
414+
default_value = "30",
415+
help = "Local sync threshold in seconds"
416+
)]
417+
pub local_sync_threshold: u64,
418+
// the duration during which object store sync should be completed
419+
#[arg(
420+
long,
421+
env = "P_OBJECT_STORE_SYNC_THRESHOLD",
422+
default_value = "15",
423+
help = "Object store sync threshold in seconds"
424+
)]
425+
pub object_store_sync_threshold: u64,
391426
}
392427

393428
#[derive(Parser, Debug)]

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl ParseableSinkProcessor {
5555
.create_stream_if_not_exists(
5656
stream_name,
5757
StreamType::UserDefined,
58+
None,
5859
vec![log_source_entry],
5960
)
6061
.await?;

src/handlers/http/health_check.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,35 @@ pub async fn check_shutdown_middleware(
5656

5757
// This function is called when the server is shutting down
5858
pub async fn shutdown() {
59-
// Set the shutdown flag to true
60-
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
61-
*shutdown_flag = true;
59+
// Set shutdown flag to true
60+
set_shutdown_flag().await;
6261

6362
//sleep for 5 secs to allow any ongoing requests to finish
6463
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
64+
65+
// Perform sync operations
66+
perform_sync_operations().await;
67+
68+
// If collect_dataset_stats is enabled, perform sync operations
69+
// This is to ensure that all stats data is synced before the server shuts down
70+
if PARSEABLE.options.collect_dataset_stats {
71+
perform_sync_operations().await;
72+
}
73+
}
74+
75+
async fn set_shutdown_flag() {
76+
let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
77+
*shutdown_flag = true;
78+
}
79+
80+
async fn perform_sync_operations() {
81+
// Perform local sync
82+
perform_local_sync().await;
83+
// Perform object store sync
84+
perform_object_store_sync().await;
85+
}
86+
87+
async fn perform_local_sync() {
6588
let mut local_sync_joinset = JoinSet::new();
6689

6790
// Sync staging
@@ -76,7 +99,9 @@ pub async fn shutdown() {
7699
Err(err) => error!("Failed to join async task: {err}"),
77100
}
78101
}
102+
}
79103

104+
async fn perform_object_store_sync() {
80105
// Sync object store
81106
let mut object_store_joinset = JoinSet::new();
82107
sync_all_streams(&mut object_store_joinset);

src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn ingest(
100100
.create_stream_if_not_exists(
101101
&stream_name,
102102
StreamType::UserDefined,
103+
None,
103104
vec![log_source_entry.clone()],
104105
)
105106
.await?;
@@ -183,6 +184,7 @@ pub async fn handle_otel_logs_ingestion(
183184
.create_stream_if_not_exists(
184185
&stream_name,
185186
StreamType::UserDefined,
187+
None,
186188
vec![log_source_entry.clone()],
187189
)
188190
.await?;
@@ -248,6 +250,7 @@ pub async fn handle_otel_metrics_ingestion(
248250
.create_stream_if_not_exists(
249251
&stream_name,
250252
StreamType::UserDefined,
253+
None,
251254
vec![log_source_entry.clone()],
252255
)
253256
.await?;
@@ -313,6 +316,7 @@ pub async fn handle_otel_traces_ingestion(
313316
.create_stream_if_not_exists(
314317
&stream_name,
315318
StreamType::UserDefined,
319+
None,
316320
vec![log_source_entry.clone()],
317321
)
318322
.await?;

0 commit comments

Comments
 (0)