
[SPARK-51119][SQL] Readers on executors resolving EXISTS_DEFAULT should not call catalogs #49840

Open
wants to merge 1 commit into master
Conversation

@szehon-ho (Contributor) commented Feb 6, 2025

What changes were proposed in this pull request?

Simplify the resolution of EXISTS_DEFAULT in ResolveDefaultColumns::getExistenceDefaultValues(), which is called from file readers on executors.

Why are the changes needed?

Spark executors unnecessarily contact catalogs when resolving EXISTS_DEFAULT (used for default values of existing data) for a column.

Detailed explanation: the code path for default values first runs an analysis of the user-provided CURRENT_DEFAULT value for a column (to evaluate functions, etc.) and saves the resulting SQL as the column's EXISTS_DEFAULT. EXISTS_DEFAULT is then used to avoid backfilling existing data files with this value. When reading existing files, Spark resolves the EXISTS_DEFAULT metadata and uses the value in place of the nulls it finds in that column.
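
For concreteness, a minimal sketch of that flow (the table, column, and default here are hypothetical, assuming a Spark build with default-column support for Parquet):

```scala
// Step 0: existing data written before the column exists.
spark.sql("CREATE TABLE t (id INT) USING parquet")
spark.sql("INSERT INTO t VALUES (1)")

// Step 1 (DDL time): CURRENT_DEFAULT ("41 + 1") is analyzed once and
// the folded result is stored as the column's EXISTS_DEFAULT ("42").
spark.sql("ALTER TABLE t ADD COLUMN c INT DEFAULT 41 + 1")

// Step 2 (read time): the old file has no column c, so the reader
// fills it with the stored EXISTS_DEFAULT instead of rewriting files.
spark.sql("SELECT * FROM t").show()  // id = 1, c = 42
```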

The problem is that this second step, on the read path, redundantly runs all the analyzer and finish-analysis rules on EXISTS_DEFAULT again, and some of them contact the catalog. These rules are unnecessary here because they already ran when the value was first computed.
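
To make this concrete, here is a sketch of the column metadata involved (the literal values are illustrative; CURRENT_DEFAULT and EXISTS_DEFAULT are the metadata keys ResolveDefaultColumns reads):

```scala
import org.apache.spark.sql.types._

// Sketch: a column carrying both default-value metadata entries.
// CURRENT_DEFAULT keeps the user's original expression text, while
// EXISTS_DEFAULT stores the SQL produced by the one-time analysis
// at DDL time (here, the folded constant "42").
val metadata = new MetadataBuilder()
  .putString("CURRENT_DEFAULT", "41 + 1")
  .putString("EXISTS_DEFAULT", "42")
  .build()
val field = StructField("c", IntegerType, nullable = true, metadata)

// On the read path, getExistenceDefaultValues() re-analyzed "42" with
// the full rule set, even though it is already fully analyzed output.
```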

Worse, it may throw exceptions when the executors are not configured to reach the catalog, for example:

```
Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
  at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
  at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:91)
  at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
  at scala.Option.map(Option.scala:230)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
  at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
  ... 21 more
Caused by: java.lang.IllegalStateException: No active or default Spark session found
```
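
Note the path in the trace: ReplaceCurrentLike, one of the FinishAnalysis rules, calls CatalogManager.currentNamespace, which in turn tries to instantiate the session catalog on the executor, where no active or default Spark session exists.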

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a test in StructTypeSuite. I had to expose some members of ResolveDefaultColumns for testing.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Feb 6, 2025
@szehon-ho (Contributor, Author) left a comment:

Leaving some explanations

```scala
 *
 * VisibleForTesting
 */
def analyzeExistingDefault(field: StructField,
```
@szehon-ho (Contributor, Author) commented Feb 6, 2025:

This is a simpler version of analyze, used for existsDefaultValues (which is called from executors). I made it a separate method because we may have an opportunity to simplify it further; for now, some part of analysis still seems needed to resolve functions like array(). The problematic FinishAnalysis code was removed, though.
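
For intuition, a rough sketch of that idea (this is not the PR's exact code; resolveExistsDefault is a hypothetical name):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Hypothetical sketch: EXISTS_DEFAULT was already produced by a full
// analysis at DDL time, so re-resolving it on the read path should
// need only parsing plus a light analysis pass, never the
// catalog-touching FinishAnalysis rules such as ReplaceCurrentLike.
def resolveExistsDefault(sql: String): Any = {
  val parsed: Expression = CatalystSqlParser.parseExpression(sql)
  // Most stored values are plain literals ("42", "CAST(NULL AS INT)");
  // constructors like array() still need some resolution first.
  parsed.eval() // assumes the expression is foldable after parsing
}
```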

```scala
/**
 * Visible for testing
 */
def setAnalyzerAndOptimizer(analyzer: Analyzer, optimizer: Optimizer): Unit = {
```
@szehon-ho (Contributor, Author) commented:

It's hard to reproduce the issue in a unit test, so I ended up mocking these members to verify that the catalogs are not called.
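
A sketch of what that verification could look like (realAnalyzer, realOptimizer, and schemaWithDefaults are placeholder names):

```scala
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._

// Illustrative use of the setAnalyzerAndOptimizer test hook: wrap the
// real analyzer and optimizer in spies, resolve the defaults, and
// assert that the optimizer (whose rules touch catalogs) never ran.
val analyzerSpy = spy(realAnalyzer)
val optimizerSpy = spy(realOptimizer)
ResolveDefaultColumns.setAnalyzerAndOptimizer(analyzerSpy, optimizerSpy)

ResolveDefaultColumns.existenceDefaultValues(schemaWithDefaults)
verify(optimizerSpy, never()).execute(any())
```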
