-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-51247][SQL] Move SubstituteExecuteImmediate to 'resolution' batch and prepare it for SQL Scripting local variables. #49993
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,12 +44,14 @@ case class ExecuteImmediateQuery( | |
} | ||
|
||
/** | ||
* This rule substitutes execute immediate query node with plan that is passed as string literal | ||
* or session parameter. | ||
* This rule substitutes execute immediate query node with fully analyzed | ||
* plan that is passed as string literal or session parameter. | ||
*/ | ||
class SubstituteExecuteImmediate(val catalogManager: CatalogManager) | ||
extends Rule[LogicalPlan] | ||
with ColumnResolutionHelper { | ||
class SubstituteExecuteImmediate( | ||
val catalogManager: CatalogManager, | ||
resolveChild: LogicalPlan => LogicalPlan, | ||
checkAnalysis: LogicalPlan => Unit) | ||
extends Rule[LogicalPlan] with ColumnResolutionHelper { | ||
|
||
def resolveVariable(e: Expression): Expression = { | ||
|
||
|
@@ -106,7 +108,12 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) | |
|
||
override def apply(plan: LogicalPlan): LogicalPlan = | ||
plan.resolveOperatorsWithPruning(_.containsPattern(EXECUTE_IMMEDIATE), ruleId) { | ||
case ExecuteImmediateQuery(expressions, query, targetVariables) => | ||
case e @ ExecuteImmediateQuery(expressions, _, _) if expressions.exists(!_.resolved) => | ||
e.copy(args = resolveArguments(expressions)) | ||
|
||
case ExecuteImmediateQuery(expressions, query, targetVariables) | ||
if expressions.forall(_.resolved) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't this always going to be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is, i put the additional case to make it more explicit / robust. For example if the order changes or the previous case is moved out of the rule (for example to |
||
|
||
val queryString = extractQueryString(query) | ||
val plan = parseStatement(queryString, targetVariables) | ||
|
||
|
@@ -123,21 +130,16 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) | |
throw QueryCompilationErrors.invalidQueryMixedQueryParameters() | ||
} else { | ||
if (posNodes.nonEmpty) { | ||
PosParameterizedQuery( | ||
plan, | ||
// We need to resolve arguments before Resolution batch to make sure | ||
// that some rule does not accidentally resolve our parameters. | ||
// We do not want this as they can resolve some unsupported parameters | ||
resolveArguments(expressions)) | ||
PosParameterizedQuery(plan, expressions) | ||
} else { | ||
val aliases = expressions.collect { | ||
case e: Alias => e | ||
case u: UnresolvedAttribute => Alias(u, u.nameParts.last)() | ||
case u: VariableReference => Alias(u, u.identifier.name())() | ||
} | ||
|
||
if (aliases.size != expressions.size) { | ||
val nonAliases = expressions.filter(attr => | ||
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[UnresolvedAttribute]) | ||
!attr.isInstanceOf[Alias] && !attr.isInstanceOf[VariableReference]) | ||
|
||
throw QueryCompilationErrors.invalidQueryAllParametersMustBeNamed(nonAliases) | ||
} | ||
|
@@ -148,13 +150,20 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager) | |
// We need to resolve arguments before Resolution batch to make sure | ||
// that some rule does not accidentally resolve our parameters. | ||
// We do not want this as they can resolve some unsupported parameters. | ||
resolveArguments(aliases)) | ||
aliases) | ||
} | ||
} | ||
|
||
// Fully analyze the generated plan. AnalysisContext.withExecuteImmediateContext makes sure | ||
// that SQL scripting local variables will not be accessed from the plan. | ||
val finalPlan = AnalysisContext.withExecuteImmediateContext { | ||
resolveChild(queryPlan) | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
checkAnalysis(finalPlan) | ||
|
||
if (targetVariables.nonEmpty) { | ||
SetVariable(targetVariables, queryPlan) | ||
} else { queryPlan } | ||
SetVariable(targetVariables, finalPlan) | ||
} else { finalPlan } | ||
} | ||
|
||
private def parseStatement( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can probably clean up this later: I don't think we need special arguments resolution in
SubstituteExecuteImmediate
. EXECUTE IMMEDIATE is a leaf node and it can't resolve arguments to any columns, so it should be fine to go with normal resolution code path and resolve the arguments to session variables.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my knowledge, can you clarify on this?
IIUC, this comment in the code refers to the parameters specified in the USING clause in EXECUTE IMMEDIATE, i.e. their resolution in the query. In the USING clause we have
<expr> AS <alias>
syntax that can basically rename any session var and we should be able to use local vars here as well.How would this work through the regular resolution path? Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason I put the pass here is to not resolve
targetVariables
. For example if we have a session variablex
and we want to set it with INTO clause of EXECUTE IMMEDIATE, if we don't have this pass herex
will get resolved to it's value. This doesn't make sense as we don't want the variables value, we want it'sVariableReference
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh I see. It's similar to how we resolve
SetVariable
. We also special-caseSetVariable
in this ruleResolveReferences