|
47 | 47 | import org.apache.calcite.adapter.enumerable.RexToLixTranslator; |
48 | 48 | import org.apache.calcite.plan.RelOptTable; |
49 | 49 | import org.apache.calcite.plan.ViewExpanders; |
| 50 | +import org.apache.calcite.rel.BiRel; |
| 51 | +import org.apache.calcite.rel.RelCollation; |
| 52 | +import org.apache.calcite.rel.RelHomogeneousShuttle; |
50 | 53 | import org.apache.calcite.rel.RelNode; |
51 | 54 | import org.apache.calcite.rel.core.Aggregate; |
52 | 55 | import org.apache.calcite.rel.core.JoinRelType; |
| 56 | +import org.apache.calcite.rel.core.SetOp; |
| 57 | +import org.apache.calcite.rel.core.Sort; |
| 58 | +import org.apache.calcite.rel.core.Uncollect; |
| 59 | +import org.apache.calcite.rel.logical.LogicalProject; |
| 60 | +import org.apache.calcite.rel.logical.LogicalSort; |
53 | 61 | import org.apache.calcite.rel.logical.LogicalValues; |
54 | 62 | import org.apache.calcite.rel.type.RelDataType; |
55 | 63 | import org.apache.calcite.rel.type.RelDataTypeFamily; |
|
145 | 153 | import org.opensearch.sql.ast.tree.Rex; |
146 | 154 | import org.opensearch.sql.ast.tree.SPath; |
147 | 155 | import org.opensearch.sql.ast.tree.Search; |
148 | | -import org.opensearch.sql.ast.tree.Sort; |
149 | 156 | import org.opensearch.sql.ast.tree.Sort.SortOption; |
150 | 157 | import org.opensearch.sql.ast.tree.StreamWindow; |
151 | 158 | import org.opensearch.sql.ast.tree.SubqueryAlias; |
@@ -679,7 +686,7 @@ private void removeFieldIfExists( |
679 | 686 | } |
680 | 687 |
|
681 | 688 | @Override |
682 | | - public RelNode visitSort(Sort node, CalcitePlanContext context) { |
| 689 | + public RelNode visitSort(org.opensearch.sql.ast.tree.Sort node, CalcitePlanContext context) { |
683 | 690 | visitChildren(node, context); |
684 | 691 | List<RexNode> sortList = |
685 | 692 | node.getSortList().stream() |
@@ -727,25 +734,161 @@ public RelNode visitHead(Head node, CalcitePlanContext context) { |
727 | 734 | return context.relBuilder.peek(); |
728 | 735 | } |
729 | 736 |
|
730 | | - private static final String REVERSE_ROW_NUM = "__reverse_row_num__"; |
| 737 | + /** |
| 738 | + * Backtrack through the RelNode tree to find the first Sort node with non-empty collation. Stops |
| 739 | + * at blocking operators that break ordering: |
| 740 | + * |
| 741 | + * <ul> |
| 742 | + * <li>Aggregate - aggregation destroys input ordering |
| 743 | + * <li>BiRel - covers Join, Correlate, and other binary relations |
| 744 | + * <li>SetOp - covers Union, Intersect, Except |
| 745 | + * <li>Uncollect - unnesting operation that may change ordering |
| 746 | + * <li>Project with window functions (RexOver) - ordering determined by window's ORDER BY |
| 747 | + * </ul> |
| 748 | + * |
| 749 | + * @param node the starting RelNode to backtrack from |
| 750 | + * @return the collation found, or null if no sort or blocking operator encountered |
| 751 | + */ |
| 752 | + private RelCollation backtrackForCollation(RelNode node) { |
| 753 | + while (node != null) { |
| 754 | + // Check for blocking operators that destroy collation |
| 755 | + // BiRel covers Join, Correlate, and other binary relations |
| 756 | + // SetOp covers Union, Intersect, Except |
| 757 | + // Uncollect unnests arrays/multisets which may change ordering |
| 758 | + if (node instanceof Aggregate |
| 759 | + || node instanceof BiRel |
| 760 | + || node instanceof SetOp |
| 761 | + || node instanceof Uncollect) { |
| 762 | + return null; |
| 763 | + } |
| 764 | + |
| 765 | + // Project with window functions has ordering determined by the window's ORDER BY clause |
| 766 | + // We should not destroy its output order by inserting a reversed sort |
| 767 | + if (node instanceof LogicalProject && ((LogicalProject) node).containsOver()) { |
| 768 | + return null; |
| 769 | + } |
| 770 | + |
| 771 | + // Check for Sort node with collation |
| 772 | + if (node instanceof Sort) { |
| 773 | + Sort sort = (Sort) node; |
| 774 | + if (sort.getCollation() != null && !sort.getCollation().getFieldCollations().isEmpty()) { |
| 775 | + return sort.getCollation(); |
| 776 | + } |
| 777 | + } |
| 778 | + |
| 779 | + // Continue to child node |
| 780 | + if (node.getInputs().isEmpty()) { |
| 781 | + break; |
| 782 | + } |
| 783 | + node = node.getInput(0); |
| 784 | + } |
| 785 | + return null; |
| 786 | + } |
| 787 | + |
| 788 | + /** |
| 789 | + * Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree |
| 790 | + * with the reversed sort inserted right after the original sort. |
| 791 | + * |
| 792 | + * @param root the root of the tree to rebuild |
| 793 | + * @param reversedCollation the reversed collation to insert |
| 794 | + * @param context the Calcite plan context |
| 795 | + * @return the rebuilt tree with reversed sort inserted |
| 796 | + */ |
| 797 | + private RelNode insertReversedSortInTree( |
| 798 | + RelNode root, RelCollation reversedCollation, CalcitePlanContext context) { |
| 799 | + return root.accept( |
| 800 | + new RelHomogeneousShuttle() { |
| 801 | + boolean sortFound = false; |
| 802 | + |
| 803 | + @Override |
| 804 | + public RelNode visit(RelNode other) { |
| 805 | + if (!sortFound && other instanceof Sort) { |
| 806 | + Sort sort = (Sort) other; |
| 807 | + // Treat a Sort with fetch or offset as a barrier (limit node). |
| 808 | + // Place the reversed sort above the barrier to preserve limit semantics, |
| 809 | + // rather than inserting below the downstream collation Sort. |
| 810 | + if (sort.fetch != null || sort.offset != null) { |
| 811 | + sortFound = true; |
| 812 | + RelNode visitedBarrier = super.visit(other); |
| 813 | + return LogicalSort.create(visitedBarrier, reversedCollation, null, null); |
| 814 | + } |
| 815 | + // Found a collation Sort - replace in-place with reversed collation. |
| 816 | + // Stacking a reversed sort on top would create consecutive sorts, and |
| 817 | + // Calcite's SortRemoveRule would merge them keeping the original direction. |
| 818 | + if (sort.getCollation() != null |
| 819 | + && !sort.getCollation().getFieldCollations().isEmpty()) { |
| 820 | + sortFound = true; |
| 821 | + RelNode visitedInput = sort.getInput().accept(this); |
| 822 | + return LogicalSort.create(visitedInput, reversedCollation, null, null); |
| 823 | + } |
| 824 | + } |
| 825 | + // For all other nodes, continue traversal |
| 826 | + return super.visit(other); |
| 827 | + } |
| 828 | + }); |
| 829 | + } |
731 | 830 |
|
732 | 831 | @Override |
733 | 832 | public RelNode visitReverse( |
734 | 833 | org.opensearch.sql.ast.tree.Reverse node, CalcitePlanContext context) { |
735 | 834 | visitChildren(node, context); |
736 | | - // Add ROW_NUMBER() column |
737 | | - RexNode rowNumber = |
738 | | - context |
739 | | - .relBuilder |
740 | | - .aggregateCall(SqlStdOperatorTable.ROW_NUMBER) |
741 | | - .over() |
742 | | - .rowsTo(RexWindowBounds.CURRENT_ROW) |
743 | | - .as(REVERSE_ROW_NUM); |
744 | | - context.relBuilder.projectPlus(rowNumber); |
745 | | - // Sort by row number descending |
746 | | - context.relBuilder.sort(context.relBuilder.desc(context.relBuilder.field(REVERSE_ROW_NUM))); |
747 | | - // Remove row number column |
748 | | - context.relBuilder.projectExcept(context.relBuilder.field(REVERSE_ROW_NUM)); |
| 835 | + |
| 836 | + // Check if there's an existing sort to reverse |
| 837 | + List<RelCollation> collations = |
| 838 | + context.relBuilder.getCluster().getMetadataQuery().collations(context.relBuilder.peek()); |
| 839 | + RelCollation collation = collations != null && !collations.isEmpty() ? collations.get(0) : null; |
| 840 | + |
| 841 | + if (collation != null && !collation.getFieldCollations().isEmpty()) { |
| 842 | + // If there's an existing sort, reverse its direction |
| 843 | + RelCollation reversedCollation = PlanUtils.reverseCollation(collation); |
| 844 | + RelNode currentNode = context.relBuilder.peek(); |
| 845 | + if (currentNode instanceof Sort) { |
| 846 | + Sort existingSort = (Sort) currentNode; |
| 847 | + if (existingSort.getCollation() != null |
| 848 | + && !existingSort.getCollation().getFieldCollations().isEmpty() |
| 849 | + && existingSort.fetch == null |
| 850 | + && existingSort.offset == null) { |
| 851 | + // Pure collation sort (no fetch/offset) - replace in-place to avoid consecutive |
| 852 | + // sorts. Calcite's SortRemoveRule merges consecutive LogicalSort nodes and keeps |
| 853 | + // the lower sort's direction, which discards the reversed direction. |
| 854 | + // Replacing in-place avoids this issue. |
| 855 | + RelCollation reversedFromSort = PlanUtils.reverseCollation(existingSort.getCollation()); |
| 856 | + RelNode replacedSort = |
| 857 | + LogicalSort.create(existingSort.getInput(), reversedFromSort, null, null); |
| 858 | + PlanUtils.replaceTop(context.relBuilder, replacedSort); |
| 859 | + } else { |
| 860 | + // Sort with fetch/offset (limit) or fetch-only Sort - add a separate reversed |
| 861 | + // sort on top so the "limit then reverse" semantics are preserved. |
| 862 | + context.relBuilder.sort(reversedCollation); |
| 863 | + } |
| 864 | + } else { |
| 865 | + context.relBuilder.sort(reversedCollation); |
| 866 | + } |
| 867 | + } else { |
| 868 | + // Collation not found on current node - try backtracking |
| 869 | + RelNode currentNode = context.relBuilder.peek(); |
| 870 | + RelCollation backtrackCollation = backtrackForCollation(currentNode); |
| 871 | + |
| 872 | + if (backtrackCollation != null && !backtrackCollation.getFieldCollations().isEmpty()) { |
| 873 | + // Found collation through backtracking - rebuild tree with reversed sort |
| 874 | + RelCollation reversedCollation = PlanUtils.reverseCollation(backtrackCollation); |
| 875 | + RelNode rebuiltTree = insertReversedSortInTree(currentNode, reversedCollation, context); |
| 876 | + // Replace the current node in the builder with the rebuilt tree |
| 877 | + context.relBuilder.build(); // Pop the current node |
| 878 | + context.relBuilder.push(rebuiltTree); // Push the rebuilt tree |
| 879 | + } else { |
| 880 | + // Check if @timestamp field exists in the row type |
| 881 | + List<String> fieldNames = context.relBuilder.peek().getRowType().getFieldNames(); |
| 882 | + if (fieldNames.contains(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP)) { |
| 883 | + // If @timestamp exists, sort by it in descending order |
| 884 | + context.relBuilder.sort( |
| 885 | + context.relBuilder.desc( |
| 886 | + context.relBuilder.field(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP))); |
| 887 | + } |
| 888 | + // If neither collation nor @timestamp exists, ignore the reverse command (no-op) |
| 889 | + } |
| 890 | + } |
| 891 | + |
749 | 892 | return context.relBuilder.peek(); |
750 | 893 | } |
751 | 894 |
|
|
0 commit comments