Skip to content

Commit 6f13efb

Browse files
committed
[SPARK-51119][SQL][FOLLOW-UP] Add fallback to ResolveDefaultColumnsUtil.existenceDefaultValues
1 parent e397207 commit 6f13efb

File tree

2 files changed

+47
-5
lines changed

2 files changed

+47
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@ package org.apache.spark.sql.catalyst.util
1919

2020
import scala.collection.mutable.ArrayBuffer
2121

22-
import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
22+
import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
2323
import org.apache.spark.internal.{Logging, MDC}
2424
import org.apache.spark.internal.LogKeys._
2525
import org.apache.spark.sql.AnalysisException
2626
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
2727
import org.apache.spark.sql.catalyst.analysis._
2828
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
2929
import org.apache.spark.sql.catalyst.expressions._
30+
import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
3031
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
3132
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
3233
import org.apache.spark.sql.catalyst.plans.logical._
@@ -340,12 +341,35 @@ object ResolveDefaultColumns extends QueryErrorsBase
340341
throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
341342
"", field.name, defaultSQL)
342343
}
343-
if (!expr.resolved) {
344-
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
345-
"", field.name, defaultSQL, null)
344+
345+
val resolvedExpr = expr match {
346+
case _: ExprLiteral | _: Cast => expr
347+
case _ =>
348+
// In most cases, column existsDefault should already be persisted as resolved sql
349+
// statements but because they are fetched from external catalog, it is possible
350+
// that this assumption does not hold so we fallback to full analysis if we encounter
351+
// an unresolved existsDefault
352+
val fallback = fallbackResolveExistenceDefaultValue(field)
353+
if (!fallback.resolved) {
354+
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
355+
"", field.name, defaultSQL, null)
356+
}
357+
fallback
346358
}
347359

348-
coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
360+
coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
361+
}
362+
363+
private def fallbackResolveExistenceDefaultValue(field: StructField): Expression = {
364+
val defaultValue: Option[String] = field.getExistenceDefaultValue()
365+
defaultValue.map { text: String =>
366+
val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
367+
expr match {
368+
case _: ExprLiteral | _: Cast => expr
369+
case _ => throw SparkException.internalError(s"parse existence default as literal err," +
370+
s" field name: ${field.name}, value: $text")
371+
}
372+
}.orNull
349373
}
350374

351375
/**

sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
831831
validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")
832832

833833
}
834+
835+
test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
836+
val source = StructType(
837+
Array(
838+
StructField("c1", VariantType, true,
839+
new MetadataBuilder()
840+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
841+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
842+
.build()),
843+
StructField("c0", StringType, true,
844+
new MetadataBuilder()
845+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
846+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
847+
.build())))
848+
val res = ResolveDefaultColumns.existenceDefaultValues(source)
849+
assert(res(0) == null)
850+
assert(res(1) == "spark_catalog")
851+
}
834852
}

0 commit comments

Comments
 (0)