Skip to content

Commit

Permalink
[SPARK-51119][SQL] Readers on executors resolving EXISTS_DEFAULT should not call catalogs
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Feb 6, 2025
1 parent 39433d4 commit 164bd6b
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ object ResolveDefaultColumns extends QueryErrorsBase
// CURRENT_DEFAULT_COLUMN_METADATA.
val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"

// Analyzer and optimizer used when resolving column default-value expressions.
// These are `var`s only so that tests can substitute lightweight instances via
// setAnalyzerAndOptimizer (e.g. ones backed by a mock CatalogManager); production
// code always uses DefaultColumnAnalyzer / DefaultColumnOptimizer.
// NOTE(review): plain vars, no synchronization — assumes tests that swap these do
// not run concurrently with other default-column resolution; confirm suite config.
var defaultColumnAnalyzer: Analyzer = DefaultColumnAnalyzer
var defaultColumnOptimizer: Optimizer = DefaultColumnOptimizer

/**
 * Replaces the analyzer and optimizer used for default-value resolution.
 *
 * Visible for testing. Callers are responsible for restoring the originals
 * (DefaultColumnAnalyzer / DefaultColumnOptimizer) when finished, typically in
 * a `finally` block.
 */
def setAnalyzerAndOptimizer(analyzer: Analyzer, optimizer: Optimizer): Unit = {
this.defaultColumnAnalyzer = analyzer
this.defaultColumnOptimizer = optimizer
}

/**
* Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.
*
Expand Down Expand Up @@ -287,12 +298,12 @@ object ResolveDefaultColumns extends QueryErrorsBase

// Analyze the parse result.
val plan = try {
val analyzer: Analyzer = DefaultColumnAnalyzer
val analyzer: Analyzer = defaultColumnAnalyzer
val analyzed = analyzer.execute(Project(Seq(Alias(parsed, colName)()), OneRowRelation()))
analyzer.checkAnalysis(analyzed)
// Eagerly execute finish-analysis and constant-folding rules before checking whether the
// expression is foldable and resolved.
ConstantFolding(DefaultColumnOptimizer.FinishAnalysis(analyzed))
ConstantFolding(defaultColumnOptimizer.FinishAnalysis(analyzed))
} catch {
case ex: AnalysisException =>
throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
Expand Down Expand Up @@ -320,6 +331,67 @@ object ResolveDefaultColumns extends QueryErrorsBase
coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
}

/**
 * Analyzes the EXISTS_DEFAULT metadata value of a column. This skips some steps of
 * `analyze` (notably the finish-analysis optimizer phase) because EXISTS_DEFAULT values
 * were already fully analyzed and constant-folded when the column was created; only
 * parsing, resolution checks, and constant folding are needed here. In particular this
 * path must not contact any catalog, so it is safe to run on executors (SPARK-51119).
 *
 * VisibleForTesting
 *
 * @param field the column whose EXISTS_DEFAULT metadata entry is resolved; the
 *              EXISTS_DEFAULT_COLUMN_METADATA_KEY metadata key must be present
 * @param analyzer the analyzer used to resolve the expression; defaults to the
 *                 currently-configured `defaultColumnAnalyzer` so tests may substitute one
 * @return the resolved, foldable expression coerced to the column's data type
 * @throws AnalysisException if the stored SQL fails to parse, contains a subquery,
 *                           does not resolve, or is not foldable
 */
