Skip to content

[SPARK-51987][SQL] DSv2 expressions in column defaults on write #51002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

aokolnychyi
Copy link
Contributor

What changes were proposed in this pull request?

This PR allows connectors to expose expression-based defaults on write.

Why are the changes needed?

These changes are needed to avoid the requirement of producing Spark SQL dialect in connectors.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

This PR comes with tests.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label May 23, 2025
sealed class Metadata private[types] (private[types] val map: Map[String, Any])
sealed class Metadata private[types] (
private[types] val map: Map[String, Any],
@transient private[types] val runtimeMap: Map[String, Any])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am adding a runtime-only map of configs that is not serialized or exposed to the user. It will allow me to store alternative in-memory representations for certain configs. In particular, it would allow me to store SQL as well as the expression itself for default values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below is an example of how it is being used:

val existsDefault = extractExistsDefault(default)
val (sql, expr) = extractCurrentDefault(default)
val newMetadata = new MetadataBuilder()
  .withMetadata(f.metadata)
  .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, existsDefault)
  .putExpression(CURRENT_DEFAULT_COLUMN_METADATA_KEY, sql, expr)
  .build()
f.copy(metadata = newMetadata)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows me to use the expression directly without generating/parsing SQL for it.

analyze(field.name, field.dataType, field.metadata.getString(metadataKey), statementType)
field.metadata.getExpression[Expression](metadataKey) match {
case (sql, Some(expr)) =>
analyze(field.name, field.dataType, expr, sql, statementType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this branch, sql will be only used for error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

@@ -120,6 +122,12 @@ sealed class Metadata private[types] (private[types] val map: Map[String, Any])
map(key).asInstanceOf[T]
}

private[sql] def getExpression[E](key: String): (String, Option[E]) = {
val sql = getString(key)
val expr = if (runtimeMap != null) runtimeMap.get(key).map(_.asInstanceOf[E]) else None
Copy link

@singhpk234 singhpk234 May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional]

Suggested change
val expr = if (runtimeMap != null) runtimeMap.get(key).map(_.asInstanceOf[E]) else None
val expr = Option(runtimeMap).flatMap(_.get(key).map(_.asInstanceOf[E]))

}

private def extractCurrentDefault(default: ColumnDefaultValue): (String, Option[Expression]) = {
val expr = Option(default.getExpression).flatMap(V2ExpressionUtils.toCatalyst)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] presently toCatalyst doesn't handle connector scalar udf's is the plan to enhance this in future ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally yes, but I am not sure we would want to allow them in default values.


private def extractCurrentDefault(default: ColumnDefaultValue): (String, Option[Expression]) = {
val expr = Option(default.getExpression).flatMap(V2ExpressionUtils.toCatalyst)
val sql = Option(default.getSql).orElse(expr.map(_.sql)).getOrElse {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[doubt] my understanding was .sql is not reliable (based on discussion here), wondering if this could lead to users using getMap or map.get(key), directly and extracting the SQL from the map, skip actually checking if there is an expression for it and one should use that instead ? essentially if there i an entry in the runtimeMap should we let the map.get fail ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we mean these properties to be accessible by users. We do that for built-in tables but there are proper DSv2 APIs for this like ColumnDefaultValue where the expression always takes precedence. SQL is informational in this case.

@@ -645,8 +645,21 @@ class DataSourceV2SQLSuiteV1Filter
assert(replaced.columns.length === 1,
"Replaced table should have new schema.")
val actual = replaced.columns.head
val expected = ColumnV2.create("id", LongType, false, null,
new ColumnDefaultValue("41 + 1", LiteralValue(42L, LongType)), null)
val expected = ColumnV2.create(
Copy link
Contributor Author

@aokolnychyi aokolnychyi May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this PR, InMemoryTable exposes actual Columns, so we see the passed "current" expressions too.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4a395bc Jun 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants