
Commit 2f305b6

szehon-ho authored and cloud-fan committed
[SPARK-53527][SQL] Improve fallback of analyzeExistenceDefaultValue
### What changes were proposed in this pull request?

#49962 added a fallback for persisted default values in catalogs that are already broken (i.e., non-resolved). A broken value is something like `current_database`, `current_user`, or `current_timestamp`: these are non-deterministic and yield wrong results in EXISTS_DEFAULT, where the user expects the value that was resolved when they set the default. This PR adds yet another fallback for broken default values, in this case values containing nested function calls.

### Why are the changes needed?

Take the case where the EXISTS_DEFAULT is:

```sql
CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))
```

The current code `Literal.fromSQL(defaultSQL)` throws the exception below before reaching the fallback:

```
Caused by: java.lang.AssertionError: assertion failed: function arguments must be resolved.
	at scala.Predef$.assert(Predef.scala:279)
	at org.apache.spark.sql.catalyst.analysis.FunctionRegistry$.$anonfun$expressionBuilder$1(FunctionRegistry.scala:1278)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction(FunctionRegistry.scala:251)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction$(FunctionRegistry.scala:245)
	at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:317)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:325)
	at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$fromSQL$1.applyOrElse(literals.scala:317)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:586)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:121)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:586)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:579)
	at scala.collection.immutable.List.map(List.scala:251)
	at scala.collection.immutable.List.map(List.scala:79)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:768)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:579)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:556)
	at org.apache.spark.sql.catalyst.expressions.Literal$.fromSQL(literals.scala:317)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyzeExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:393)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValue(ResolveDefaultColumnsUtil.scala:529)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:524)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:524)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:594)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:592)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in StructTypeSuite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52274 from szehon-ho/more_default_value_fallback.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent adef6f3 commit 2f305b6
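
The fix follows a wrap-the-fast-path pattern: attempt the cheap `Literal.fromSQL` parse, and fall back to full re-analysis when it throws. Below is a minimal, self-contained Scala sketch of that control flow; `parseStoredDefault` and `reAnalyze` are hypothetical stand-ins for `Literal.fromSQL` and `fallbackResolveExistenceDefaultValue`, not Spark APIs.

```scala
import scala.util.{Failure, Success, Try}

object FallbackSketch {
  // Hypothetical stand-in for Literal.fromSQL: like the real method, it
  // throws (here via assert) when the stored SQL contains unresolved
  // nested function calls instead of a plain literal.
  private def parseStoredDefault(sql: String): String = {
    assert(!sql.contains("("), "function arguments must be resolved")
    sql
  }

  // Hypothetical stand-in for fallbackResolveExistenceDefaultValue:
  // a full parse/analyze/evaluate pass over the original SQL text.
  private def reAnalyze(sql: String): String = s"<re-analyzed: $sql>"

  // The pattern the patch applies: a throwing fast path now lands in
  // Failure(_) and is routed to the fallback instead of propagating.
  def resolve(sql: String): String = Try(parseStoredDefault(sql)) match {
    case Success(value) => value
    case Failure(_)     => reAnalyze(sql)
  }

  def main(args: Array[String]): Unit = {
    println(resolve("42")) // fast path
    println(resolve("CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")) // fallback
  }
}
```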

File tree

2 files changed: +35 / -17 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala

Lines changed: 24 additions & 17 deletions
```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
@@ -379,27 +380,33 @@ object ResolveDefaultColumns extends QueryErrorsBase
     val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
 
     // Parse the expression.
-    val expr = Literal.fromSQL(defaultSQL) match {
-      // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
-      // hence we need to add timezone to the cast if necessary
-      case c: Cast if c.child.resolved && c.needsTimeZone =>
-        c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
-      case e: Expression => e
-    }
+    val resolvedExpr = Try(Literal.fromSQL(defaultSQL)) match {
+      case Success(literal) =>
+        val expr = literal match {
+          // EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
+          // hence we need to add timezone to the cast if necessary
+          case c: Cast if c.child.resolved && c.needsTimeZone =>
+            c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
+          case e: Expression => e
+        }
 
-    // Check invariants
-    if (expr.containsPattern(PLAN_EXPRESSION)) {
-      throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
-        "", field.name, defaultSQL)
-    }
+        // Check invariants
+        if (expr.containsPattern(PLAN_EXPRESSION)) {
+          throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
+            "", field.name, defaultSQL)
+        }
+
+        expr match {
+          case _: ExprLiteral => expr
+          case c: Cast if c.resolved => expr
+          case _ =>
+            fallbackResolveExistenceDefaultValue(field)
+        }
 
-    val resolvedExpr = expr match {
-      case _: ExprLiteral => expr
-      case c: Cast if c.resolved => expr
-      case _ =>
+      case Failure(_) =>
+        // If Literal.fromSQL fails, use fallback resolution
         fallbackResolveExistenceDefaultValue(field)
     }
-
     coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
   }
```
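
One subtlety worth noting: the error in the report is a `java.lang.AssertionError`, not an `Exception`. `scala.util.Try` still captures it, because `Try` catches anything matched by `scala.util.control.NonFatal`, which excludes only `VirtualMachineError`, `ThreadDeath`, `InterruptedException`, `LinkageError`, and `ControlThrowable`. A quick check:

```scala
import scala.util.Try

// AssertionError is considered non-fatal, so Try captures it.
val caught = Try(assert(false, "function arguments must be resolved"))
println(caught.isFailure) // prints: true
```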

sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -856,6 +856,13 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
             "CAST(CURRENT_TIMESTAMP AS BIGINT)")
           .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
             "CAST(CURRENT_TIMESTAMP AS BIGINT)")
+          .build()),
+      StructField("c3", StringType, true,
+        new MetadataBuilder()
+          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
+            "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
+          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
+            "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
           .build())))
     val res = ResolveDefaultColumns.existenceDefaultValues(source)
     assert(res(0) == null)
@@ -864,5 +871,9 @@
     val res2Wrapper = new LongWrapper
     assert(res(2).asInstanceOf[UTF8String].toLong(res2Wrapper))
     assert(res2Wrapper.value > 0)
+
+    val res3Wrapper = new LongWrapper
+    assert(res(3).asInstanceOf[UTF8String].toLong(res3Wrapper))
+    assert(res3Wrapper.value > 0)
   }
 }
```
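
For readers wanting to reproduce the scenario outside the suite, here is a minimal sketch. It assumes the Spark catalyst classes are on the classpath and mirrors the new test case: a column whose stored EXISTS_DEFAULT contains nested function calls now resolves through the fallback instead of throwing.

```scala
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// A column whose persisted EXISTS_DEFAULT is a nested function call,
// exactly the shape that previously hit the AssertionError.
val schema = StructType(Seq(
  StructField("c3", StringType, nullable = true,
    new MetadataBuilder()
      .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
        "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
      .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
        "CONCAT(YEAR(CURRENT_DATE), LPAD(WEEKOFYEAR(CURRENT_DATE), 2, '0'))")
      .build())))

// With this patch the call succeeds and returns the evaluated default,
// e.g. a UTF8String such as "202537" (year followed by zero-padded week).
val defaults = ResolveDefaultColumns.existenceDefaultValues(schema)
println(defaults(0).asInstanceOf[UTF8String])
```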
