
Commit d61c1ea

heyihong authored and HyukjinKwon committed
[SPARK-52335][CONNECT][SQL] Unify the 'invalid bucket count' error for both Connect and Classic
### What changes were proposed in this pull request?

This PR unifies the error handling for invalid bucket count validation between Spark Connect and Classic Spark. The main changes are:

1. Updated the error message for `INVALID_BUCKET_COUNT` in `error-conditions.json` to be more descriptive and consistent
2. Removed the legacy error condition `_LEGACY_ERROR_TEMP_1083`, since its functionality is now merged into `INVALID_BUCKET_COUNT`
3. Removed the `InvalidCommandInput` class and its usage in Connect, since the standard `AnalysisException` with the `INVALID_BUCKET_COUNT` error condition is used instead
4. Updated the bucket count validation in `SparkConnectPlanner` to rely on the standard error-handling path
5. Updated the test case in `SparkConnectProtoSuite` to verify the new unified error handling

The key improvement is that both Connect and Classic now use the same error condition and message format for invalid bucket count errors, making error handling consistent across Spark's interfaces. The error message now includes both the maximum allowed bucket count and the invalid value received, providing better guidance to users. The change also simplifies the error-handling codebase by removing duplicate error definitions and standardizing on a single error condition for this validation case.

### Why are the changes needed?

1. To provide consistent error messages across the Spark Connect and Classic interfaces
2. To simplify error handling by removing duplicate error definitions
3. To improve error message clarity by including the maximum allowed bucket count in the message
4. To improve maintainability by reducing duplication in error-handling code

The unified error message now clearly states both the requirement (bucket count > 0) and the upper limit (≤ bucketing.maxBuckets), making it easier for users to understand and fix the issue.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`build/sbt "connect/testOnly *SparkConnectProtoSuite"`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51039 from heyihong/SPARK-52335.

Authored-by: Yihong He <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
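For illustration, a minimal sketch (not part of the patch) of the unified user-facing behavior. It assumes an active `SparkSession` named `spark` and the `getCondition` accessor on `SparkThrowable`; the table name is arbitrary:

```scala
import org.apache.spark.sql.AnalysisException

// A non-positive bucket count now fails the same way on Classic and Connect:
// an AnalysisException carrying the INVALID_BUCKET_COUNT condition, with both
// the configured limit and the offending value in the message.
try {
  spark.range(10)
    .write
    .format("parquet")
    .bucketBy(0, "id") // invalid: must be > 0 and <= bucketing.maxBuckets
    .saveAsTable("bucketed_table")
} catch {
  case e: AnalysisException =>
    assert(e.getCondition == "INVALID_BUCKET_COUNT")
}
```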
1 parent: 2367f58

7 files changed (+12, −61 lines)


common/utils/src/main/resources/error/error-conditions.json

Lines changed: 1 addition & 6 deletions
```diff
@@ -2382,7 +2382,7 @@
   },
   "INVALID_BUCKET_COUNT" : {
     "message" : [
-      "BucketBy must specify a bucket count > 0, received <numBuckets> instead."
+      "Number of buckets should be greater than 0 but less than or equal to bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
     ],
     "sqlState" : "22003"
   },
@@ -7042,11 +7042,6 @@
       "Partition [<specString>] did not specify locationUri."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1083" : {
-    "message" : [
-      "Number of buckets should be greater than 0 but less than or equal to bucketing.maxBuckets (`<bucketingMaxBuckets>`). Got `<numBuckets>`."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1089" : {
     "message" : [
       "Column statistics deserialization is not supported for column <name> of data type: <dataType>."
```

python/pyspark/errors/exceptions/connect.py

Lines changed: 0 additions & 7 deletions
```diff
@@ -371,12 +371,6 @@ class InvalidPlanInput(SparkConnectGrpcException):
     """


-class InvalidCommandInput(SparkConnectGrpcException):
-    """
-    Error thrown when a connect command is not valid.
-    """
-
-
 class StreamingPythonRunnerInitializationException(
     SparkConnectGrpcException, BaseStreamingPythonRunnerInitException
 ):
@@ -411,7 +405,6 @@ class PickleException(SparkConnectGrpcException, BasePickleException):
     "org.apache.spark.SparkNoSuchElementException": SparkNoSuchElementException,
     "org.apache.spark.SparkException": SparkException,
     "org.apache.spark.sql.connect.common.InvalidPlanInput": InvalidPlanInput,
-    "org.apache.spark.sql.connect.common.InvalidCommandInput": InvalidCommandInput,
     "org.apache.spark.api.python.StreamingPythonRunner"
     "$StreamingPythonRunnerInitializationException": StreamingPythonRunnerInitializationException,
 }
```

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -1276,7 +1276,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat

   def invalidBucketNumberError(bucketingMaxBuckets: Int, numBuckets: Int): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1083",
+      errorClass = "INVALID_BUCKET_COUNT",
       messageParameters = Map(
         "bucketingMaxBuckets" -> bucketingMaxBuckets.toString,
         "numBuckets" -> numBuckets.toString))
```

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidCommandInput.scala

Lines changed: 0 additions & 36 deletions
This file was deleted.

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/InvalidInputErrors.scala

Lines changed: 1 addition & 4 deletions
```diff
@@ -21,7 +21,7 @@ import scala.collection.mutable

 import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.errors.DataTypeErrors.{quoteByDefault, toSQLType}
 import org.apache.spark.sql.types.DataType

@@ -214,9 +214,6 @@ object InvalidInputErrors {
   def unionByNameAllowMissingColRequiresByName(): InvalidPlanInput =
     InvalidPlanInput("UnionByName `allowMissingCol` can be true only if `byName` is true.")

-  def invalidBucketCount(numBuckets: Int): InvalidCommandInput =
-    InvalidCommandInput("INVALID_BUCKET_COUNT", Map("numBuckets" -> numBuckets.toString))
-
   def unsupportedUserDefinedFunctionImplementation(clazz: Class[_]): InvalidPlanInput =
     InvalidPlanInput(s"Unsupported UserDefinedFunction implementation: ${clazz}")
 }
```

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala

Lines changed: 0 additions & 3 deletions
```diff
@@ -3077,9 +3077,6 @@ class SparkConnectPlanner(
     if (writeOperation.hasBucketBy) {
       val op = writeOperation.getBucketBy
       val cols = op.getBucketColumnNamesList.asScala
-      if (op.getNumBuckets <= 0) {
-        throw InvalidInputErrors.invalidBucketCount(op.getNumBuckets)
-      }
       w.bucketBy(op.getNumBuckets, cols.head, cols.tail.toSeq: _*)
     }

```
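With the planner-side pre-check removed, `op.getNumBuckets` flows straight into `w.bucketBy(...)`, and an invalid value is rejected later by the same classic validation that raises `INVALID_BUCKET_COUNT`, so Connect and Classic now share one code path for this check.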

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CollectMetrics, Distinct, Lo
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.DataFrame
-import org.apache.spark.sql.connect.common.{InvalidCommandInput, InvalidPlanInput}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
@@ -657,13 +657,18 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
   }

   test("Write with invalid bucketBy configuration") {
-    val cmd = localRelation.write(bucketByCols = Seq("id"), numBuckets = Some(0))
+    val cmd = localRelation.write(
+      tableName = Some("testtable"),
+      tableSaveMethod = Some("save_as_table"),
+      format = Some("parquet"),
+      bucketByCols = Seq("id"),
+      numBuckets = Some(0))
     checkError(
-      exception = intercept[InvalidCommandInput] {
+      exception = intercept[AnalysisException] {
         transform(cmd)
       },
       condition = "INVALID_BUCKET_COUNT",
-      parameters = Map("numBuckets" -> "0"))
+      parameters = Map("bucketingMaxBuckets" -> "100000", "numBuckets" -> "0"))
   }

   test("Write to Path") {
```
