feat: Add experimental auto mode for COMET_PARQUET_SCAN_IMPL #1747


Merged (56 commits, Jun 13, 2025)

Changes from 38 commits
- e47fc0a Move some logic into scan execs (andygrove, May 16, 2025)
- 7a50855 improve type checking for sinks (andygrove, May 16, 2025)
- 2a78694 move usingDataSourceExecWithIncompatTypes* to CometTestBase (andygrove, May 16, 2025)
- 8592b8f scalastyle (andygrove, May 16, 2025)
- 52875ff scalastyle (andygrove, May 16, 2025)
- f5fd69b scalastyle (andygrove, May 16, 2025)
- 0d11599 fix regression (andygrove, May 16, 2025)
- 4d5c4b0 add shuffle fuzz test: (andygrove, May 16, 2025)
- 332be9b scalastyle (andygrove, May 16, 2025)
- ada181a scalastyle (andygrove, May 16, 2025)
- e8b95d4 oops (andygrove, May 16, 2025)
- e3c5b09 fix? (andygrove, May 16, 2025)
- 0920360 remove some config uses and add TODO comments for others (andygrove, May 16, 2025)
- ddcee36 fix? (andygrove, May 16, 2025)
- 1a106dc scalastyle (andygrove, May 16, 2025)
- 3634fc1 fix? (andygrove, May 16, 2025)
- 13eea7a Merge branch 'scan-refactor-2' into scan-refactor-3 (andygrove, May 17, 2025)
- e5e89f3 improve (andygrove, May 17, 2025)
- a6fd752 improve (andygrove, May 17, 2025)
- 6e32787 fix (andygrove, May 17, 2025)
- 773743e fix (andygrove, May 17, 2025)
- ead2362 refactor (andygrove, May 17, 2025)
- d963d31 improve (andygrove, May 17, 2025)
- 0f49dc0 improve (andygrove, May 17, 2025)
- f20f846 fix (andygrove, May 17, 2025)
- ac3c99e update diffs (andygrove, May 17, 2025)
- deccbfe update diffs (andygrove, May 17, 2025)
- 64876cd add auto scan impl mode (andygrove, May 17, 2025)
- 9bf4aeb fix (andygrove, May 17, 2025)
- 728d8bd Merge branch 'scan-refactor-3' into scan-auto-mode (andygrove, May 17, 2025)
- 8868276 update test (andygrove, May 17, 2025)
- 82a930c update test (andygrove, May 17, 2025)
- e6ae57b scalastyle (andygrove, May 17, 2025)
- 86351d1 fix miri (andygrove, May 18, 2025)
- 5638dda Merge branch 'scan-refactor-3' into scan-auto-mode (andygrove, May 18, 2025)
- 28f13ea upmerge (andygrove, May 20, 2025)
- be4bac2 update docs (andygrove, May 20, 2025)
- 580fdeb update docs (andygrove, May 20, 2025)
- 1dd4f08 add CI workflow (andygrove, May 20, 2025)
- 77e7e39 experimenting (andygrove, May 20, 2025)
- 987608f scalastyle (andygrove, May 20, 2025)
- fbde112 upmerge (andygrove, May 29, 2025)
- 6e2ac70 revert change to default value (andygrove, Jun 2, 2025)
- e765956 upmerge (andygrove, Jun 2, 2025)
- 5719a38 fix (andygrove, Jun 2, 2025)
- 972d446 address feedback (andygrove, Jun 3, 2025)
- 9a39b23 Merge remote-tracking branch 'apache/main' into scan-auto-mode (andygrove, Jun 12, 2025)
- 4203798 explicitly set scan impl in tests (andygrove, Jun 13, 2025)
- 255c8d9 Merge remote-tracking branch 'apache/main' into scan-auto-mode (andygrove, Jun 13, 2025)
- eee6ff9 run some tests in auto mode (andygrove, Jun 13, 2025)
- 292cf31 fix (andygrove, Jun 13, 2025)
- fad4e53 fix one test (andygrove, Jun 13, 2025)
- cc044b8 fix (andygrove, Jun 13, 2025)
- bd1858f update compat docs (andygrove, Jun 13, 2025)
- a852569 address feedback (andygrove, Jun 13, 2025)
- 630b774 fmt (andygrove, Jun 13, 2025)
4 changes: 3 additions & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -86,6 +86,7 @@ object CometConf extends ShimCometConf {
val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
val SCAN_AUTO = "auto"

val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
.doc(
@@ -99,7 +100,8 @@ object CometConf extends ShimCometConf {
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT))
.checkValues(
Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT, SCAN_AUTO))
.createWithDefault(sys.env
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
.toLowerCase(Locale.ROOT))
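The validation behavior in the `CometConf` change above can be sketched in plain Scala. This is a hypothetical standalone version for illustration only; the real code uses Comet's `ConfigEntry` builder with `.transform` and `.checkValues`, and the names `validScanImpls` and `normalizeScanImpl` are invented here:

```scala
import java.util.Locale

// Allowed scan implementations, now including the new "auto" mode.
val validScanImpls: Set[String] =
  Set("native_comet", "native_datafusion", "native_iceberg_compat", "auto")

// Mirrors .transform(_.toLowerCase(Locale.ROOT)) followed by .checkValues(...):
// lower-case the supplied value, then reject anything outside the allowed set.
def normalizeScanImpl(value: String): String = {
  val normalized = value.toLowerCase(Locale.ROOT)
  require(validScanImpls.contains(normalized), s"Invalid scan implementation: $value")
  normalized
}
```

Because the transform runs before the check, mixed-case values such as `"AUTO"` are accepted and normalized rather than rejected.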
17 changes: 11 additions & 6 deletions docs/source/user-guide/compatibility.md
@@ -29,12 +29,6 @@ Comet aims to provide consistent results with the version of Apache Spark that i

This guide offers information about areas of functionality where there are known differences.

# Compatibility Guide

Comet aims to provide consistent results with the version of Apache Spark that is being used.

This guide offers information about areas of functionality where there are known differences.

Review comment (Member, Author): This section appeared twice

## Parquet Scans

Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
@@ -56,6 +50,8 @@ implementation:

The new scans currently have the following limitations:

Issues common to both `native_datafusion` and `native_iceberg_compat`:

- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
logical types. Arrow-based readers, such as DataFusion and Comet, do respect these types and read the data as unsigned
@@ -65,10 +61,19 @@ types (regardless of the logical type). This behavior can be disabled by setting
- Reading legacy INT96 timestamps contained within complex types can produce different results from Spark
- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
information.
- Reading maps containing complex types can result in errors or incorrect results [#1754]
- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).

Issues specific to `native_datafusion`:

- Bucketed scans are not supported
- No support for row indexes

[#1545]: https://github.com/apache/datafusion-comet/issues/1545
[#1542]: https://github.com/apache/datafusion-comet/issues/1542
[#1754]: https://github.com/apache/datafusion-comet/issues/1754
[#1758]: https://github.com/apache/datafusion-comet/issues/1758
[Comet Tuning Guide]: tuning.md
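The `UINT_8`/`UINT_16` difference described in the limitations above comes down to signed versus unsigned interpretation of the same physical bytes. A minimal illustration (not Comet code): the byte `0xFF` reads as `-1` under Spark's signed view but as `255` when the unsigned logical type is honored, as Arrow-based readers do.

```scala
// The same physical byte, interpreted two ways.
val physicalByte: Byte = 0xFF.toByte

// Spark-style: the logical UINT_8 annotation is ignored, so the byte is signed.
val signedValue: Int = physicalByte.toInt

// Arrow-style: the unsigned logical type is respected, yielding 0..255.
val unsignedValue: Int = physicalByte & 0xFF
```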

## ANSI mode
17 changes: 11 additions & 6 deletions docs/templates/compatibility-template.md
@@ -29,12 +29,6 @@ Comet aims to provide consistent results with the version of Apache Spark that i

This guide offers information about areas of functionality where there are known differences.

# Compatibility Guide

Comet aims to provide consistent results with the version of Apache Spark that is being used.

This guide offers information about areas of functionality where there are known differences.

## Parquet Scans

Comet currently has three distinct implementations of the Parquet scan operator. The configuration property
@@ -56,6 +50,8 @@ implementation:

The new scans currently have the following limitations:

Issues common to both `native_datafusion` and `native_iceberg_compat`:

- When reading Parquet files written by systems other than Spark that contain columns with the logical types `UINT_8`
or `UINT_16`, Comet will produce different results than Spark because Spark does not preserve or understand these
logical types. Arrow-based readers, such as DataFusion and Comet, do respect these types and read the data as unsigned
@@ -65,10 +61,19 @@ The new scans currently have the following limitations:
- Reading legacy INT96 timestamps contained within complex types can produce different results from Spark
- There is a known performance issue when pushing filters down to Parquet. See the [Comet Tuning Guide] for more
information.
- Reading maps containing complex types can result in errors or incorrect results [#1754]
- `PARQUET_FIELD_ID_READ_ENABLED` is not respected [#1758]
- There are failures in the Spark SQL test suite when enabling these new scans (tracking issues: [#1542] and [#1545]).

Issues specific to `native_datafusion`:

- Bucketed scans are not supported
- No support for row indexes

[#1545]: https://github.com/apache/datafusion-comet/issues/1545
[#1542]: https://github.com/apache/datafusion-comet/issues/1542
[#1754]: https://github.com/apache/datafusion-comet/issues/1754
[#1758]: https://github.com/apache/datafusion-comet/issues/1758
[Comet Tuning Guide]: tuning.md

## ANSI mode
28 changes: 24 additions & 4 deletions spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -93,21 +93,41 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
return withInfos(scanExec, fallbackReasons.toSet)
}

val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
Review comment (Contributor): Should we also log a message to indicate that auto scan was enabled (and the scan implementation it selected)?

Reply (Member, Author): Yes, that is a good idea. I will add that today.
// if scan is auto then pick best available scan
if (scanImpl == SCAN_AUTO) {
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
val partitionSchemaSupported =
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)

// TODO these checks are not yet exhaustive. For example, native_datafusion does
// not support reading from object stores such as S3 yet

if (COMET_EXEC_ENABLED
.get() && schemaSupported && partitionSchemaSupported && !scanExec.bucketedScan) {
scanImpl = SCAN_NATIVE_DATAFUSION
} else {
scanImpl = SCAN_NATIVE_COMET
}
}

if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
fallbackReasons +=
s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled"
return withInfos(scanExec, fallbackReasons.toSet)
}

if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
if (scanImpl == SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) {
// https://github.com/apache/datafusion-comet/issues/1719
fallbackReasons +=
"Full native scan disabled because bucketed scan is not supported"
return withInfos(scanExec, fallbackReasons.toSet)
}

val typeChecker = new CometScanTypeChecker(scanImpl)
val typeChecker = CometScanTypeChecker(scanImpl)
val schemaSupported =
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
val partitionSchemaSupported =
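The auto-mode decision added in `CometScanRule` above can be condensed into a pure function for illustration. This is a simplified sketch, not the actual Comet code: the real implementation computes schema support via `CometScanTypeChecker` against the scan's required and partition schemas, collects fallback reasons, and (as the PR notes) the checks are not yet exhaustive.

```scala
// Simplified sketch of auto-mode scan selection: prefer native_datafusion only
// when full native exec is enabled, both the required schema and the partition
// schema are supported, and the scan is not bucketed; otherwise fall back to
// the default native_comet scan.
def selectScanImpl(
    execEnabled: Boolean,
    schemaSupported: Boolean,
    partitionSchemaSupported: Boolean,
    bucketedScan: Boolean): String =
  if (execEnabled && schemaSupported && partitionSchemaSupported && !bucketedScan)
    "native_datafusion"
  else
    "native_comet"
```

Note that the fallback branch deliberately chooses `native_comet` rather than failing, so `auto` always resolves to a working scan implementation.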