Skip to content

Commit 22669da

Browse files
authored
Introduce logical dedup operators for PPL (#5014)
* Add new operator for dedup Signed-off-by: Heng Qian <qianheng@amazon.com> * Add customized field trimmer for dedup Signed-off-by: Heng Qian <qianheng@amazon.com> * Support convert dedup to basic sql operators Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix UT Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix CI Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Add java doc Signed-off-by: Heng Qian <qianheng@amazon.com> * Revert some code and add dedup simplify rule Signed-off-by: Heng Qian <qianheng@amazon.com> * Fix IT Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> * Address comments Signed-off-by: Heng Qian <qianheng@amazon.com> --------- Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 47709a0 commit 22669da

File tree

124 files changed

+911
-414
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

124 files changed

+911
-414
lines changed

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.calcite.rex.RexLambdaRef;
2222
import org.apache.calcite.rex.RexNode;
2323
import org.apache.calcite.tools.FrameworkConfig;
24-
import org.apache.calcite.tools.RelBuilder;
2524
import org.opensearch.sql.ast.expression.UnresolvedExpression;
2625
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
26+
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelBuilder;
2727
import org.opensearch.sql.common.setting.Settings;
2828
import org.opensearch.sql.executor.QueryType;
2929
import org.opensearch.sql.expression.function.FunctionProperties;
@@ -32,7 +32,7 @@ public class CalcitePlanContext {
3232

3333
public FrameworkConfig config;
3434
public final Connection connection;
35-
public final RelBuilder relBuilder;
35+
public final OpenSearchRelBuilder relBuilder;
3636
public final ExtendedRexBuilder rexBuilder;
3737
public final FunctionProperties functionProperties;
3838
public final QueryType queryType;

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 8 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_DESC;
1515
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
1616
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
17-
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_DEDUP;
18-
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP;
17+
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupNotNull;
18+
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.buildDedupOrNull;
1919
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_MAIN;
2020
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_RARE_TOP;
2121
import static org.opensearch.sql.calcite.utils.PlanUtils.ROW_NUMBER_COLUMN_FOR_STREAMSTATS;
@@ -146,9 +146,9 @@
146146
import org.opensearch.sql.ast.tree.Values;
147147
import org.opensearch.sql.ast.tree.Window;
148148
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
149-
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
150-
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
151149
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
150+
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
151+
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
152152
import org.opensearch.sql.calcite.utils.BinUtils;
153153
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
154154
import org.opensearch.sql.calcite.utils.PPLHintUtils;
@@ -1330,7 +1330,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13301330
: duplicatedFieldNames.stream()
13311331
.map(a -> (RexNode) context.relBuilder.field(a))
13321332
.toList();
1333-
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
1333+
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
13341334
}
13351335
// add LogicalSystemLimit after dedup
13361336
addSysLimitForJoinSubsearch(context);
@@ -1388,7 +1388,7 @@ public RelNode visitJoin(Join node, CalcitePlanContext context) {
13881388
List<RexNode> dedupeFields =
13891389
getRightColumnsInJoinCriteria(context.relBuilder, joinCondition);
13901390

1391-
buildDedupNotNull(context, dedupeFields, allowedDuplication, true);
1391+
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
13921392
}
13931393
// add LogicalSystemLimit after dedup
13941394
addSysLimitForJoinSubsearch(context);
@@ -1565,81 +1565,13 @@ public RelNode visitDedupe(Dedupe node, CalcitePlanContext context) {
15651565
List<RexNode> dedupeFields =
15661566
node.getFields().stream().map(f -> rexVisitor.analyze(f, context)).toList();
15671567
if (keepEmpty) {
1568-
buildDedupOrNull(context, dedupeFields, allowedDuplication);
1568+
buildDedupOrNull(context.relBuilder, dedupeFields, allowedDuplication);
15691569
} else {
1570-
buildDedupNotNull(context, dedupeFields, allowedDuplication, false);
1570+
buildDedupNotNull(context.relBuilder, dedupeFields, allowedDuplication);
15711571
}
15721572
return context.relBuilder.peek();
15731573
}
15741574

1575-
private static void buildDedupOrNull(
1576-
CalcitePlanContext context, List<RexNode> dedupeFields, Integer allowedDuplication) {
1577-
/*
1578-
* | dedup 2 a, b keepempty=true
1579-
* LogicalProject(...)
1580-
* +- LogicalFilter(condition=[OR(IS NULL(a), IS NULL(b), <=(_row_number_dedup_, 1))])
1581-
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
1582-
* +- ...
1583-
*/
1584-
RexNode rowNumber =
1585-
context
1586-
.relBuilder
1587-
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
1588-
.over()
1589-
.partitionBy(dedupeFields)
1590-
.rowsTo(RexWindowBounds.CURRENT_ROW)
1591-
.as(ROW_NUMBER_COLUMN_FOR_DEDUP);
1592-
context.relBuilder.projectPlus(rowNumber);
1593-
RexNode _row_number_dedup_ = context.relBuilder.field(ROW_NUMBER_COLUMN_FOR_DEDUP);
1594-
// Filter (isnull('a) OR isnull('b) OR '_row_number_dedup_ <= n)
1595-
context.relBuilder.filter(
1596-
context.relBuilder.or(
1597-
context.relBuilder.or(dedupeFields.stream().map(context.relBuilder::isNull).toList()),
1598-
context.relBuilder.lessThanOrEqual(
1599-
_row_number_dedup_, context.relBuilder.literal(allowedDuplication))));
1600-
// DropColumns('_row_number_dedup_)
1601-
context.relBuilder.projectExcept(_row_number_dedup_);
1602-
}
1603-
1604-
private static void buildDedupNotNull(
1605-
CalcitePlanContext context,
1606-
List<RexNode> dedupeFields,
1607-
Integer allowedDuplication,
1608-
boolean fromJoinMaxOption) {
1609-
/*
1610-
* | dedup 2 a, b keepempty=false
1611-
* LogicalProject(...)
1612-
* +- LogicalFilter(condition=[<=(_row_number_dedup_, n)]))
1613-
* +- LogicalProject(..., _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY a, b)])
1614-
* +- LogicalFilter(condition=[AND(IS NOT NULL(a), IS NOT NULL(b))])
1615-
* +- ...
1616-
*/
1617-
// Filter (isnotnull('a) AND isnotnull('b))
1618-
String rowNumberAlias =
1619-
fromJoinMaxOption ? ROW_NUMBER_COLUMN_FOR_JOIN_MAX_DEDUP : ROW_NUMBER_COLUMN_FOR_DEDUP;
1620-
context.relBuilder.filter(
1621-
context.relBuilder.and(dedupeFields.stream().map(context.relBuilder::isNotNull).toList()));
1622-
// Window [row_number() windowspecdefinition('a, 'b, 'a ASC NULLS FIRST, 'b ASC NULLS FIRST,
1623-
// specifiedwindowoundedpreceding$(), currentrow$())) AS _row_number_dedup_], ['a, 'b], ['a ASC
1624-
// NULLS FIRST, 'b ASC NULLS FIRST]
1625-
RexNode rowNumber =
1626-
context
1627-
.relBuilder
1628-
.aggregateCall(SqlStdOperatorTable.ROW_NUMBER)
1629-
.over()
1630-
.partitionBy(dedupeFields)
1631-
.rowsTo(RexWindowBounds.CURRENT_ROW)
1632-
.as(rowNumberAlias);
1633-
context.relBuilder.projectPlus(rowNumber);
1634-
RexNode rowNumberField = context.relBuilder.field(rowNumberAlias);
1635-
// Filter ('_row_number_dedup_ <= n)
1636-
context.relBuilder.filter(
1637-
context.relBuilder.lessThanOrEqual(
1638-
rowNumberField, context.relBuilder.literal(allowedDuplication)));
1639-
// DropColumns('_row_number_dedup_)
1640-
context.relBuilder.projectExcept(rowNumberField);
1641-
}
1642-
16431575
@Override
16441576
public RelNode visitWindow(Window node, CalcitePlanContext context) {
16451577
visitChildren(node, context);

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@
7070
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
7171
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
7272
import org.opensearch.sql.ast.tree.UnresolvedPlan;
73-
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
74-
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
73+
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
74+
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
7575
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
7676
import org.opensearch.sql.calcite.utils.PlanUtils;
7777
import org.opensearch.sql.calcite.utils.SubsearchUtils;
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan.rel;
7+
8+
import java.util.List;
9+
import lombok.Getter;
10+
import org.apache.calcite.plan.RelOptCluster;
11+
import org.apache.calcite.plan.RelOptPlanner;
12+
import org.apache.calcite.plan.RelTraitSet;
13+
import org.apache.calcite.rel.RelNode;
14+
import org.apache.calcite.rel.RelWriter;
15+
import org.apache.calcite.rel.SingleRel;
16+
import org.apache.calcite.rex.RexNode;
17+
import org.opensearch.sql.exception.CalciteUnsupportedException;
18+
19+
/** Relational expression representing a dedup command. */
20+
@Getter
21+
public abstract class Dedup extends SingleRel {
22+
final List<RexNode> dedupeFields;
23+
final Integer allowedDuplication;
24+
final Boolean keepEmpty;
25+
final Boolean consecutive;
26+
27+
/** */
28+
protected Dedup(
29+
RelOptCluster cluster,
30+
RelTraitSet traitSet,
31+
RelNode input,
32+
List<RexNode> dedupeFields,
33+
Integer allowedDuplication,
34+
Boolean keepEmpty,
35+
Boolean consecutive) {
36+
super(cluster, traitSet, input);
37+
if (allowedDuplication <= 0) {
38+
throw new IllegalArgumentException("Number of duplicate events must be greater than 0");
39+
}
40+
if (consecutive) {
41+
throw new CalciteUnsupportedException("Consecutive deduplication is unsupported in Calcite");
42+
}
43+
this.dedupeFields = dedupeFields;
44+
this.allowedDuplication = allowedDuplication;
45+
this.keepEmpty = keepEmpty;
46+
this.consecutive = consecutive;
47+
}
48+
49+
@Override
50+
public final RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
51+
return copy(
52+
traitSet,
53+
sole(inputs),
54+
this.dedupeFields,
55+
this.allowedDuplication,
56+
this.keepEmpty,
57+
this.consecutive);
58+
}
59+
60+
public abstract Dedup copy(
61+
RelTraitSet traitSet,
62+
RelNode input,
63+
List<RexNode> dedupeFields,
64+
Integer allowedDuplication,
65+
Boolean keepEmpty,
66+
Boolean consecutive);
67+
68+
public Dedup copy(RelNode input, List<RexNode> dedupeFields) {
69+
return this.copy(
70+
this.getTraitSet(),
71+
input,
72+
dedupeFields,
73+
this.allowedDuplication,
74+
this.keepEmpty,
75+
this.consecutive);
76+
}
77+
78+
@Override
79+
public RelWriter explainTerms(RelWriter pw) {
80+
return super.explainTerms(pw)
81+
.item("dedup_fields", dedupeFields)
82+
.item("allowed_dedup", allowedDuplication)
83+
.item("keepEmpty", keepEmpty)
84+
.item("consecutive", consecutive);
85+
}
86+
87+
@Override
88+
public void register(RelOptPlanner planner) {}
89+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan.rel;
7+
8+
import static org.opensearch.sql.calcite.plan.rule.PPLDedupConvertRule.DEDUP_CONVERT_RULE;
9+
10+
import java.util.List;
11+
import org.apache.calcite.plan.Convention;
12+
import org.apache.calcite.plan.RelOptCluster;
13+
import org.apache.calcite.plan.RelOptPlanner;
14+
import org.apache.calcite.plan.RelTraitSet;
15+
import org.apache.calcite.rel.RelNode;
16+
import org.apache.calcite.rex.RexNode;
17+
18+
public class LogicalDedup extends Dedup {
19+
20+
protected LogicalDedup(
21+
RelOptCluster cluster,
22+
RelTraitSet traitSet,
23+
RelNode input,
24+
List<RexNode> dedupeFields,
25+
Integer allowedDuplication,
26+
Boolean keepEmpty,
27+
Boolean consecutive) {
28+
super(cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
29+
}
30+
31+
@Override
32+
public Dedup copy(
33+
RelTraitSet traitSet,
34+
RelNode input,
35+
List<RexNode> dedupeFields,
36+
Integer allowedDuplication,
37+
Boolean keepEmpty,
38+
Boolean consecutive) {
39+
assert traitSet.containsIfApplicable(Convention.NONE);
40+
return new LogicalDedup(
41+
getCluster(), traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
42+
}
43+
44+
public static LogicalDedup create(
45+
RelNode input,
46+
List<RexNode> dedupeFields,
47+
Integer allowedDuplication,
48+
Boolean keepEmpty,
49+
Boolean consecutive) {
50+
final RelOptCluster cluster = input.getCluster();
51+
RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
52+
return new LogicalDedup(
53+
cluster, traitSet, input, dedupeFields, allowedDuplication, keepEmpty, consecutive);
54+
}
55+
56+
@Override
57+
public void register(RelOptPlanner planner) {
58+
planner.addRule(DEDUP_CONVERT_RULE);
59+
}
60+
}

core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java renamed to core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalSystemLimit.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite.plan;
6+
package org.opensearch.sql.calcite.plan.rel;
77

88
import java.util.Collections;
99
import java.util.List;

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchTableScan.java renamed to core/src/main/java/org/opensearch/sql/calcite/plan/rel/OpenSearchTableScan.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite.plan;
6+
package org.opensearch.sql.calcite.plan.rel;
77

88
import com.google.common.collect.ImmutableList;
99
import org.apache.calcite.adapter.enumerable.EnumerableConvention;
@@ -14,6 +14,7 @@
1414
import org.apache.calcite.plan.RelOptTable;
1515
import org.apache.calcite.rel.core.TableScan;
1616
import org.apache.calcite.rel.rules.CoreRules;
17+
import org.opensearch.sql.calcite.plan.rule.OpenSearchRules;
1718

1819
/** Relational expression representing a scan of an OpenSearch type. */
1920
public abstract class OpenSearchTableScan extends TableScan implements EnumerableRel {

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchRuleConfig.java renamed to core/src/main/java/org/opensearch/sql/calcite/plan/rule/OpenSearchRuleConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite.plan;
6+
package org.opensearch.sql.calcite.plan.rule;
77

88
import org.apache.calcite.plan.Contexts;
99
import org.apache.calcite.plan.RelRule;

core/src/main/java/org/opensearch/sql/calcite/plan/OpenSearchRules.java renamed to core/src/main/java/org/opensearch/sql/calcite/plan/rule/OpenSearchRules.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite.plan;
6+
package org.opensearch.sql.calcite.plan.rule;
77

88
import com.google.common.collect.ImmutableList;
99
import java.util.List;

core/src/main/java/org/opensearch/sql/calcite/plan/PPLAggGroupMergeRule.java renamed to core/src/main/java/org/opensearch/sql/calcite/plan/rule/PPLAggGroupMergeRule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.sql.calcite.plan;
6+
package org.opensearch.sql.calcite.plan.rule;
77

88
import java.util.ArrayList;
99
import java.util.Collection;

0 commit comments

Comments
 (0)