[SPARK-51987][SQL] DSv2 expressions in column defaults on write #51002
Conversation
sealed class Metadata private[types] (private[types] val map: Map[String, Any])
sealed class Metadata private[types] (
    private[types] val map: Map[String, Any],
    @transient private[types] val runtimeMap: Map[String, Any])
I am adding a runtime-only map of configs that is not serialized or exposed to the user. It will allow me to store alternative in-memory representations for certain configs. In particular, it would allow me to store SQL as well as the expression itself for default values.
Below is an example of how it is being used:
val existsDefault = extractExistsDefault(default)
val (sql, expr) = extractCurrentDefault(default)
val newMetadata = new MetadataBuilder()
  .withMetadata(f.metadata)
  .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, existsDefault)
  .putExpression(CURRENT_DEFAULT_COLUMN_METADATA_KEY, sql, expr)
  .build()
f.copy(metadata = newMetadata)
This allows me to use the expression directly without generating/parsing SQL for it.
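As a rough illustration of the idea (a minimal sketch with hypothetical names, not the actual Metadata/MetadataBuilder internals): the SQL text lands in the serialized, user-visible map, while the expression lives only in a transient, runtime-only map.

import scala.collection.mutable

// Hypothetical sketch only; the real change is in Metadata/MetadataBuilder.
class ExpressionMetadataSketch {
  private val map = mutable.Map.empty[String, Any]         // serialized and user-visible
  private val runtimeMap = mutable.Map.empty[String, Any]  // in-memory only, never serialized

  def putExpression[E](key: String, sql: String, expr: Option[E]): this.type = {
    map.put(key, sql)                          // the SQL text is what gets persisted
    expr.foreach(e => runtimeMap.put(key, e))  // the expression stays in this JVM only
    this
  }

  def getExpression[E](key: String): (String, Option[E]) = {
    (map(key).asInstanceOf[String], runtimeMap.get(key).map(_.asInstanceOf[E]))
  }
}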
analyze(field.name, field.dataType, field.metadata.getString(metadataKey), statementType)
field.metadata.getExpression[Expression](metadataKey) match {
  case (sql, Some(expr)) =>
    analyze(field.name, field.dataType, expr, sql, statementType)
In this branch, sql will only be used for error messages?
Yes.
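For context, a minimal hypothetical sketch of what such an overload could look like (not the actual ResolveDefaultColumns code): the already-built expression is validated directly, and the sql string is only threaded into the error message.

import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.DataType

// Hypothetical sketch: validate the pre-built expression; `sql` appears only in the message.
def analyzeSketch(
    colName: String,
    dataType: DataType,
    expr: Expression,
    sql: String,
    statementType: String): Expression = {
  require(
    expr.resolved && Cast.canUpCast(expr.dataType, dataType),
    s"Invalid DEFAULT value '$sql' for column $colName in $statementType")
  expr
}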
@@ -120,6 +122,12 @@ sealed class Metadata private[types] (private[types] val map: Map[String, Any])
    map(key).asInstanceOf[T]
  }

  private[sql] def getExpression[E](key: String): (String, Option[E]) = {
    val sql = getString(key)
    val expr = if (runtimeMap != null) runtimeMap.get(key).map(_.asInstanceOf[E]) else None
[optional]
Suggested change:
val expr = if (runtimeMap != null) runtimeMap.get(key).map(_.asInstanceOf[E]) else None
val expr = Option(runtimeMap).flatMap(_.get(key).map(_.asInstanceOf[E]))
  }
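For what it's worth, the two forms behave the same way; a quick standalone illustration (names are only for the example):

val runtimeMap: Map[String, Any] = null  // e.g. after Java deserialization skips the @transient field

val viaIf = if (runtimeMap != null) runtimeMap.get("k").map(_.asInstanceOf[String]) else None
val viaOption = Option(runtimeMap).flatMap(_.get("k").map(_.asInstanceOf[String]))

assert(viaIf == viaOption)  // both are None when the map is null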
private def extractCurrentDefault(default: ColumnDefaultValue): (String, Option[Expression]) = {
  val expr = Option(default.getExpression).flatMap(V2ExpressionUtils.toCatalyst)
[doubt] Presently, toCatalyst doesn't handle connector scalar UDFs. Is the plan to enhance this in the future?
Generally yes, but I am not sure we would want to allow them in default values.
private def extractCurrentDefault(default: ColumnDefaultValue): (String, Option[Expression]) = {
  val expr = Option(default.getExpression).flatMap(V2ExpressionUtils.toCatalyst)
  val sql = Option(default.getSql).orElse(expr.map(_.sql)).getOrElse {
[doubt] My understanding was that .sql is not reliable (based on the discussion here). I'm wondering if this could lead to users calling getMap or map.get(key) directly, extracting the SQL from the map, and skipping the check for whether there is an expression that should be used instead. Essentially, if there is an entry in the runtimeMap, should we let map.get fail?
I don't think we mean these properties to be accessible by users. We do that for built-in tables, but there are proper DSv2 APIs for this, like ColumnDefaultValue, where the expression always takes precedence. SQL is informational in this case.
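To make that precedence concrete, a small illustrative sketch of how a consumer could treat such a default (using the ColumnDefaultValue accessors referenced in this thread; not part of the PR itself):

import org.apache.spark.sql.connector.catalog.ColumnDefaultValue

// The structured expression, when present, is what gets evaluated;
// the SQL string is only informational (e.g. DESCRIBE output or error messages).
def describeDefault(default: ColumnDefaultValue): String = {
  Option(default.getExpression) match {
    case Some(expr) => s"evaluate expression $expr (SQL shown to users: ${default.getSql})"
    case None       => s"fall back to parsing SQL text: ${default.getSql}"
  }
}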
@@ -645,8 +645,21 @@ class DataSourceV2SQLSuiteV1Filter
    assert(replaced.columns.length === 1,
      "Replaced table should have new schema.")
    val actual = replaced.columns.head
    val expected = ColumnV2.create("id", LongType, false, null,
      new ColumnDefaultValue("41 + 1", LiteralValue(42L, LongType)), null)
    val expected = ColumnV2.create(
After this PR, InMemoryTable exposes actual Columns, so we see the passed "current" expressions too.
thanks, merging to master!
What changes were proposed in this pull request?
This PR allows connectors to expose expression-based defaults on write.
Why are the changes needed?
These changes are needed so that connectors are not required to produce Spark SQL dialect strings for default values.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This PR comes with tests.
Was this patch authored or co-authored using generative AI tooling?
No.