def analyzeExistingDefault(field: StructField,
    analyzer: Analyzer = defaultColumnAnalyzer): Expression = {
  val colName = field.name
  val dataType = field.dataType
  val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)

  // Parse the expression.
  lazy val parser = new CatalystSqlParser()
  val parsed: Expression = try {
    parser.parseExpression(defaultSQL)
  } catch {
    case ex: ParseException =>
      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
        "", colName, defaultSQL, ex)
  }
  // Check invariants before moving on to analysis.
  if (parsed.containsPattern(PLAN_EXPRESSION)) {
    throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
      "", colName, defaultSQL)
  }

  // Analyze the parse result. This uses the `analyzer` parameter directly — previously
  // the parameter was shadowed by a local `val analyzer = defaultColumnAnalyzer` and
  // silently ignored; the parameter's default now reads the same var, so default-call
  // behavior (including the test override via setAnalyzerAndOptimizer) is unchanged.
  val plan = try {
    val analyzed = analyzer.execute(Project(Seq(Alias(parsed, colName)()), OneRowRelation()))
    analyzer.checkAnalysis(analyzed)
    // Eagerly execute constant-folding rules before checking whether the
    // expression is foldable and resolved.
    ConstantFolding(analyzed)
  } catch {
    case ex: AnalysisException =>
      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
        "", colName, defaultSQL, ex)
  }
  val analyzed: Expression = plan.collectFirst {
    case Project(Seq(a: Alias), OneRowRelation()) => a.child
  }.get

  // Extra check: expressions should already be resolved and foldable.
  if (!analyzed.foldable) {
    throw QueryCompilationErrors.defaultValueNotConstantError(defaultSQL, colName, defaultSQL)
  }

  if (!analyzed.resolved) {
    throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
      "",
      colName,
      defaultSQL,
      cause = null)
  }

  // Perform implicit coercion from the provided expression type to the required column type.
  // NOTE(review): the third argument here is the statement-type string used in error
  // messages; passing `defaultSQL` is inconsistent with the "" passed to the error
  // helpers above — confirm whether "" was intended.
  coerceDefaultValue(analyzed, dataType, defaultSQL, colName, defaultSQL)
}

/**
* If the provided default value is a literal of a wider type than the target column,
* but the literal value fits within the narrower type, just coerce it for convenience.
Expand Down Expand Up @@ -407,7 +479,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
val defaultValue: Option[String] = field.getExistenceDefaultValue()
defaultValue.map { text: String =>
val expr = try {
val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
val expr = analyzeExistingDefault(field)
expr match {
case _: ExprLiteral | _: Cast => expr
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.spark.sql.types

import com.fasterxml.jackson.databind.ObjectMapper
import org.mockito.Mockito.withSettings
import org.mockito.stubbing.Answer
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{SparkException, SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution, Analyzer}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DayTimeIntervalType => DT}
import org.apache.spark.sql.types.{YearMonthIntervalType => YM}
Expand Down Expand Up @@ -798,4 +803,44 @@ class StructTypeSuite extends SparkFunSuite with SQLHelper {
assert(
mapper.readTree(mapWithNestedArray.json) == mapper.readTree(expectedJson))
}

test("SPARK-51119: Readers on executors resolving EXISTS_DEFAULT should not call catalogs") {
  // Any catalog-manager method other than the benign `v1SessionCatalog` accessor
  // fails the test — resolving EXISTS_DEFAULT must not reach out to catalogs.
  val failingAnswer: Answer[Any] = m => {
    if (m.getMethod.getName != "v1SessionCatalog") {
      fail("should not be called")
    } else {
      null
    }
  }

  val dummyCatalogManager = mock[CatalogManager](withSettings().defaultAnswer(failingAnswer))
  val dummyAnalyzer = new Analyzer(dummyCatalogManager)
  val dummyOptimizer = new Optimizer(dummyCatalogManager) {}

  val source1 = StructType(Array(
    StructField("c1", LongType, true,
      new MetadataBuilder()
        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
        .putString(
          ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "CAST(42 AS BIGINT)")
        .build()),
    StructField("c2", StringType, true,
      new MetadataBuilder()
        .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
        .putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'abc'")
        .build()),
    StructField("c3", BooleanType)))
  // Swap in the analyzer/optimizer backed by the failing catalog manager; restore the
  // defaults afterwards so other tests in the suite are unaffected.
  ResolveDefaultColumns.setAnalyzerAndOptimizer(dummyAnalyzer, dummyOptimizer)
  try {
    // Resolve once and reuse — each call re-runs parsing and analysis for every column.
    val defaults = ResolveDefaultColumns.existenceDefaultValues(source1)
    assert(defaults.length == 3)
    assert(defaults(0) == 42)
    assert(defaults(1) == UTF8String.fromString("abc"))
    assert(defaults(2) == null)
  } finally {
    ResolveDefaultColumns.setAnalyzerAndOptimizer(
      ResolveDefaultColumns.DefaultColumnAnalyzer,
      ResolveDefaultColumns.DefaultColumnOptimizer)
  }
}
}

0 comments on commit 164bd6b

Please sign in to comment.