Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51247][SQL] Move SubstituteExecuteImmediate to 'resolution' batch and prepare it for SQL Scripting local variables. #49993

Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo
* if `t` was a permanent table when the current view was created, it
* should still be a permanent table when resolving the current view,
* even if a temp view `t` has been created.
* @param isExecuteImmediate Whether the current plan is created by EXECUTE IMMEDIATE. Used when
* resolving variables, as SQL Scripting local variables should not be
* visible from EXECUTE IMMEDIATE.
* @param outerPlan The query plan from the outer query that can be used to resolve star
* expressions in a subquery.
*/
Expand All @@ -154,6 +157,7 @@ case class AnalysisContext(
referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
referredTempVariableNames: Seq[Seq[String]] = Seq.empty,
outerPlan: Option[LogicalPlan] = None,
isExecuteImmediate: Boolean = false,

/**
* This is a bridge state between this fixed-point [[Analyzer]] and a single-pass [[Resolver]].
Expand Down Expand Up @@ -208,7 +212,16 @@ object AnalysisContext {
originContext.relationCache,
viewDesc.viewReferredTempViewNames,
mutable.Set(viewDesc.viewReferredTempFunctionNames: _*),
viewDesc.viewReferredTempVariableNames)
viewDesc.viewReferredTempVariableNames,
isExecuteImmediate = originContext.isExecuteImmediate)
set(context)
try f finally { set(originContext) }
}

def withExecuteImmediateContext[A](f: => A): A = {
val originContext = value.get()
val context = originContext.copy(isExecuteImmediate = true)

set(context)
try f finally { set(originContext) }
}
Expand Down Expand Up @@ -325,7 +338,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor

override def batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
new SubstituteExecuteImmediate(catalogManager),
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
Expand Down Expand Up @@ -401,6 +413,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
ResolveRowLevelCommandAssignments ::
MoveParameterizedQueriesDown ::
BindParameters ::
new SubstituteExecuteImmediate(catalogManager, resolveChild = executeSameContext) ::
typeCoercionRules() ++
Seq(
ResolveWithCTE,
Expand Down Expand Up @@ -1670,6 +1683,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
resolveReferencesInSort(s)

// Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can probably clean up this later: I don't think we need special arguments resolution in SubstituteExecuteImmediate. EXECUTE IMMEDIATE is a leaf node and it can't resolve arguments to any columns, so it should be fine to go with normal resolution code path and resolve the arguments to session variables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my knowledge, can you clarify on this?
IIUC, this comment in the code refers to the parameters specified in the USING clause in EXECUTE IMMEDIATE, i.e. their resolution in the query. In the USING clause we have <expr> AS <alias> syntax that can basically rename any session var and we should be able to use local vars here as well.
How would this work through the regular resolution path? Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I put the pass here is to not resolve targetVariables. For example if we have a session variable x and we want to set it with INTO clause of EXECUTE IMMEDIATE, if we don't have this pass here x will get resolved to it's value. This doesn't make sense as we don't want the variables value, we want it's VariableReference

Copy link
Contributor

@cloud-fan cloud-fan Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see. It's similar to how we resolve SetVariable. We also special-case SetVariable in this rule ResolveReferences

case e : ExecuteImmediateQuery => e

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}")
q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ case class ExecuteImmediateQuery(
}

/**
* This rule substitutes execute immediate query node with plan that is passed as string literal
* or session parameter.
* This rule substitutes execute immediate query node with fully analyzed
* plan that is passed as string literal or session parameter.
*/
class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
extends Rule[LogicalPlan]
with ColumnResolutionHelper {
class SubstituteExecuteImmediate(
val catalogManager: CatalogManager,
resolveChild: LogicalPlan => LogicalPlan)
extends Rule[LogicalPlan] with ColumnResolutionHelper {

def resolveVariable(e: Expression): Expression = {

Expand Down Expand Up @@ -106,7 +107,12 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)

override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) {
case ExecuteImmediateQuery(expressions, query, targetVariables) =>
case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) =>
e.copy(args = resolveArguments(expressions))

case ExecuteImmediateQuery(expressions, query, targetVariables)
if expressions.forall(_.resolved) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this always going to be true, considering that we didn't match the previous case at this point?

Copy link
Contributor Author

@dusantism-db dusantism-db Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, i put the additional case to make it more explicit / robust. For example if the order changes or the previous case is moved out of the rule (for example to ResolveReferences), this would still work properly.


val queryString = extractQueryString(query)
val plan = parseStatement(queryString, targetVariables)

Expand All @@ -123,21 +129,16 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
} else {
if (posNodes.nonEmpty) {
PosParameterizedQuery(
plan,
// We need to resolve arguments before Resolution batch to make sure
// that some rule does not accidentally resolve our parameters.
// We do not want this as they can resolve some unsupported parameters
resolveArguments(expressions))
PosParameterizedQuery(plan, expressions)
} else {
val aliases = expressions.collect {
case e: Alias => e
case u: UnresolvedAttribute => Alias(u, u.nameParts.last)()
case u: VariableReference => Alias(u, u.identifier.name())()
}

if (aliases.size != expressions.size) {
val nonAliases = expressions.filter(attr =>
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[UnresolvedAttribute])
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference])

throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases)
}
Expand All @@ -148,13 +149,19 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
// We need to resolve arguments before Resolution batch to make sure
// that some rule does not accidentally resolve our parameters.
// We do not want this as they can resolve some unsupported parameters.
resolveArguments(aliases))
aliases)
}
}

// Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure
// that SQL scripting local variables will not be accessed from the plan.
val finalPlan = AnalysisContext.withExecuteImmediateContext {
resolveChild(queryPlan)
}

if (targetVariables.nonEmpty) {
SetVariable(targetVariables, queryPlan)
} else { queryPlan }
SetVariable(targetVariables, finalPlan)
} else { finalPlan }
}

private def parseStatement(
Expand Down