diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala index c2613ff74da4a..9d4312343fd41 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala @@ -198,7 +198,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog) table: String, parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { + val partSpecs = parts.map(_.spec) + postToAll(CreatePartitionsPreEvent(db, table, partSpecs)) delegate.createPartitions(db, table, parts, ignoreIfExists) + postToAll(CreatePartitionsEvent(db, table, partSpecs)) } override def dropPartitions( @@ -208,7 +211,9 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog) ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit = { + postToAll(DropPartitionsPreEvent(db, table, partSpecs)) delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData) + postToAll(DropPartitionsEvent(db, table, partSpecs)) } override def renamePartitions( @@ -216,14 +221,19 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog) table: String, specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = { + postToAll(RenamePartitionsPreEvent(db, table, specs, newSpecs)) delegate.renamePartitions(db, table, specs, newSpecs) + postToAll(RenamePartitionsEvent(db, table, specs, newSpecs)) } override def alterPartitions( db: String, table: String, parts: Seq[CatalogTablePartition]): Unit = { + val partSpecs = parts.map(_.spec) + postToAll(AlterPartitionsPreEvent(db, table, partSpecs)) delegate.alterPartitions(db, table, parts) + postToAll(AlterPartitionsEvent(db, table, partSpecs)) } override def getPartition( diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala index e7d41644392d5..a1bbaec2b0f84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.scheduler.SparkListenerEvent +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec /** * Event emitted by the external catalog when it is modified. Events are either fired before or @@ -202,3 +203,87 @@ case class RenameFunctionEvent( name: String, newName: String) extends FunctionEvent + +/** + * Event fired when some partitions (of a table) are created, dropped, renamed, or altered. + */ +trait PartitionsEvent extends TableEvent { + /** + * Specs of the partitions touched by the operation (for renames, the specs before the rename). + */ + val partSpecs: Seq[TablePartitionSpec] +} + +/** + * Event fired before some partitions (of a table) are created. + */ +case class CreatePartitionsPreEvent( + database: String, + name /* of table */: String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired after some partitions (of a table) have been created. + */ +case class CreatePartitionsEvent( + database: String, + name /* of table */: String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired before some partitions (of a table) are dropped. + */ +case class DropPartitionsPreEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired after some partitions (of a table) have been dropped. 
+ */ +case class DropPartitionsEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired before partitions (of a table) are renamed from `partSpecs` to `newPartSpecs`. + */ +case class RenamePartitionsPreEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec], + newPartSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired after partitions (of a table) have been renamed from `partSpecs` to `newPartSpecs`. + */ +case class RenamePartitionsEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec], + newPartSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired before some partitions (of a table) are altered. + */ +case class AlterPartitionsPreEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent + +/** + * Event fired after some partitions (of a table) have been altered. 
+ */ +case class AlterPartitionsEvent( + database: String, + name /* of table */ : String, + partSpecs: Seq[TablePartitionSpec]) + extends PartitionsEvent diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala index 366188c3327be..7200df745458d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala @@ -209,4 +209,112 @@ class ExternalCatalogEventSuite extends SparkFunSuite { catalog.dropFunction("db5", "fn4") checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil) } + + testWithCatalog("partitions") { (catalog, checkEvents) => + // Prepare db + val db = "db1" + val dbUri = preparePath(Files.createTempDirectory(db + "_")) + val dbDefinition = CatalogDatabase( + name = db, + description = "", + locationUri = dbUri, + properties = Map.empty) + + catalog.createDatabase(dbDefinition, ignoreIfExists = false) + checkEvents( + CreateDatabasePreEvent(db) :: + CreateDatabaseEvent(db) :: Nil) + + // Prepare table + val table = "table1" + val tableUri = preparePath(Files.createTempDirectory(table + "_")) + val tableDefinition = CatalogTable( + identifier = TableIdentifier(table, Some(db)), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat.empty.copy(locationUri = Option(tableUri)), + schema = new StructType() + .add("year", "int") + .add("month", "int") + .add("sales", "long")) + + catalog.createTable(tableDefinition, ignoreIfExists = false) + checkEvents( + CreateTablePreEvent(db, table) :: + CreateTableEvent(db, table) :: Nil) + + // Prepare partitions + val storageFormat = CatalogStorageFormat( + locationUri = Some(tableUri), + inputFormat = Some("tableInputFormat"), + outputFormat = 
Some("tableOutputFormat"), + serde = None, + compressed = false, + properties = Map.empty) + val parts = Seq(CatalogTablePartition(Map("year" -> "2025", "month" -> "Jan"), storageFormat)) + val partSpecs = parts.map(_.spec) + + val newPartSpecs = Seq(Map("year" -> "2026", "month" -> "Feb")) + + // CREATE + catalog.createPartitions(db, table, parts, ignoreIfExists = false) + checkEvents( + CreatePartitionsPreEvent(db, table, partSpecs) :: + CreatePartitionsEvent(db, table, partSpecs) :: Nil) + + // Re-create with ignoreIfExists = true: both events are still posted + catalog.createPartitions(db, table, parts, ignoreIfExists = true) + checkEvents( + CreatePartitionsPreEvent(db, table, partSpecs) :: + CreatePartitionsEvent(db, table, partSpecs) :: Nil) + + // Re-creating with ignoreIfExists = false fails, so only the PreEvent is posted + intercept[AnalysisException] { + catalog.createPartitions(db, table, parts, ignoreIfExists = false) + } + checkEvents(CreatePartitionsPreEvent(db, table, partSpecs) :: Nil) + + // ALTER + catalog.alterPartitions(db, table, parts) + checkEvents( + AlterPartitionsPreEvent(db, table, partSpecs) :: + AlterPartitionsEvent(db, table, partSpecs) :: + Nil) + + // RENAME + catalog.renamePartitions(db, table, partSpecs, newPartSpecs) + checkEvents( + RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) :: + RenamePartitionsEvent(db, table, partSpecs, newPartSpecs) :: Nil) + + // renamePartitions() fails because the partitions were already renamed to newPartSpecs, + // so only the PreEvent is posted + intercept[AnalysisException] { + catalog.renamePartitions(db, table, partSpecs, newPartSpecs) + } + checkEvents(RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) :: Nil) + + // DROP + // dropPartitions() fails + // because the (old) partSpecs no longer exist and ignoreIfNotExists is false, + // so only the PreEvent is posted + intercept[AnalysisException] { + catalog.dropPartitions(db, table, partSpecs, + ignoreIfNotExists = false, purge = true, retainData = 
true) + } + checkEvents(DropPartitionsPreEvent(db, table, partSpecs) :: Nil) + + // Drop the renamed partitions + catalog.dropPartitions(db, table, newPartSpecs, + ignoreIfNotExists = false, purge = true, retainData = true) + checkEvents( + DropPartitionsPreEvent(db, table, newPartSpecs) :: + DropPartitionsEvent(db, table, newPartSpecs) :: Nil) + + // Re-drop with ignoreIfNotExists = true: both events are still posted + catalog.dropPartitions(db, table, newPartSpecs, + ignoreIfNotExists = true, purge = true, retainData = true) + checkEvents( + DropPartitionsPreEvent(db, table, newPartSpecs) :: + DropPartitionsEvent(db, table, newPartSpecs) :: Nil) + } }