Commit 4d2e06f

Revert "test(14691): demonstrate EnforceSorting can remove a needed coalesce (#14919)" (#14950)
This reverts commit 32224b4.
1 parent 382e232 commit 4d2e06f

4 files changed: +4 −139 lines changed

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 1 addition & 1 deletion

@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
         .build()
 }
 
-pub(crate) fn projection_exec_with_alias(
+fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 1 addition & 65 deletions

@@ -17,16 +17,11 @@
 
 use std::sync::Arc;
 
-use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
-use crate::physical_optimizer::sanity_checker::{
-    assert_sanity_check, assert_sanity_check_err,
-};
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
     coalesce_partitions_exec, create_test_schema, create_test_schema2,
     create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
-    local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats,
-    repartition_exec, schema, single_partitioned_aggregate, sort_exec, sort_expr,
+    local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
     sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
     sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
     union_exec, RequirementsTestExec,
@@ -3351,62 +3346,3 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn test_preserve_needed_coalesce() -> Result<()> {
-    // Input to EnforceSorting, from our test case.
-    let plan = projection_exec_with_alias(
-        union_exec(vec![parquet_exec_with_stats(); 2]),
-        vec![
-            ("a".to_string(), "a".to_string()),
-            ("b".to_string(), "value".to_string()),
-        ],
-    );
-    let plan = Arc::new(CoalescePartitionsExec::new(plan));
-    let schema = schema();
-    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
-        expr: col("a", &schema).unwrap(),
-        options: SortOptions::default(),
-    }]);
-    let plan: Arc<dyn ExecutionPlan> =
-        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
-    let plan = sort_exec(sort_key, plan);
-
-    // Starting plan: as in our test case.
-    assert_eq!(
-        get_plan_string(&plan),
-        vec![
-            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "    CoalescePartitionsExec",
-            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "        UnionExec",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-    // Test: plan is valid.
-    assert_sanity_check(&plan, true);
-
-    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
-    let optimizer = EnforceSorting::new();
-    let optimized = optimizer.optimize(plan, &Default::default())?;
-    assert_eq!(
-        get_plan_string(&optimized),
-        vec![
-            "SortPreservingMergeExec: [a@0 ASC]",
-            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "        UnionExec",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-
-    // Bug: Plan is now invalid.
-    let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
-    assert_sanity_check_err(&optimized, err);
-
-    Ok(())
-}
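Context for the reverted test: an AggregateExec in SinglePartitioned mode requires input hash-partitioned on its group-by keys, which is why the sanity check above reports `HashPartitioned[[a@0]]` against a child with `UnknownPartitioning(2)` once the CoalescePartitionsExec is gone. As a minimal sketch (not part of this commit), the deleted `assert_sanity_check_err` helper can be approximated outside the test suite with the same `SanityCheckPlan` calls it used; the function name is hypothetical and the crate paths in the imports are assumptions about current DataFusion layout:

use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;

/// Hypothetical helper mirroring the deleted `assert_sanity_check_err`:
/// run the sanity checker over `plan` and return the failure message, if any.
fn sanity_check_message(plan: &Arc<dyn ExecutionPlan>) -> Option<String> {
    let checker = SanityCheckPlan::new();
    let opts = ConfigOptions::default();
    match checker.optimize(Arc::clone(plan), &opts) {
        // The rule returns the plan unchanged when it is sound.
        Ok(_) => None,
        // Otherwise the error text names the unsatisfied requirement, e.g.
        // "does not satisfy distribution requirements: HashPartitioned[[a@0]])".
        Err(e) => Some(e.message().to_string()),
    }
}

Against the reverted test's plans, this sketch would return None for the input plan and Some(message) for the optimized one.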

datafusion/core/tests/physical_optimizer/sanity_checker.rs

Lines changed: 1 addition & 9 deletions

@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
 }
 
 /// Check if sanity checker should accept or reject plans.
-pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
+fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     let sanity_checker = SanityCheckPlan::new();
     let opts = ConfigOptions::default();
     assert_eq!(
@@ -397,14 +397,6 @@ pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool)
     );
 }
 
-/// Assert reason for sanity check failure.
-pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
-    let sanity_checker = SanityCheckPlan::new();
-    let opts = ConfigOptions::default();
-    let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
-    assert!(error.message().contains(err));
-}
-
 /// Check if the plan we created is as expected by comparing the plan
 /// formatted as a string.
 fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 64 deletions

@@ -30,10 +30,9 @@ use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
-use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
+use datafusion_common::{JoinType, Result};
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -103,44 +102,6 @@ pub fn schema() -> SchemaRef {
     ]))
 }
 
-fn int64_stats() -> ColumnStatistics {
-    ColumnStatistics {
-        null_count: Precision::Absent,
-        sum_value: Precision::Absent,
-        max_value: Precision::Exact(1_000_000.into()),
-        min_value: Precision::Exact(0.into()),
-        distinct_count: Precision::Absent,
-    }
-}
-
-fn column_stats() -> Vec<ColumnStatistics> {
-    vec![
-        int64_stats(), // a
-        int64_stats(), // b
-        int64_stats(), // c
-        ColumnStatistics::default(),
-        ColumnStatistics::default(),
-    ]
-}
-
-/// Create parquet datasource exec using schema from [`schema`].
-pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
-    let mut statistics = Statistics::new_unknown(&schema());
-    statistics.num_rows = Precision::Inexact(10);
-    statistics.column_statistics = column_stats();
-
-    let config = FileScanConfig::new(
-        ObjectStoreUrl::parse("test:///").unwrap(),
-        schema(),
-        Arc::new(ParquetSource::new(Default::default())),
-    )
-    .with_file(PartitionedFile::new("x".to_string(), 10000))
-    .with_statistics(statistics);
-    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
-
-    config.build()
-}
-
 pub fn create_test_schema() -> Result<SchemaRef> {
     let nullable_column = Field::new("nullable_col", DataType::Int32, true);
     let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -561,30 +522,6 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
     PhysicalGroupBy::new_single(group_by_expr.clone())
 }
 
-pub(crate) fn single_partitioned_aggregate(
-    input: Arc<dyn ExecutionPlan>,
-    alias_pairs: Vec<(String, String)>,
-) -> Arc<dyn ExecutionPlan> {
-    let schema = schema();
-    let group_by = alias_pairs
-        .iter()
-        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
-        .collect::<Vec<_>>();
-    let group_by = PhysicalGroupBy::new_single(group_by);
-
-    Arc::new(
-        AggregateExec::try_new(
-            AggregateMode::SinglePartitioned,
-            group_by,
-            vec![],
-            vec![],
-            input,
-            schema,
-        )
-        .unwrap(),
-    )
-}
-
 pub fn assert_plan_matches_expected(
     plan: &Arc<dyn ExecutionPlan>,
     expected: &[&str],