Skip to content

Commit 4731c85

Browse files
szehon-ho authored and cloud-fan committed
[SPARK-51119][SQL][FOLLOW-UP] Fix missing fallback case for parsing corrupt exists_default value
### What changes were proposed in this pull request?

Add another fallback for broken (non-resolved) exists_default values for SPARK-51119 original fix.

### Why are the changes needed?

#49962 added a fallback in case there were already broken (i.e., non-resolved) persisted default values in catalogs. A broken one is something like 'current_database, current_user, current_timestamp'; these are non-deterministic and will bring wrong results in EXISTS_DEFAULT, where the user expects the value resolved when they set the default. However, this fallback missed one case: when the current_xxx is in a cast. This fixes it.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added to existing unit test in StructTypeSuite

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50685 from szehon-ho/SPARK-51119-follow-3.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 516859f)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent ccfb64b commit 4731c85

File tree

2 files changed

+27
-11
lines changed

2 files changed

+27
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
371371
val expr = Literal.fromSQL(defaultSQL) match {
372372
// EXISTS_DEFAULT will have a cast from analyze() due to coerceDefaultValue
373373
// hence we need to add timezone to the cast if necessary
374-
case c: Cast if c.needsTimeZone =>
374+
case c: Cast if c.child.resolved && c.needsTimeZone =>
375375
c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
376376
case e: Expression => e
377377
}

sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.types.DayTimeIntervalType._
3535
import org.apache.spark.sql.types.StructType.fromDDL
3636
import org.apache.spark.sql.types.YearMonthIntervalType._
3737
import org.apache.spark.unsafe.types.UTF8String
38+
import org.apache.spark.unsafe.types.UTF8String.LongWrapper
3839

3940
class StructTypeSuite extends SparkFunSuite with SQLHelper {
4041

@@ -835,18 +836,33 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
835836
test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
836837
val source = StructType(
837838
Array(
838-
StructField("c1", VariantType, true,
839-
new MetadataBuilder()
840-
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
841-
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
842-
.build()),
843-
StructField("c0", StringType, true,
844-
new MetadataBuilder()
845-
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
846-
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
847-
.build())))
839+
StructField("c0", VariantType, true,
840+
new MetadataBuilder()
841+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
842+
"parse_json(null)")
843+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
844+
"parse_json(null)")
845+
.build()),
846+
StructField("c1", StringType, true,
847+
new MetadataBuilder()
848+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
849+
"current_catalog()")
850+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
851+
"current_catalog()")
852+
.build()),
853+
StructField("c2", StringType, true,
854+
new MetadataBuilder()
855+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
856+
"CAST(CURRENT_TIMESTAMP AS BIGINT)")
857+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY,
858+
"CAST(CURRENT_TIMESTAMP AS BIGINT)")
859+
.build())))
848860
val res = ResolveDefaultColumns.existenceDefaultValues(source)
849861
assert(res(0) == null)
850862
assert(res(1) == UTF8String.fromString("spark_catalog"))
863+
864+
val res2Wrapper = new LongWrapper
865+
assert(res(2).asInstanceOf[UTF8String].toLong(res2Wrapper))
866+
assert(res2Wrapper.value > 0)
851867
}
852868
}

0 commit comments

Comments (0)