diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 323f715ede2ce..58b6314e27ade 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.AnalysisException
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -340,12 +341,39 @@ object ResolveDefaultColumns extends QueryErrorsBase
       throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
         "", field.name, defaultSQL)
     }
-    if (!expr.resolved) {
-      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
-        "", field.name, defaultSQL, null)
+
+    val resolvedExpr = expr match {
+      case _: ExprLiteral => expr
+      case c: Cast if c.resolved => expr
+      case _ =>
+        fallbackResolveExistenceDefaultValue(field)
     }
 
-    coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
+    coerceDefaultValue(resolvedExpr, field.dataType, "", field.name, defaultSQL)
+  }
+
+  // In most cases a column's existsDefault is persisted as resolved, constant-folded
+  // literal SQL. However, because these values are fetched from an external catalog,
+  // that assumption may not hold, so we fall back to full analysis when we encounter
+  // an unresolved existsDefault.
+  private def fallbackResolveExistenceDefaultValue(
+      field: StructField): Expression = {
+    field.getExistenceDefaultValue().map { defaultSQL: String =>
+
+      logWarning(log"Encountered unresolved exists default value: " +
+        log"'${MDC(COLUMN_DEFAULT_VALUE, defaultSQL)}' " +
+        log"for column ${MDC(COLUMN_NAME, field.name)} " +
+        log"with ${MDC(COLUMN_DATA_TYPE_SOURCE, field.dataType)}, " +
+        log"falling back to full analysis.")
+
+      val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+      val literal = expr match {
+        case _: ExprLiteral | _: Cast => expr
+        case _ => throw SparkException.internalError("Failed to parse existence default " +
+          s"as literal, field name: ${field.name}, value: $defaultSQL")
+      }
+      literal
+    }.orNull
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
index f247806fb4130..472a5c09f7c32 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala
@@ -831,4 +831,22 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
     validateConvertedDefaults("c4", VariantType, "parse_json(null)", "CAST(NULL AS VARIANT)")
   }
+
+  test("SPARK-51119: Add fallback to process unresolved EXISTS_DEFAULT") {
+    val source = StructType(
+      Array(
+        StructField("c1", VariantType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "parse_json(null)")
+            .build()),
+        StructField("c0", StringType, true,
+          new MetadataBuilder()
+            .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
+            .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
+            .build())))
+    val res = ResolveDefaultColumns.existenceDefaultValues(source)
+    assert(res(0) == null)
+    assert(res(1) == UTF8String.fromString("spark_catalog"))
+  }
 }
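
Note on the new fallback path: it can be exercised without a full SparkSession. The sketch below is a minimal standalone example mirroring the new suite case (assuming the catalyst test classpath); it shows ResolveDefaultColumns.existenceDefaultValues folding a persisted EXISTS_DEFAULT of "current_catalog()" to a literal, where the pre-patch code threw defaultValuesUnresolvedExprError:

    import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
    import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // Column metadata as an external catalog might persist it: the EXISTS_DEFAULT
    // holds the original expression SQL, not a constant-folded literal.
    val schema = StructType(Array(
      StructField("c0", StringType, true,
        new MetadataBuilder()
          .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
          .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "current_catalog()")
          .build())))

    // The fallback analyzes and constant-folds the expression to a literal value.
    val defaults = ResolveDefaultColumns.existenceDefaultValues(schema)
    assert(defaults(0) == UTF8String.fromString("spark_catalog"))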