diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 7c4dc7383b..aab0cbc17a 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -33,8 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometConf.{COMET_DPP_FALLBACK_ENABLED, COMET_EXEC_ENABLED, COMET_NATIVE_SCAN_IMPL, COMET_SCHEMA_EVOLUTION_ENABLED, SCAN_NATIVE_ICEBERG_COMPAT}
-import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo}
+import org.apache.comet.CometConf._
+import org.apache.comet.CometSparkSessionExtensions.{isCometLoaded, isCometScanEnabled, withInfo, withInfos}
 import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
 
 case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
@@ -86,14 +86,21 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
     val fallbackReasons = new ListBuffer[String]()
 
     if (!CometScanExec.isFileFormatSupported(r.fileFormat)) {
       fallbackReasons += s"Unsupported file format ${r.fileFormat}"
-      return withInfo(scanExec, fallbackReasons.mkString(", "))
+      return withInfos(scanExec, fallbackReasons.toSet)
     }
 
     val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
     if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
       fallbackReasons += s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
-      return withInfo(scanExec, fallbackReasons.mkString(", "))
+      return withInfos(scanExec, fallbackReasons.toSet)
+    }
+
+    if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
+      // https://github.com/apache/datafusion-comet/issues/1719
+      fallbackReasons +=
+        "Full native scan disabled because bucketed scan is not supported"
+      return withInfos(scanExec, fallbackReasons.toSet)
     }
 
     val (schemaSupported, partitionSchemaSupported) = scanImpl match {
@@ -117,7 +124,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
         if (schemaSupported && partitionSchemaSupported) {
           CometScanExec(scanExec, session)
         } else {
-          withInfo(scanExec, fallbackReasons.mkString(", "))
+          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       case _ =>
@@ -152,7 +159,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
             scanExec.copy(scan = cometScan),
             runtimeFilters = scanExec.runtimeFilters)
         } else {
-          withInfo(scanExec, fallbackReasons.mkString(", "))
+          withInfos(scanExec, fallbackReasons.toSet)
         }
 
       // Iceberg scan
@@ -179,7 +186,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           scanExec.clone().asInstanceOf[BatchScanExec],
           runtimeFilters = scanExec.runtimeFilters)
       } else {
-        withInfo(scanExec, fallbackReasons.mkString(", "))
+        withInfos(scanExec, fallbackReasons.toSet)
       }
 
     case other =>
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index aca8535cb0..28a369ead4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1470,6 +1470,10 @@ class CometExecSuite extends CometTestBase {
   }
 
   test("bucketed table") {
+    // native_datafusion actually passes this test, but in the case where buckets are pruned it fails, so we're
+    // falling back for bucketed scans entirely as a workaround.
+    // https://github.com/apache/datafusion-comet/issues/1719
+    assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
     val bucketSpec = Some(BucketSpec(8, Seq("i", "j"), Nil))
     val bucketedTableTestSpecLeft = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)
     val bucketedTableTestSpecRight = BucketedTableTestSpec(bucketSpec, expectedShuffle = false)