
Commit f959716

szehon-ho authored and dongjoon-hyun committed
[SPARK-51119][SQL] Readers on executors resolving EXISTS_DEFAULT should not call catalogs
### What changes were proposed in this pull request?

Simplify the resolution of EXISTS_DEFAULT in ResolveDefaultColumns::getExistenceDefaultValues(), which is called from file readers on executors.

### Why are the changes needed?

Spark executors unnecessarily contact catalogs when resolving EXISTS_DEFAULT (the default value applied to existing data) for a column.

Detailed explanation: the default-value code path first analyzes the user-provided CURRENT_DEFAULT value of a column (to evaluate functions, etc.) and saves the resulting SQL as the column's EXISTS_DEFAULT. EXISTS_DEFAULT is then used to fill in the value for pre-existing rows, so existing data never has to be rewritten via backfill. When reading existing files, Spark resolves the EXISTS_DEFAULT metadata and substitutes that value for the nulls it finds in the column.

The problem is that this second, read-side step redundantly re-runs the analyzer and finish-analysis rules on EXISTS_DEFAULT, even though those rules were already run to produce the value in the first place, and some of them contact the catalog. Worse, it can cause exceptions if the executors are not configured to reach the catalog, such as:

```
Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
  at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
  ... 21 more
Caused by: java.lang.IllegalStateException: No active or default Spark session found
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test in StructTypeSuite. Some members of ResolveDefaultColumns had to be exposed for testing.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49840 from szehon-ho/SPARK-51119.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 937decc)
Signed-off-by: Dongjoon Hyun <[email protected]>
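For context, the mechanism described above can be illustrated with a minimal sketch. This is hypothetical and not part of the commit; it assumes a Spark session named `spark` and that the Parquet provider is allowed to use column defaults.

```scala
// The default is analyzed once at DDL time; the constant-folded result is stored
// in the column metadata as EXISTS_DEFAULT.
spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1)")  // this file predates the column added below
spark.sql("ALTER TABLE t ADD COLUMNS (flag BOOLEAN DEFAULT true)")

// Reading the old file makes the executor-side Parquet reader resolve EXISTS_DEFAULT
// to fill `flag` for the pre-existing row. Before this fix, that resolution re-ran
// analyzer/finish-analysis rules that could contact the catalog from the executor.
spark.sql("SELECT id, flag FROM t").show()
```

With this change, the executor only parses and evaluates the stored EXISTS_DEFAULT string locally.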
1 parent: 371541d · commit: f959716

File tree

2 files changed (+47, -16 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

Lines changed: 20 additions & 1 deletion
```diff
@@ -40,9 +40,11 @@ import scala.util.Try
 import org.apache.commons.codec.binary.{Hex => ApacheHex}
 import org.json4s.JsonAST._
 
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, FunctionIdentifier, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.catalyst.types._
@@ -265,6 +267,23 @@ object Literal {
       s"Literal must have a corresponding value to ${dataType.catalogString}, " +
         s"but class ${Utils.getSimpleName(value.getClass)} found.")
   }
+
+  def fromSQL(sql: String): Expression = {
+    CatalystSqlParser.parseExpression(sql).transformUp {
+      case u: UnresolvedFunction =>
+        assert(u.nameParts.length == 1)
+        assert(!u.isDistinct)
+        assert(u.filter.isEmpty)
+        assert(!u.ignoreNulls)
+        assert(u.orderingWithinGroup.isEmpty)
+        assert(!u.isInternal)
+        FunctionRegistry.builtin.lookupFunction(FunctionIdentifier(u.nameParts.head), u.arguments)
+    } match {
+      case c: Cast if c.needsTimeZone =>
+        c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
+      case e: Expression => e
+    }
+  }
 }
 
 /**
```
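For illustration only (not part of the patch), a hedged sketch of how the new helper behaves; the input strings below are hypothetical stand-ins for stored EXISTS_DEFAULT values.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// fromSQL parses the SQL string and resolves built-in functions directly against
// FunctionRegistry.builtin, so no analyzer run and no catalog access is needed.
val a = Literal.fromSQL("42")                               // literal of IntegerType
val b = Literal.fromSQL("CAST(NULL AS STRING)")             // cast wrapping a literal
val c = Literal.fromSQL("TIMESTAMP '2025-01-01 00:00:00'")  // typed literal

// Casts that need a time zone pick up SQLConf.get.sessionLocalTimeZone, mirroring
// what the full analysis path would have applied.
```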

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala

Lines changed: 27 additions & 15 deletions
```diff
@@ -19,15 +19,14 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -320,6 +319,29 @@ object ResolveDefaultColumns extends QueryErrorsBase
     coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
   }
 
+  /**
+   * Analyze EXISTS_DEFAULT value. This skips some steps of analyze as most of the
+   * analysis has been done before.
+   */
+  private def analyzeExistenceDefaultValue(field: StructField): Expression = {
+    val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+
+    // Parse the expression.
+    val expr = Literal.fromSQL(defaultSQL)
+
+    // Check invariants
+    if (expr.containsPattern(PLAN_EXPRESSION)) {
+      throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
+        "", field.name, defaultSQL)
+    }
+    if (!expr.resolved) {
+      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+        "", field.name, defaultSQL, null)
+    }
+
+    coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
+  }
+
   /**
    * If the provided default value is a literal of a wider type than the target column,
    * but the literal value fits within the narrower type, just coerce it for convenience.
@@ -405,19 +427,9 @@ object ResolveDefaultColumns extends QueryErrorsBase
   def getExistenceDefaultValues(schema: StructType): Array[Any] = {
     schema.fields.map { field: StructField =>
       val defaultValue: Option[String] = field.getExistenceDefaultValue()
-      defaultValue.map { text: String =>
-        val expr = try {
-          val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
-          expr match {
-            case _: ExprLiteral | _: Cast => expr
-          }
-        } catch {
-          // AnalysisException thrown from analyze is already formatted, throw it directly.
-          case ae: AnalysisException => throw ae
-          case _: MatchError =>
-            throw SparkException.internalError(s"parse existence default as literal err," +
-              s" field name: ${field.name}, value: $text")
-        }
+      defaultValue.map { _: String =>
+        val expr = analyzeExistenceDefaultValue(field)
+
         // The expression should be a literal value by this point, possibly wrapped in a cast
         // function. This is enforced by the execution of commands that assign default values.
         expr.eval()
```