diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9e5264d8d4f3..1fb34ee93fb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.LogKeys.EXTENDED_EXPLAIN_GENERATOR
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, Row}
 import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
-import org.apache.spark.sql.catalyst.analysis.{LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, LazyExpression, NameParameterizedQuery, UnsupportedOperationChecker}
 import org.apache.spark.sql.catalyst.expressions.codegen.ByteCodeStats
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Command, CommandResult, CompoundBody, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, ReturnAnswer, Union, WithCTE}
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan}
 import org.apache.spark.sql.execution.bucketing.{CoalesceBucketsInJoin, DisableUnnecessaryBucketedScan}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.reuse.ReuseExchangeAndSubquery
@@ -215,6 +216,14 @@ class QueryExecution(
   }
 
   def optimizedPlan: LogicalPlan = lazyOptimizedPlan.get
 
+  // Eliminate SubqueryAliases from the optimized plan to help correct the explain result.
+  def optimizedPlanWithoutSubqueries: LogicalPlan = {
+    optimizedPlan match {
+      case s: CreateDataSourceTableAsSelectCommand =>
+        s.copy(query = EliminateSubqueryAliases(s.query))
+      case _ => optimizedPlan
+    }
+  }
 
   def assertOptimized(): Unit = optimizedPlan
@@ -365,7 +374,7 @@ class QueryExecution(
     }
     QueryPlan.append(analyzed, append, verbose, addSuffix, maxFields)
     append("\n== Optimized Logical Plan ==\n")
-    QueryPlan.append(optimizedPlan, append, verbose, addSuffix, maxFields)
+    QueryPlan.append(optimizedPlanWithoutSubqueries, append, verbose, addSuffix, maxFields)
     append("\n== Physical Plan ==\n")
     QueryPlan.append(executedPlan, append, verbose, addSuffix, maxFields)
     extendedExplainInfo(append, executedPlan)
@@ -408,7 +417,8 @@ class QueryExecution(
     }
     // only show optimized logical plan and physical plan
     append("== Optimized Logical Plan ==\n")
-    QueryPlan.append(optimizedPlan, append, verbose = true, addSuffix = true, maxFields)
+    QueryPlan.append(
+      optimizedPlanWithoutSubqueries, append, verbose = true, addSuffix = true, maxFields)
     append("\n== Physical Plan ==\n")
     QueryPlan.append(executedPlan, append, verbose = true, addSuffix = false, maxFields)
     append("\n")
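
[Note] `EliminateSubqueryAliases` is the existing analyzer rule being reused here, not new machinery. A minimal spark-shell sketch of what it strips (assumes an active `spark` session; `t` is an arbitrary alias):

    import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases

    val plan = spark.range(10).as("t").queryExecution.analyzed
    println(plan.treeString)                            // contains a "SubqueryAlias t" node
    println(EliminateSubqueryAliases(plan).treeString)  // same plan with the alias stripped

The rule only drops alias wrappers; by the time the optimized plan is printed, name resolution is done, so eliding them changes the rendered tree, not its semantics.
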
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index ee21d7e970df..9144d00a52df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.trees.{LeafLike, UnaryLike}
 import org.apache.spark.sql.connector.ExternalCommandRunner
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{CommandExecutionMode, ExplainMode, LeafExecNode, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.IncrementalExecution
@@ -65,6 +66,37 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
 
   override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
 
+  // Cache the compiled physical plan of the CTAS query so it is not recomputed on each explain.
+  @transient private lazy val executedQuery: Option[QueryPlan[_]] = {
+    cmd match {
+      case cmd: CreateDataSourceTableAsSelectCommand =>
+        try {
+          SparkSession.getActiveSession match {
+            case Some(spark) =>
+              try {
+                val qe = spark.sessionState.executePlan(cmd.query)
+                Some(qe.executedPlan)
+              } catch {
+                case _: Exception => Some(cmd.query)
+              }
+            case None => Some(cmd.query)
+          }
+        } catch {
+          case _: Exception => Some(cmd.query)
+        }
+      case _ => None
+    }
+  }
+
+  // For CTAS, show the compiled plan of the inner query instead of the logical command.
+  override def innerChildren: Seq[QueryPlan[_]] = {
+    cmd match {
+      case _: CreateDataSourceTableAsSelectCommand =>
+        executedQuery.toSeq
+      case _ => cmd :: Nil
+    }
+  }
+
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
    * command or any other computation that should be evaluated exactly once. The value of this field
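
[Note] With `innerChildren` overridden, EXPLAIN on a CTAS should nest the compiled physical plan of the query under the `Execute` node instead of repeating the logical command. A rough probe of the intended effect (spark-shell style; `src` and `dst` are hypothetical table names, and operator and pushdown details vary by version and data source):

    spark.sql("CREATE TABLE src (id INT) USING PARQUET")
    val out = spark.sql(
      "EXPLAIN CREATE TABLE dst USING PARQUET AS SELECT * FROM src WHERE id > 0")
    out.show(truncate = false)
    // Expected shape (illustrative only):
    //   Execute CreateDataSourceTableAsSelectCommand
    //   +- *(1) Filter (isnotnull(id) AND (id > 0))
    //      +- FileScan parquet ... PushedFilters: [IsNotNull(id), GreaterThan(id,0)]

The `getActiveSession` and `try` fallbacks guard against `executedQuery` being evaluated off the normal execution path (for example, when a plan tree is printed with no active session); in those cases the logical query is shown, as before this change.
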
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index f29d2267f75f..e530c786788c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -23,7 +23,7 @@ import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, WithCTE}
+import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDef, LogicalPlan, Statistics, WithCTE}
 import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -149,6 +149,9 @@ case class CreateDataSourceTableAsSelectCommand(
   assert(query.resolved)
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
 
+  // Delegate stats to the inner query so EXPLAIN COST can report them on the command node.
+  override def stats: Statistics = query.stats
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 95c2fcbd7b5d..5203e1004bcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -300,4 +300,37 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
         stop = 57))
     }
   }
+
+  test("SPARK-48660: EXPLAIN COST should exclude SubqueryAlias") {
+    withTable("source_table") {
+      // Create the source table
+      sql("""
+        CREATE TABLE source_table (
+          id INT,
+          name STRING
+        ) USING PARQUET
+      """)
+
+      // Get the explain output for a CTAS over the source table
+      val explainResult = sql("""
+        EXPLAIN COST
+        CREATE TABLE target_table
+        USING PARQUET
+        AS SELECT * FROM source_table WHERE id > 0
+      """).collect()
+
+      val explainOutput = explainResult.map(_.getString(0)).mkString("\n")
+
+      // SubqueryAlias should have been eliminated from the optimized plan
+      assert(!explainOutput.contains("SubqueryAlias"),
+        s"EXPLAIN COST output should not contain SubqueryAlias. Output: $explainOutput")
+
+      assert(explainOutput.contains("Statistics"),
+        s"EXPLAIN COST output should contain statistics information. Output: $explainOutput")
+
+      // The physical plan should still show filter pushdown
+      assert(explainOutput.contains("PushedFilters"),
+        s"EXPLAIN COST output should contain pushdown information. Output: $explainOutput")
+    }
+  }
 }
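
[Note] The `stats` override is what gives EXPLAIN COST something to print for the command node itself: cost information is read from `LogicalPlan.stats`, which a command otherwise reports as a fixed placeholder. A minimal probe of that mechanism (spark-shell style, assumes an active `spark` session):

    val df = spark.range(100).where("id > 0")
    // Statistics surfaced on the optimized plan; EXPLAIN COST prints these per node
    println(df.queryExecution.optimizedPlan.stats)   // e.g. Statistics(sizeInBytes=...)

The new test can be run on its own with the usual Spark invocation, e.g. build/sbt "sql/testOnly *CreateTableAsSelectSuite -- -z SPARK-48660" (adjust to your build setup).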