88import io .trino .spi .Page ;
99import io .trino .spi .block .Block ;
1010import io .trino .spi .type .BooleanType ;
11- import java .util .function .BiFunction ;
1211import org .opensearch .sql .dqe .function .expression .BlockExpression ;
1312
1413/**
15- * Physical operator that filters rows based on a predicate. Supports two modes: a row-at-a-time
16- * BiFunction predicate (legacy) and a vectorized BlockExpression predicate (new).
14+ * Physical operator that filters rows based on a vectorized {@link BlockExpression} predicate.
1715 *
1816 * <p>Uses Trino's {@link Page#copyPositions(int[], int, int)} for efficient row selection.
1917 */
2018public class FilterOperator implements Operator {
2119
2220 private final Operator source ;
23- private final BiFunction <Page , Integer , Boolean > predicate ;
24- private final BlockExpression blockPredicate ;
25-
26- /**
27- * Create a FilterOperator with a row-at-a-time predicate.
28- *
29- * @param source child operator providing input pages
30- * @param predicate function that receives (page, position) and returns true to keep the row
31- */
32- public FilterOperator (Operator source , BiFunction <Page , Integer , Boolean > predicate ) {
33- this .source = source ;
34- this .predicate = predicate ;
35- this .blockPredicate = null ;
36- }
21+ private final BlockExpression predicate ;
3722
3823 /**
3924 * Create a FilterOperator with a vectorized BlockExpression predicate.
4025 *
4126 * @param source child operator providing input pages
42- * @param blockPredicate expression that produces a BOOLEAN Block for filtering
27+ * @param predicate expression that produces a BOOLEAN Block for filtering
4328 */
44- public FilterOperator (Operator source , BlockExpression blockPredicate ) {
29+ public FilterOperator (Operator source , BlockExpression predicate ) {
4530 this .source = source ;
46- this .predicate = null ;
47- this .blockPredicate = blockPredicate ;
31+ this .predicate = predicate ;
4832 }
4933
5034 @ Override
@@ -59,18 +43,10 @@ public Page processNextBatch() {
5943 int [] selectedPositions = new int [positionCount ];
6044 int selectedCount = 0 ;
6145
62- if (blockPredicate != null ) {
63- Block filterResult = blockPredicate .evaluate (page );
64- for (int pos = 0 ; pos < positionCount ; pos ++) {
65- if (!filterResult .isNull (pos ) && BooleanType .BOOLEAN .getBoolean (filterResult , pos )) {
66- selectedPositions [selectedCount ++] = pos ;
67- }
68- }
69- } else {
70- for (int pos = 0 ; pos < positionCount ; pos ++) {
71- if (predicate .apply (page , pos )) {
72- selectedPositions [selectedCount ++] = pos ;
73- }
46+ Block filterResult = predicate .evaluate (page );
47+ for (int pos = 0 ; pos < positionCount ; pos ++) {
48+ if (!filterResult .isNull (pos ) && BooleanType .BOOLEAN .getBoolean (filterResult , pos )) {
49+ selectedPositions [selectedCount ++] = pos ;
7450 }
7551 }
7652
0 commit comments