Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 9 additions & 18 deletions src/flow/src/batching_mode/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ use query::QueryEngineRef;
use query::query_engine::DefaultSerializer;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use sql::parsers::utils::is_tql;
use store_api::mito_engine_options::MERGE_MODE_KEY;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::adapter::DfTableProviderAdapter;
Expand Down Expand Up @@ -84,22 +83,14 @@ pub struct TaskConfig {
}

fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

ensure!(
stmts.len() == 1,
InvalidQuerySnafu {
reason: format!("Expect only one statement, found {}", stmts.len())
}
);
let stmt = &stmts[0];
match stmt {
Statement::Tql(_) => Ok(QueryType::Tql),
_ => Ok(QueryType::Sql),
}
let is_tql = is_tql(query_ctx.sql_dialect(), query)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(if is_tql {
QueryType::Tql
} else {
QueryType::Sql
})
}

fn is_merge_mode_last_non_null(options: &HashMap<String, String>) -> bool {
Expand Down
99 changes: 99 additions & 0 deletions src/operator/src/expr_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,105 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http
);
}

#[test]
fn test_create_flow_tql_cte_source_tables() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM tql;
"#;

let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.metric_cte",
to_dot_sep(expr.source_table_names[0].clone())
);
}

#[test]
fn test_create_flow_tql_cte_source_tables_quoted_cte_name() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH "TQL"(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric_cte
)
SELECT * FROM "TQL";
"#;

let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.metric_cte",
to_dot_sep(expr.source_table_names[0].clone())
);
}

#[test]
fn test_create_flow_tql_cte_source_tables_same_name() {
let sql = r#"
CREATE FLOW calc_cte
SINK TO metric_cte_sink
EVAL INTERVAL '1m'
AS
WITH tql(ts, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') tql
)
SELECT * FROM tql;
"#;

let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateFlow(create_flow) = stmt else {
unreachable!()
};
let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap();

let to_dot_sep =
|c: TableName| format!("{}.{}.{}", c.catalog_name, c.schema_name, c.table_name);
assert_eq!(1, expr.source_table_names.len());
assert_eq!(
"greptime.public.tql",
to_dot_sep(expr.source_table_names[0].clone())
);
}

#[test]
fn test_create_flow_expr() {
let sql = r"
Expand Down
8 changes: 8 additions & 0 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::parser::{ParseOptions, ParserContext};
use sql::parsers::utils::is_tql;
use sql::statements::OptionMap;
#[cfg(feature = "enterprise")]
use sql::statements::alter::trigger::AlterTrigger;
Expand Down Expand Up @@ -716,6 +717,13 @@ impl StatementExecutor {
);
let stmt = &stmts[0];

if is_tql(query_ctx.sql_dialect(), &expr.sql)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
return Ok(FlowType::Batching);
}

