-
Notifications
You must be signed in to change notification settings - Fork 217
feat: Add experimental auto mode for COMET_PARQUET_SCAN_IMPL
#1747
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 38 commits
e47fc0a
7a50855
2a78694
8592b8f
52875ff
f5fd69b
0d11599
4d5c4b0
332be9b
ada181a
e8b95d4
e3c5b09
0920360
ddcee36
1a106dc
3634fc1
13eea7a
e5e89f3
a6fd752
6e32787
773743e
ead2362
d963d31
0f49dc0
f20f846
ac3c99e
deccbfe
64876cd
9bf4aeb
728d8bd
8868276
82a930c
e6ae57b
86351d1
5638dda
28f13ea
be4bac2
580fdeb
1dd4f08
77e7e39
987608f
fbde112
6e2ac70
e765956
5719a38
972d446
9a39b23
4203798
255c8d9
eee6ff9
292cf31
fad4e53
cc044b8
bd1858f
a852569
630b774
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,21 +93,41 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { | |
return withInfos(scanExec, fallbackReasons.toSet) | ||
} | ||
|
||
val scanImpl = COMET_NATIVE_SCAN_IMPL.get() | ||
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) { | ||
var scanImpl = COMET_NATIVE_SCAN_IMPL.get() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also log a message to indicate that auto scan was enabled (and the scan implementation it selected)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that is a good idea. I will add that today. |
||
|
||
// if scan is auto then pick best available scan | ||
if (scanImpl == SCAN_AUTO) { | ||
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION) | ||
val schemaSupported = | ||
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons) | ||
val partitionSchemaSupported = | ||
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons) | ||
|
||
// TODO these checks are not yet exhaustive. For example, native_datafusion does | ||
// not support reading from object stores such as S3 yet | ||
|
||
if (COMET_EXEC_ENABLED | ||
.get() && schemaSupported && partitionSchemaSupported && !scanExec.bucketedScan) { | ||
scanImpl = SCAN_NATIVE_DATAFUSION | ||
} else { | ||
scanImpl = SCAN_NATIVE_COMET | ||
} | ||
} | ||
|
||
if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) { | ||
fallbackReasons += | ||
s"Full native scan disabled because ${COMET_EXEC_ENABLED.key} disabled" | ||
return withInfos(scanExec, fallbackReasons.toSet) | ||
} | ||
|
||
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) { | ||
if (scanImpl == SCAN_NATIVE_DATAFUSION && scanExec.bucketedScan) { | ||
// https://github.com/apache/datafusion-comet/issues/1719 | ||
fallbackReasons += | ||
"Full native scan disabled because bucketed scan is not supported" | ||
return withInfos(scanExec, fallbackReasons.toSet) | ||
} | ||
|
||
val typeChecker = new CometScanTypeChecker(scanImpl) | ||
val typeChecker = CometScanTypeChecker(scanImpl) | ||
val schemaSupported = | ||
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons) | ||
val partitionSchemaSupported = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This section appeared twice