[SPARK-51119][SQL] Readers on executors resolving EXISTS_DEFAULT should not call catalogs #49840

Closed · wants to merge 5 commits
```diff
@@ -40,9 +40,11 @@ import scala.util.Try
 import org.apache.commons.codec.binary.{Hex => ApacheHex}
 import org.json4s.JsonAST._
 
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, FunctionIdentifier, InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LITERAL, NULL_LITERAL, TRUE_OR_FALSE_LITERAL}
 import org.apache.spark.sql.catalyst.types._
```
```diff
@@ -265,6 +267,23 @@ object Literal {
       s"Literal must have a corresponding value to ${dataType.catalogString}, " +
       s"but class ${Utils.getSimpleName(value.getClass)} found.")
   }
+
+  def fromSQL(sql: String): Expression = {
+    CatalystSqlParser.parseExpression(sql).transformUp {
+      case u: UnresolvedFunction =>
+        assert(u.nameParts.length == 1)
+        assert(!u.isDistinct)
+        assert(u.filter.isEmpty)
+        assert(!u.ignoreNulls)
+        assert(u.orderingWithinGroup.isEmpty)
+        assert(!u.isInternal)
+        FunctionRegistry.builtin.lookupFunction(FunctionIdentifier(u.nameParts.head), u.arguments)
+    } match {
+      case c: Cast if c.needsTimeZone =>
```

Review thread on the line case c: Cast if c.needsTimeZone => (the diff continues below the thread):
cloud-fan (Contributor):

The CAST can be nested inside array/map/struct; we should put this case match inside the transformUp, together with case u: UnresolvedFunction.

@szehon-ho can you make a followup PR for it?
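For illustration, a minimal sketch of what the suggested follow-up might look like (editor's sketch, not code from this PR; the assertions on UnresolvedFunction are elided for brevity):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.internal.SQLConf

// Hypothetical follow-up: handle Cast inside transformUp so that casts nested in
// array/map/struct constructors also get the session timezone, not just a
// top-level cast.
def fromSQL(sql: String): Expression = {
  CatalystSqlParser.parseExpression(sql).transformUp {
    case u: UnresolvedFunction =>
      FunctionRegistry.builtin.lookupFunction(FunctionIdentifier(u.nameParts.head), u.arguments)
    case c: Cast if c.needsTimeZone =>
      c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
  }
}
```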

szehon-ho (Contributor Author), Feb 11, 2025:

@cloud-fan sure, let me do that.

BTW, I looked a little bit and couldn't reproduce a failure with the current implementation using the following unit test with a nested cast:
  test("SPARK-51119: array of timestamp should have timezone if default values castable") {
    withTable("t") {
      sql(s"CREATE TABLE t(key int, c ARRAY<STRING> DEFAULT " +
        s"ARRAY(CAST(timestamp '2018-11-17' AS STRING))) " +
        s"USING parquet")
      sql("INSERT INTO t (key) VALUES(1)")
      checkAnswer(sql("select * from t"), Row(1, Array("2018-11-17 00:00:00")))
    }
  }

Unlike the failing case of a top-level cast:

  test("SPARK-46958: timestamp should have timezone for resolvable if default values castable") {
    val defaults = Seq("timestamp '2018-11-17'", "CAST(timestamp '2018-11-17' AS STRING)")
    defaults.foreach { default =>
      withTable("t") {
        sql(s"CREATE TABLE t(key int, c STRING DEFAULT $default) " +
          s"USING parquet")
        sql("INSERT INTO t (key) VALUES(1)")
        checkAnswer(sql("select * from t"), Row(1, "2018-11-17 00:00:00"))
      }
    }
  }

EXISTS_DEFAULT is saved without a cast in the first case: ARRAY('2018-11-17 00:00:00') (it looks like it got evaluated), and with a cast in the second case: CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING).

So I think it doesn't matter in this particular scenario, but I agree that it is better to have it, as we are making a generic method.

cloud-fan (Contributor):

I'm looking at the previous test failure:

    Cause: org.apache.spark.sql.AnalysisException: [INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION] Failed to execute  command because the destination column or variable `c` has a DEFAULT value CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING), which fails to resolve as a valid expression. SQLSTATE: 42623

CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING) can't be generated by Literal#sql. It seems we have some misunderstanding about how this existing default string is generated. @szehon-ho can you take a closer look?
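For reference, a small sketch of the distinction being drawn (editor's illustration; the exact rendered strings are an assumption based on Literal.sql emitting plain literal SQL):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// Literal#sql renders a literal value directly, never a CAST:
Literal(42).sql                     // "42"
Literal("2018-11-17 00:00:00").sql  // "'2018-11-17 00:00:00'"
// So an EXISTS_DEFAULT string like CAST(TIMESTAMP '...' AS STRING) cannot have
// come from Literal#sql alone, which is what prompted the question above.
```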

szehon-ho (Contributor Author):

Synced offline, see the other comment.

```diff
+        c.withTimeZone(SQLConf.get.sessionLocalTimeZone)
+      case e: Expression => e
+    }
+  }
 }
```
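To illustrate the executor-side benefit, a hypothetical usage sketch (the input string is illustrative, not taken from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// ARRAY(...) is resolved against FunctionRegistry.builtin rather than any
// catalog, so this is safe to call from an executor-side reader:
val expr = Literal.fromSQL("ARRAY('2018-11-17 00:00:00')")

// The expression is fully resolved and foldable, so it can be evaluated directly:
val value = expr.eval()
```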

```diff
@@ -19,15 +19,14 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal => ExprLiteral}
 import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
```
```diff
@@ -320,6 +319,29 @@ object ResolveDefaultColumns extends QueryErrorsBase
     coerceDefaultValue(analyzed, dataType, statementType, colName, defaultSQL)
```

Review thread on this line (the diff continues below the thread):
cloud-fan (Contributor):

I think the CAST is added here, but it should be constant-folded before we generate the existing default string. We need to debug it.

szehon-ho (Contributor Author), Feb 11, 2025:

Synced with @cloud-fan offline: this is not constant-folded after this line when analyzing to create EXISTS_DEFAULT. So in the input of analyzeExistenceDefaultValue(), EXISTS_DEFAULT sometimes has a top-level CAST.
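To make that case concrete, a hypothetical sketch of the top-level CAST path (the stored string follows the error message quoted earlier in the thread):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// EXISTS_DEFAULT as stored for a column like: c STRING DEFAULT timestamp'2018-11-17'
val stored = "CAST(TIMESTAMP '2018-11-17 00:00:00' AS STRING)"

// Literal.fromSQL hits the top-level Cast branch and patches in the session
// timezone; without that fix-up the Cast would stay unresolved (needsTimeZone
// with no timeZoneId) and evaluation would fail.
val resolved = Literal.fromSQL(stored)
assert(resolved.resolved)
```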

```diff
   }
+
+  /**
+   * Analyze the EXISTS_DEFAULT value. This skips some steps of analysis, as most of the
+   * analysis has been done before.
+   */
+  private def analyzeExistenceDefaultValue(field: StructField): Expression = {
+    val defaultSQL = field.metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+
+    // Parse the expression.
+    val expr = Literal.fromSQL(defaultSQL)
+
+    // Check invariants.
+    if (expr.containsPattern(PLAN_EXPRESSION)) {
+      throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
+        "", field.name, defaultSQL)
+    }
+    if (!expr.resolved) {
+      throw QueryCompilationErrors.defaultValuesUnresolvedExprError(
+        "", field.name, defaultSQL, null)
+    }
+
+    coerceDefaultValue(expr, field.dataType, "", field.name, defaultSQL)
+  }
 
   /**
    * If the provided default value is a literal of a wider type than the target column,
    * but the literal value fits within the narrower type, just coerce it for convenience.
```
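For context, EXISTS_DEFAULT lives in the column metadata as a SQL string. A hypothetical sketch of the input this new method consumes (editor's illustration; the column and values are made up, the metadata keys are the ones used in this file):

```scala
import org.apache.spark.sql.types._

// A column created as: c INT DEFAULT 42
// carries the already-analyzed default as SQL text in its metadata:
val metadata = new MetadataBuilder()
  .putString("EXISTS_DEFAULT", "42")   // EXISTS_DEFAULT_COLUMN_METADATA_KEY
  .putString("CURRENT_DEFAULT", "42")  // CURRENT_DEFAULT_COLUMN_METADATA_KEY
  .build()
val field = StructField("c", IntegerType, nullable = true, metadata)

// analyzeExistenceDefaultValue(field) parses "42" with Literal.fromSQL instead
// of running the full analyzer, so no catalog access is needed on executors.
```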
```diff
@@ -405,19 +427,9 @@ object ResolveDefaultColumns extends QueryErrorsBase
   def getExistenceDefaultValues(schema: StructType): Array[Any] = {
     schema.fields.map { field: StructField =>
       val defaultValue: Option[String] = field.getExistenceDefaultValue()
-      defaultValue.map { text: String =>
-        val expr = try {
-          val expr = analyze(field, "", EXISTS_DEFAULT_COLUMN_METADATA_KEY)
-          expr match {
-            case _: ExprLiteral | _: Cast => expr
-          }
-        } catch {
-          // AnalysisException thrown from analyze is already formatted, throw it directly.
-          case ae: AnalysisException => throw ae
-          case _: MatchError =>
-            throw SparkException.internalError(s"parse existence default as literal err," +
-              s" field name: ${field.name}, value: $text")
-        }
+      defaultValue.map { _: String =>
+        val expr = analyzeExistenceDefaultValue(field)
 
         // The expression should be a literal value by this point, possibly wrapped in a cast
         // function. This is enforced by the execution of commands that assign default values.
         expr.eval()
```
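The end-to-end effect, as a hypothetical sketch (editor's illustration; the schema and default are made up): an executor-side reader can now materialize existence defaults from schema metadata alone.

```scala
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("c", IntegerType, nullable = true,
    new MetadataBuilder().putString("EXISTS_DEFAULT", "42").build())))

// Each EXISTS_DEFAULT is resolved via Literal.fromSQL and evaluated; fields
// without a default yield null. No SessionCatalog is touched, so this is safe
// to run on executors.
val defaults: Array[Any] = ResolveDefaultColumns.getExistenceDefaultValues(schema)
// defaults: Array(null, 42)
```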