// support tql parse too
let plan = match stmt {
// prom ql is only supported in batching mode
Expand Down
153 changes: 148 additions & 5 deletions src/sql/src/parsers/create_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ use table::requests::validate_database_option;

use crate::ast::{ColumnDef, Ident, ObjectNamePartExt};
use crate::error::{
self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidIntervalSnafu,
InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result, SyntaxSnafu,
UnexpectedSnafu, UnsupportedSnafu,
self, InvalidColumnOptionSnafu, InvalidDatabaseOptionSnafu, InvalidFlowQuerySnafu,
InvalidIntervalSnafu, InvalidSqlSnafu, InvalidTimeIndexSnafu, MissingTimeIndexSnafu, Result,
SyntaxSnafu, UnexpectedSnafu, UnsupportedSnafu,
};
use crate::parser::{FLOW, ParserContext};
use crate::parsers::tql_parser;
Expand Down Expand Up @@ -343,7 +343,7 @@ impl<'a> ParserContext<'a> {
.expect_keyword(Keyword::AS)
.context(SyntaxSnafu)?;

let query = Box::new(self.parse_sql_or_tql(true)?);
let query = Box::new(self.parse_flow_sql_or_tql(true)?);

Ok(Statement::CreateFlow(CreateFlow {
flow_name,
Expand All @@ -357,14 +357,20 @@ impl<'a> ParserContext<'a> {
}))
}

fn parse_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
fn parse_flow_sql_or_tql(&mut self, require_now_expr: bool) -> Result<SqlOrTql> {
let start_loc = self.parser.peek_token().span.start;
let start_index = location_to_index(self.sql, &start_loc);

let starts_with_with = matches!(
self.parser.peek_token().token,
Token::Word(w) if w.keyword == Keyword::WITH
);

// only accept sql or tql
let query = match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::SELECT => self.parse_query(),
Keyword::WITH => self.parse_with_tql_with_now(require_now_expr),
Keyword::NoKeyword
if w.quote_style.is_none() && w.value.to_uppercase() == tql_parser::TQL =>
{
Expand All @@ -376,6 +382,23 @@ impl<'a> ParserContext<'a> {
_ => self.unsupported(self.peek_token_as_string()),
}?;

if starts_with_with {
let Statement::Query(query) = &query else {
return InvalidFlowQuerySnafu {
reason: "Expect a query after WITH".to_string(),
}
.fail();
};

if !utils::is_simple_tql_cte_query(query) {
return InvalidFlowQuerySnafu {
reason: "WITH is only supported for the simplest TQL CTE in CREATE FLOW"
.to_string(),
}
.fail();
}
}

let end_token = self.parser.peek_token();

let raw_query = if end_token == Token::EOF {
Expand All @@ -386,6 +409,7 @@ impl<'a> ParserContext<'a> {
&self.sql[start_index..end_index.min(self.sql.len())]
};
let raw_query = raw_query.trim_end_matches(";");

let query = SqlOrTql::try_from_statement(query, raw_query)?;
Ok(query)
}
Expand Down Expand Up @@ -1808,6 +1832,125 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;",
}
}

#[test]
fn test_parse_create_flow_with_tql_cte_query() {
let sql = r#"
CREATE FLOW calc_reqs_cte
SINK TO cnt_reqs_cte
EVAL INTERVAL '1m'
AS
WITH tql(the_timestamp, the_value) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT * FROM tql;
"#;

let stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
let Statement::CreateFlow(create_flow) = &stmts[0] else {
panic!("unexpected stmt: {:?}", stmts[0]);
};

let query = create_flow.query.to_string();
assert!(query.to_uppercase().contains("WITH"));
assert!(query.to_uppercase().contains("TQL EVAL"));
}

#[test]
fn test_parse_create_flow_with_sql_cte_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH cte AS (SELECT 1) SELECT * FROM cte;
"#;

let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
let msg = err.to_string();
assert!(msg.to_uppercase().contains("WITH"), "err: {msg}");
}

#[test]
fn test_parse_create_flow_with_tql_cte_requires_now_expr() {
let sql = r#"
CREATE FLOW f
SINK TO s
EVAL INTERVAL '1m'
AS
WITH tql(ts, val) AS (
TQL EVAL (0, 15, '5s') metric
)
SELECT * FROM tql;
"#;

let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();

let msg = format!("{err:?}");
assert!(
msg.contains("Expected expression containing `now()`"),
"unexpected err: {msg}"
);
}

#[test]
fn test_parse_create_flow_with_tql_cte_non_select_star_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH tql(ts, val) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT ts FROM tql;
"#;

let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}

#[test]
fn test_parse_create_flow_with_tql_cte_filter_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH tql(ts, val) AS (
TQL EVAL (now() - '1m'::interval, now(), '5s') metric
)
SELECT * FROM tql WHERE ts > 0;
"#;

let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}

#[test]
fn test_parse_create_flow_with_mixed_sql_tql_cte_is_unsupported() {
let sql = r#"
CREATE FLOW f
SINK TO s
AS
WITH s1 AS (SELECT 1),
tql(ts, val) AS (TQL EVAL (now() - '1m'::interval, now(), '5s') metric)
SELECT * FROM tql;
"#;

let err =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap_err();
assert!(err.to_string().contains("simplest TQL CTE"), "err: {err}");
}

#[test]
fn test_create_flow_no_month() {
let sql = r"
Expand Down
5 changes: 3 additions & 2 deletions src/sql/src/parsers/create_parser/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use snafu::{OptionExt, ResultExt, ensure};
use sqlparser::ast::Query;
use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Token;
Expand All @@ -16,6 +15,7 @@ use crate::statements::create::SqlOrTql;
use crate::statements::create::trigger::{
AlertManagerWebhook, ChannelType, CreateTrigger, DurationExpr, NotifyChannel, TriggerOn,
};
use crate::statements::query::Query;
use crate::statements::statement::Statement;
use crate::util::{location_to_index, parse_option_string};

Expand Down Expand Up @@ -259,7 +259,8 @@ impl<'a> ParserContext<'a> {
);
let raw_sql = &self.sql[start_index..end_index];

Ok((*sql_query, raw_sql.trim().to_string()))
let query = (*sql_query).try_into()?;
Ok((query, raw_sql.trim().to_string()))
}

pub(crate) fn parse_trigger_for(
Expand Down
Loading
Loading