Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion .github/workflows/pr_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ jobs:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}
# upload test reports only for java 17
upload-test-reports: ${{ matrix.java_version == '17' }}
env:
COMET_FUZZ_TEST: "true"

linux-test-with-spark4_0:
strategy:
Expand Down Expand Up @@ -102,6 +104,8 @@ jobs:
with:
maven_opts: -Pspark-${{ matrix.spark-version }}
upload-test-reports: true
env:
COMET_FUZZ_TEST: "true"

linux-test-with-old-spark:
strategy:
Expand All @@ -127,6 +131,8 @@ jobs:
uses: ./.github/actions/java-test
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}
env:
COMET_FUZZ_TEST: "true"

macos-test:
strategy:
Expand Down Expand Up @@ -155,6 +161,8 @@ jobs:
uses: ./.github/actions/java-test
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}
env:
COMET_FUZZ_TEST: "true"

macos-aarch64-test:
strategy:
Expand Down Expand Up @@ -188,6 +196,8 @@ jobs:
uses: ./.github/actions/java-test
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}
env:
COMET_FUZZ_TEST: "true"

macos-test-with-spark4_0:
strategy:
Expand All @@ -212,6 +222,8 @@ jobs:
with:
maven_opts: -Pspark-${{ matrix.spark-version }}
upload-test-reports: true
env:
COMET_FUZZ_TEST: "true"

macos-aarch64-test-with-spark4_0:
strategy:
Expand Down Expand Up @@ -241,6 +253,8 @@ jobs:
with:
maven_opts: -Pspark-${{ matrix.spark-version }}
upload-test-reports: true
env:
COMET_FUZZ_TEST: "true"

macos-aarch64-test-with-old-spark:
strategy:
Expand Down Expand Up @@ -269,4 +283,5 @@ jobs:
uses: ./.github/actions/java-test
with:
maven_opts: -Pspark-${{ matrix.spark-version }},scala-${{ matrix.scala-version }}

env:
COMET_FUZZ_TEST: "true"
83 changes: 83 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,29 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
false
}

/**
 * Checks whether a concrete Spark data type satisfies an argument-type constraint.
 *
 * @param dt
 *   the actual data type of the argument expression
 * @param at
 *   the constraint declared in the expression's signature
 * @return
 *   true if `dt` is acceptable for an argument declared as `at`
 */
def isMatch(dt: DataType, at: ArgType): Boolean = {
  // Integral types: byte, short, int, long.
  def isIntegral(t: DataType): Boolean = t match {
    case _: ByteType | _: ShortType | _: IntegerType | _: LongType => true
    case _ => false
  }
  // Floating-point and decimal types.
  def isFractional(t: DataType): Boolean = t match {
    case _: FloatType | _: DoubleType | _: DecimalType => true
    case _ => false
  }
  at match {
    case AnyType => true
    case IntegralType => isIntegral(dt)
    case NumericType => isIntegral(dt) || isFractional(dt)
    case OrderedType =>
      // TODO exclude map or other complex types that contain maps
      true
    case _ =>
      false
  }
}

/**
* Serializes Spark datatype to protobuf. Note that, a datatype can be serialized by this method
* doesn't mean it is supported by Comet native execution, i.e., `supportedDataType` may return
Expand Down Expand Up @@ -377,6 +400,25 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
return None

}

// check that Comet supports the input data types, using the same logic that is leveraged
// in fuzz testing
cometExpr.getSignature() match {
case Fixed(dataTypes) =>
if (aggExpr.children.length != dataTypes.length) {
withInfo(aggExpr, "Unsupported input argument count")
return None
}
val supportedTypes = dataTypes.zip(aggExpr.children.map(_.dataType)).forall {
case (expected, provided) => isMatch(provided, expected)
}
if (!supportedTypes) {
withInfo(aggExpr, "Unsupported input types")
return None
}
case _ =>
}

cometExpr.convert(aggExpr, aggExpr.aggregateFunction, inputs, binding, conf)
}

Expand Down Expand Up @@ -3011,6 +3053,10 @@ trait CometExpressionSerde {
*/
trait CometAggregateExpressionSerde {

def sql(): String

def getSignature(): Signature

/**
* Convert a Spark expression into a protocol buffer representation that can be passed into
* native code.
Expand Down Expand Up @@ -3040,3 +3086,40 @@ trait CometAggregateExpressionSerde {

/** Marker trait for an expression that is not guaranteed to be 100% compatible with Spark */
trait IncompatExpr

/** Represents the data type(s) that an argument accepts. */
sealed trait ArgType

/** Supports any input type. */
case object AnyType extends ArgType

/** Numeric types: integral, floating-point, and decimal. */
case object NumericType extends ArgType

/** Integral types (byte, short, int, long). */
case object IntegralType extends ArgType

/** Types that can be ordered. Includes struct and array but excludes maps. */
case object OrderedType extends ArgType

/*
case class ConcreteTypes(dataTypes: Seq[DataType]) extends ArgType
*/

/**
 * Base trait for expression signatures.
 *
 * Sealed (consistent with [[ArgType]]) so that matches on signatures can be checked for
 * exhaustiveness by the compiler.
 */
sealed trait Signature

/** A fixed number of arguments with specific types, one [[ArgType]] per argument. */
final case class Fixed(types: Seq[ArgType]) extends Signature

/*
// A mix of fixed and optional arguments
case class FixedWithOptional(fixed: Seq[ArgType], optional: Seq[ArgType]) extends Signature

// A variadic signature, allowing for a range of arguments
case class Variadic(minArgs: Option[Int], maxArgs: Option[Int], argType: ArgType)
  extends Signature

// A generic function signature that supports multiple forms
case class Overloaded(variants: Seq[Signature]) extends Signature
*/
Loading
Loading