Skip to content

Commit 66adbf9

Browse files
rluvaton authored and andygrove committed
feat: pass ignore_nulls flag to first and last (apache#1866)
1 parent 447b646 commit 66adbf9

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

spark/src/main/scala/org/apache/comet/serde/aggregates.scala

Lines changed: 2 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -232,11 +232,6 @@ object CometFirst extends CometAggregateExpressionSerde {
232232
binding: Boolean,
233233
conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
234234
val first = expr.asInstanceOf[First]
235-
if (first.ignoreNulls) {
236-
// DataFusion doesn't support ignoreNulls true
237-
withInfo(aggExpr, "First not supported when ignoreNulls=true")
238-
return None
239-
}
240235
val child = first.children.head
241236
val childExpr = exprToProto(child, inputs, binding)
242237
val dataType = serializeDataType(first.dataType)
@@ -245,6 +240,7 @@ object CometFirst extends CometAggregateExpressionSerde {
245240
val builder = ExprOuterClass.First.newBuilder()
246241
builder.setChild(childExpr.get)
247242
builder.setDatatype(dataType.get)
243+
builder.setIgnoreNulls(first.ignoreNulls)
248244

249245
Some(
250246
ExprOuterClass.AggExpr
@@ -269,11 +265,6 @@ object CometLast extends CometAggregateExpressionSerde {
269265
binding: Boolean,
270266
conf: SQLConf): Option[ExprOuterClass.AggExpr] = {
271267
val last = expr.asInstanceOf[Last]
272-
if (last.ignoreNulls) {
273-
// DataFusion doesn't support ignoreNulls true
274-
withInfo(aggExpr, "Last not supported when ignoreNulls=true")
275-
return None
276-
}
277268
val child = last.children.head
278269
val childExpr = exprToProto(child, inputs, binding)
279270
val dataType = serializeDataType(last.dataType)
@@ -282,6 +273,7 @@ object CometLast extends CometAggregateExpressionSerde {
282273
val builder = ExprOuterClass.Last.newBuilder()
283274
builder.setChild(childExpr.get)
284275
builder.setDatatype(dataType.get)
276+
builder.setIgnoreNulls(last.ignoreNulls)
285277

286278
Some(
287279
ExprOuterClass.AggExpr

spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -816,6 +816,23 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
816816
}
817817
}
818818

819+
test("first/last with ignore null") {
820+
val data = Range(0, 8192).flatMap(n => Seq((n, 1), (n, 2))).toDF("a", "b")
821+
withTempDir { dir =>
822+
val filename = s"${dir.getAbsolutePath}/first_last_ignore_null.parquet"
823+
data.write.parquet(filename)
824+
withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "100") {
825+
spark.read.parquet(filename).createOrReplaceTempView("t1")
826+
for (expr <- Seq("first", "last")) {
827+
// Each group `a` has exactly two rows: b == 1 (mapped to null by the IF) and b == 2 (kept), so with IGNORE NULLS both first and last deterministically return the single non-null value per group
828+
val df = spark.sql(
829+
s"SELECT a, $expr(IF(b==1,null,b)) IGNORE NULLS FROM t1 GROUP BY a ORDER BY a")
830+
checkSparkAnswerAndOperator(df)
831+
}
832+
}
833+
}
834+
}
835+
819836
test("all types, with nulls") {
820837
val numValues = 2048
821838

0 commit comments

Comments
 (0)