Skip to content

Commit 26febf7

Browse files
dusantism-db and cloud-fan
authored and committed
[SPARK-51247][SQL] Move SubstituteExecuteImmediate to 'resolution' batch and prepare it for SQL Scripting local variables
### What changes were proposed in this pull request? This PR changes `SubstituteExecuteImmediate` to analyze its entire subtree within a scoped context. This will allow us to disable SQL scripting local variables in the subtree, when they are added, which is necessary in order to sandbox the generated plan. This PR also moves `SubstituteExecuteImmediate` to the `resolution` batch in the analyzer. This is necessary in order to resolve arguments of EXECUTE IMMEDIATE properly, notably if the EXECUTE IMMEDIATE is the child of a `ParameterizedQuery`. This ensures proper resolution ordering, i.e., first all parameters of EXECUTE IMMEDIATE will be resolved, and only then will the generated query itself be analyzed. Local variables PR - #49445 ### Why are the changes needed? They are necessary for local variables support in SQL scripting. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing unit tests and golden files. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #49993 from dusantism-db/execute-immediate-resolution-batch. Authored-by: Dušan Tišma <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5759882 commit 26febf7

File tree

2 files changed

+47
-19
lines changed

2 files changed

+47
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog with Suppo
137137
* if `t` was a permanent table when the current view was created, it
138138
* should still be a permanent table when resolving the current view,
139139
* even if a temp view `t` has been created.
140+
* @param isExecuteImmediate Whether the current plan is created by EXECUTE IMMEDIATE. Used when
141+
* resolving variables, as SQL Scripting local variables should not be
142+
* visible from EXECUTE IMMEDIATE.
140143
* @param outerPlan The query plan from the outer query that can be used to resolve star
141144
* expressions in a subquery.
142145
*/
@@ -154,6 +157,7 @@ case class AnalysisContext(
154157
referredTempFunctionNames: mutable.Set[String] = mutable.Set.empty,
155158
referredTempVariableNames: Seq[Seq[String]] = Seq.empty,
156159
outerPlan: Option[LogicalPlan] = None,
160+
isExecuteImmediate: Boolean = false,
157161

158162
/**
159163
* This is a bridge state between this fixed-point [[Analyzer]] and a single-pass [[Resolver]].
@@ -208,7 +212,16 @@ object AnalysisContext {
208212
originContext.relationCache,
209213
viewDesc.viewReferredTempViewNames,
210214
mutable.Set(viewDesc.viewReferredTempFunctionNames: _*),
211-
viewDesc.viewReferredTempVariableNames)
215+
viewDesc.viewReferredTempVariableNames,
216+
isExecuteImmediate = originContext.isExecuteImmediate)
217+
set(context)
218+
try f finally { set(originContext) }
219+
}
220+
221+
def withExecuteImmediateContext[A](f: => A): A = {
222+
val originContext = value.get()
223+
val context = originContext.copy(isExecuteImmediate = true)
224+
212225
set(context)
213226
try f finally { set(originContext) }
214227
}
@@ -325,7 +338,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
325338

326339
override def batches: Seq[Batch] = Seq(
327340
Batch("Substitution", fixedPoint,
328-
new SubstituteExecuteImmediate(catalogManager),
329341
// This rule optimizes `UpdateFields` expression chains so looks more like optimization rule.
330342
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
331343
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
@@ -401,6 +413,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
401413
ResolveRowLevelCommandAssignments ::
402414
MoveParameterizedQueriesDown ::
403415
BindParameters ::
416+
new SubstituteExecuteImmediate(
417+
catalogManager,
418+
resolveChild = executeSameContext,
419+
checkAnalysis = checkAnalysis) ::
404420
typeCoercionRules() ++
405421
Seq(
406422
ResolveWithCTE,
@@ -1670,6 +1686,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
16701686
case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
16711687
resolveReferencesInSort(s)
16721688

1689+
// Pass for Execute Immediate as arguments will be resolved by [[SubstituteExecuteImmediate]].
1690+
case e : ExecuteImmediateQuery => e
1691+
16731692
case q: LogicalPlan =>
16741693
logTrace(s"Attempting to resolve ${q.simpleString(conf.maxToStringFields)}")
16751694
q.mapExpressions(resolveExpressionByPlanChildren(_, q, includeLastResort = true))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/executeImmediate.scala

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ case class ExecuteImmediateQuery(
4444
}
4545

4646
/**
47-
* This rule substitutes execute immediate query node with plan that is passed as string literal
48-
* or session parameter.
47+
* This rule substitutes execute immediate query node with fully analyzed
48+
* plan that is passed as string literal or session parameter.
4949
*/
50-
class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
51-
extends Rule[LogicalPlan]
52-
with ColumnResolutionHelper {
50+
class SubstituteExecuteImmediate(
51+
val catalogManager: CatalogManager,
52+
resolveChild: LogicalPlan => LogicalPlan,
53+
checkAnalysis: LogicalPlan => Unit)
54+
extends Rule[LogicalPlan] with ColumnResolutionHelper {
5355

5456
def resolveVariable(e: Expression): Expression = {
5557

@@ -106,7 +108,12 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
106108

107109
override def apply(plan: LogicalPlan): LogicalPlan =
108110
plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) {
109-
case ExecuteImmediateQuery(expressions, query, targetVariables) =>
111+
case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) =>
112+
e.copy(args = resolveArguments(expressions))
113+
114+
case ExecuteImmediateQuery(expressions, query, targetVariables)
115+
if expressions.forall(_.resolved) =>
116+
110117
val queryString = extractQueryString(query)
111118
val plan = parseStatement(queryString, targetVariables)
112119

@@ -123,21 +130,16 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
123130
throw QueryCompilationErrors.invalidQueryMixedQueryParameters()
124131
} else {
125132
if (posNodes.nonEmpty) {
126-
PosParameterizedQuery(
127-
plan,
128-
// We need to resolve arguments before Resolution batch to make sure
129-
// that some rule does not accidentally resolve our parameters.
130-
// We do not want this as they can resolve some unsupported parameters
131-
resolveArguments(expressions))
133+
PosParameterizedQuery(plan, expressions)
132134
} else {
133135
val aliases = expressions.collect {
134136
case e: Alias => e
135-
case u: UnresolvedAttribute => Alias(u, u.nameParts.last)()
137+
case u: VariableReference => Alias(u, u.identifier.name())()
136138
}
137139

138140
if (aliases.size != expressions.size) {
139141
val nonAliases = expressions.filter(attr =>
140-
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[UnresolvedAttribute])
142+
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference])
141143

142144
throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases)
143145
}
@@ -148,13 +150,20 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
148150
// We need to resolve arguments before Resolution batch to make sure
149151
// that some rule does not accidentally resolve our parameters.
150152
// We do not want this as they can resolve some unsupported parameters.
151-
resolveArguments(aliases))
153+
aliases)
152154
}
153155
}
154156

157+
// Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure
158+
// that SQL scripting local variables will not be accessed from the plan.
159+
val finalPlan = AnalysisContext.withExecuteImmediateContext {
160+
resolveChild(queryPlan)
161+
}
162+
checkAnalysis(finalPlan)
163+
155164
if (targetVariables.nonEmpty) {
156-
SetVariable(targetVariables, queryPlan)
157-
} else { queryPlan }
165+
SetVariable(targetVariables, finalPlan)
166+
} else { finalPlan }
158167
}
159168

160169
private def parseStatement(

0 commit comments

Comments
 (0)