[SPARK-48530][SQL] Support for local variables in SQL Scripting #49445
…e ResolveCatalogs to support local vars, also to throw error when creating qualified local vars
…e paths in resolvecatalogs
…th CREATE/SET/DROP.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
throw QueryCompilationErrors.unresolvedVariableError(
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE))
}
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, ident.namespace().head))
Let's check whether it makes sense to throw an error like this (with `SYSTEM_CATALOG_NAME`) in the case of local vars, or whether we should create a similar error specific to scripting. This might imply that you can access the local var in `system.<label>.<varName>` format, which is not correct.
Local variables can only be qualified by a label. Labels themselves are not qualified.
Parameters can be qualified by the procedure name. Procedure names can be qualified.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/VariableManager.scala
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1591,7 +1591,10 @@ case class TableSpec(
case class CreateVariable(
name: LogicalPlan,
defaultExpr: DefaultValueExpression,
replace: Boolean) extends UnaryCommand with SupportsSubquery {
replace: Boolean,
// When false, this node can create local variables from SQL scripting.
// When false, this node can create local variables from SQL scripting.
// When false, this node creates local variables from SQL scripting.
Maybe name it `isLocal`?
isLocal makes less sense to me, as this field does not mean it is necessarily local, just that it is allowed to be local if in a SQL script.
@@ -89,4 +96,6 @@ class SqlScriptingExecutionFrame(
* @param label
* Label of the scope.
*/
class SqlScriptingExecutionScope(val label: String)
class SqlScriptingExecutionScope(val label: String) {
Does each scope have a default label name if users do not specify one? Can users reference this default label name?
Each scope has a random UUID assigned as its label name; it is different for every scope, and users do not know its value.
.orElse(throw unresolvedVariableError(nameParts, identifier.namespace().toIndexedSeq))
.map(_.variables.put(name, varDef))
case _ =>
throw SparkException.internalError("ScriptingVariableManager.set expects 1 or 2 nameParts.")
can users hit this error?
No, users should not be able to hit this error.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlScriptingVariableManager.scala
case LookupVariableMode.EXCLUDE_LOCAL_VARS => true
// EXECUTE IMMEDIATE cannot access local variables from the SQL string,
// only from USING and INTO clauses.
case LookupVariableMode.DEFAULT => AnalysisContext.get.isExecuteImmediate
when do we hit this case?
I think the logic is simple: we should look up local variables if the scripting local variable manager ThreadLocal is present, unless we are inside EXECUTE IMMEDIATE. Looking at the code, it seems all call sites already know whether we should include local variables or not.
Agreed, this logic was leftover from the previous iteration and was obsolete. I've removed the parameter entirely.
@cloud-fan I've resolved the comments, please take a look.
We still need to know that the label is not user-generated, so that we do not expose the name in error messages.
On Jan 27, 2025, at 4:24 AM, Dušan Tišma (@dusantism-db) wrote, in sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala:
> Each scope has a random UUID which is assigned as the label name, it is different for every scope and the users do not know its value.
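The exchange above (generated UUID labels that must not leak into error messages) could be modeled roughly as follows. This is only a sketch under assumptions: `ScopeLabel`, `isUserDefined`, `fromUser` and `qualifierForError` are hypothetical names invented for illustration, not identifiers from the PR.

```scala
import java.util.UUID

// Hypothetical model: a scope label plus a flag recording whether the
// label was written by the user or generated for an unlabeled scope.
final case class ScopeLabel(name: String, isUserDefined: Boolean)

object ScopeLabel {
  // Unlabeled compound bodies get a random UUID as their label, so every
  // scope has a distinct label that users cannot know or reference.
  def fromUser(label: Option[String]): ScopeLabel = label match {
    case Some(l) => ScopeLabel(l.toLowerCase, isUserDefined = true)
    case None => ScopeLabel(UUID.randomUUID().toString, isUserDefined = false)
  }

  // Only user-written labels are exposed as a qualifier in error messages.
  def qualifierForError(label: ScopeLabel): Seq[String] =
    if (label.isUserDefined) Seq(label.name) else Seq.empty
}
```

Keeping the flag next to the label is one way to satisfy both comments: lookups can still use the generated name internally, while error reporting can skip it.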
SqlScriptingLocalVariableManager.get()
// If sessionOnly is set to true lookup only session variables.
.filterNot(_ => AnalysisContext.get.isExecuteImmediate)
// If variable name is qualified with system.session.<varName> treat it as a session variable.
nit: `session` can't be a label name, so we can directly look up local variables if `nameParts.length <= 2` and then fall back to session variable lookup.
I think it is fine to leave the explicit checks here, because it's more performant this way as local variable lookup will iterate through all frames and scopes. There's no reason to do that if we have session or system.session. Also if we have it explicitly it will be safer if we make changes in the future.
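A minimal sketch of the lookup order discussed in this thread, assuming simplified stand-ins: `VariableLookup`, `localLookup` and `sessionLookup` are hypothetical, and the real code works on Spark's variable managers and `AnalysisContext` rather than plain functions. Explicitly session-qualified names and EXECUTE IMMEDIATE skip the local lookup entirely; everything else tries local variables first and then falls back to session variables.

```scala
object VariableLookup {
  // `localLookup` is None when no script is running (no manager installed).
  // Both lookup functions are hypothetical and return a tag for the match.
  def resolve(
      nameParts: Seq[String],
      localLookup: Option[Seq[String] => Option[String]],
      sessionLookup: String => Option[String],
      isExecuteImmediate: Boolean): Option[String] = {
    // Assumption: names are compared case-insensitively.
    val lower = nameParts.map(_.toLowerCase)

    // session.<name> and system.session.<name> always mean a session variable.
    val explicitlySession = lower match {
      case Seq("session", _) | Seq("system", "session", _) => true
      case _ => false
    }

    // Skip the (potentially expensive) walk over frames and scopes when the
    // name cannot refer to a local variable.
    val fromLocal =
      if (explicitlySession || isExecuteImmediate || lower.length > 2) None
      else localLookup.flatMap(lookup => lookup(lower))

    fromLocal.orElse {
      lower match {
        case Seq(name) => sessionLookup(name)
        case Seq("session", name) => sessionLookup(name)
        case Seq("system", "session", name) => sessionLookup(name)
        case _ => None
      }
    }
  }
}
```

The explicit `explicitlySession` check is what the reply argues for keeping: it is redundant once `session` is a forbidden label, but it avoids the scope walk and stays correct if that restriction ever changes.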
None
}
.orElse(
Option.when(maybeTempVariableName(nameParts)) {
I feel the previous `if-else` is more readable than `Option.when`.
Fixed.
// )
// }

if (nameParts.length != 1) {
What if this `CreateVariable` is inside EXECUTE IMMEDIATE?
You're right, I added a check that we're not in EXECUTE IMMEDIATE before throwing this error.
}

val resolved = resolveCreateVariableName(nameParts)
c.copy(name = resolved, sessionVariablesOnly = AnalysisContext.get.isExecuteImmediate)
`ResolvedIdentifier` needs a catalog, and it's the system catalog for creating session variables. What catalog should we put for creating local variables? Also the system catalog?
private def resolveCreateVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
val ident = SqlScriptingLocalVariableManager.get()
.getOrElse(catalogManager.tempVariableManager)
.createIdentifier(nameParts.last)
An `Identifier` is already created; we can directly return `ResolvedIdentifier(FakeSystemCatalog, ident)` here.
Sorry, what do you mean by already created? Here we create the identifier, which depends on the scripting context in the case of local variables, and then we return it in `ResolvedIdentifier(FakeSystemCatalog, ident)` after checking for errors.
@@ -269,6 +269,94 @@ class SqlScriptingParserSuite extends SparkFunSuite with SQLHelper {
assert(exception.origin.line.contains(3))
}

test("compound: forbidden label - system") {
Is it because compound statements can be nested and an inner label may be qualified as `system.session`?
Or in the case of a local variable named session, which could be a struct: then we could have `system.session.x`, which could refer to either the session variable `x` or the column `x` in the local variable `session`.
throw unresolvedVariableError(namePartsCaseAdjusted, Seq("SYSTEM", "SESSION"))
)

variableManager.set(
Is this different from `create` with `overrideIfExists = true`?
Yes, it's different in `SqlScriptingLocalVariableManager`. Especially so because local variables do not support overrideIfExists; this will be enforced when the FOR statement is refactored to use proper local variables.
case c @ CreateVariable(UnresolvedIdentifier(nameParts, _), _, _) =>
val resolved = resolveVariableName(nameParts)
c.copy(name = resolved)
case c @ CreateVariable(UnresolvedIdentifier(nameParts, _), _, _, _) =>
What Spark does here is determine where to create the variable (catalog and namespace), and then turn the `UnresolvedIdentifier` into a qualified `ResolvedIdentifier`. I think we don't need an extra `sessionVariablesOnly` flag in `CreateVariable`; the qualified `ResolvedIdentifier` can determine everything.
- If the variable name is already qualified (`session.var` or `system.session.var`), always fully qualify it to `system.session.var`, or fail if the qualifier is not `system.session`. This is because users can create session variables explicitly (via qualified names) anywhere.
- If the variable name is unqualified: if we are not in a script or we are inside EXECUTE IMMEDIATE, qualify it to `system.session.var`. Otherwise, qualify it to `local.current_scope_label_name.var`.
We can create a `FakeLocalCatalog` following `FakeSystemCatalog`. In `CreateVariableExec`, if the catalog is `FakeLocalCatalog`, the script local variable manager must be present and we create local variables.
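The qualification rules proposed in this comment can be sketched as a small pure function. This illustrates the reviewer's proposal only, not the merged implementation; `CreateVariableQualifier`, `SessionTarget`, `LocalTarget` and `qualify` are hypothetical names, and lower-casing names is an assumption made here for simplicity.

```scala
object CreateVariableQualifier {
  sealed trait Target
  // Stands for system.session.<name>.
  final case class SessionTarget(name: String) extends Target
  // Stands for local.<scopeLabel>.<name>.
  final case class LocalTarget(scopeLabel: String, name: String) extends Target

  // `currentScopeLabel` is None when we are not inside a script.
  def qualify(
      nameParts: Seq[String],
      currentScopeLabel: Option[String],
      isExecuteImmediate: Boolean): Either[String, Target] =
    nameParts.map(_.toLowerCase) match {
      // Already qualified: always a session variable, anywhere.
      case Seq("session", name) => Right(SessionTarget(name))
      case Seq("system", "session", name) => Right(SessionTarget(name))
      // Unqualified: local only when in a script and not in EXECUTE IMMEDIATE.
      case Seq(name) =>
        currentScopeLabel match {
          case Some(label) if !isExecuteImmediate => Right(LocalTarget(label, name))
          case _ => Right(SessionTarget(name))
        }
      // Any other qualifier is rejected.
      case other =>
        Left(s"Invalid variable qualifier: ${other.dropRight(1).mkString(".")}")
    }
}
```

With the target fully determined by the qualified name, the executing command would only need to inspect which kind of target it received, which is the point of dropping the extra flag.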
We can use the same idea for `DropVariable` and `SetVariable`.
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{CaseStatement, CompoundBody, CompoundPlanStatement, CreateVariable, DropVariable, ForStatement, IfElseStatement, IterateStatement, LeaveStatement, LogicalPlan, LoopStatement, RepeatStatement, SingleStatement, WhileStatement}
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.catalyst.plans.logical.{CaseStatement, CompoundBody, CompoundPlanStatement, ForStatement, IfElseStatement, IterateStatement, LeaveStatement, LogicalPlan, LoopStatement, RepeatStatement, SingleStatement, WhileStatement}
Could you remove the unused imports, please? I tried to build it locally but it failed with:
[ERROR] [Error] /Users/maxim.gekk/proj/review-dusantism-db_scripting-local-variables-2/sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingInterpreter.scala:21: Unused import
What changes were proposed in this pull request?
This pull request introduces support for local variables in SQL scripting.
Behavior:
Local variables are declared in the headers of compound bodies and are bound to their scope. Variables of the same name are allowed in nested scopes, where the innermost variable will be resolved. Optionally, a local variable can be qualified with the label of the compound body in which it was declared, which allows access to variables that are not the innermost in the current scope.
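To make the scoping rule concrete, here is a minimal sketch of nested scopes with innermost-first resolution and optional label qualification. `Scope` and `ScopeStack` are hypothetical classes written for illustration; they are not the PR's `SqlScriptingExecutionScope` or frame classes, and case handling is ignored.

```scala
import scala.collection.mutable

// Hypothetical model of one compound body: a label plus its declared variables.
final class Scope(val label: String) {
  val variables: mutable.Map[String, Any] = mutable.Map.empty
}

// Hypothetical stack of nested scopes; the last element is the innermost one.
final class ScopeStack {
  private val scopes = mutable.ArrayBuffer.empty[Scope]

  def enter(label: String): Scope = {
    val scope = new Scope(label)
    scopes += scope
    scope
  }

  def exit(): Unit = {
    scopes.remove(scopes.length - 1)
    ()
  }

  def lookup(nameParts: Seq[String]): Option[Any] = nameParts match {
    // Unqualified: search from the innermost scope outwards.
    case Seq(name) =>
      scopes.reverseIterator.map(_.variables.get(name)).collectFirst { case Some(v) => v }
    // Qualified: consult only the scope whose label matches.
    case Seq(label, name) =>
      scopes.reverseIterator.find(_.label == label).flatMap(_.variables.get(name))
    case _ => None
  }
}
```

For example, with `x` declared in both an `outer` and an `inner` scope, `lookup(Seq("x"))` yields the inner value while `lookup(Seq("outer", "x"))` reaches the shadowed one.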
Local variables have resolution priority over session variables; session variable resolution is attempted after local variable resolution. The exception to this is fully qualified session variables, in the format `system.session.<varName>` or `session.<varName>`. `system` and `session` are forbidden for use as compound body labels.
Local variables must not be qualified on declaration, can be set using `SET VAR`, and cannot be `DROPPED`.
They also should not be allowed to be declared with `DECLARE OR REPLACE`; however, this is not implemented in this PR, as the `FOR` statement relies on this behavior. The `FOR` statement must be updated in a separate PR to use proper local variables, as the current implementation simulates them using session variables.
Implementation notes:
As core depends on catalyst, it's impossible to import code from core (where most of the SQL scripting implementation is located) into catalyst. To solve this, a trait `VariableManager` is introduced, which is then implemented in core and injected into catalyst. This `VariableManager` is basically a wrapper around `SqlScriptingExecutionContext` and provides methods for getting/setting/creating variables.
This injection is tricky because we want to have one `ScriptingVariableManager` per script.
Options considered to achieve this are:
1. … `EXECUTE IMMEDIATE` we could simply not pass in the script context and it would behave as if outside of a script. This is the intended behavior for `EXECUTE IMMEDIATE`. The problem with this approach is that it seems hard to implement. The call stack would be as follows: `Analyzer.executeAndCheck` -> `HybridAnalyzer.apply` -> `RuleExecutor.executeAndTrack` -> `Analyzer.execute` (overridden from RuleExecutor) -> `Analyzer.withNewAnalysisContext`. Implementing this context propagation would require changing the signatures of all of these methods, including superclass methods like `execute` and `executeAndTrack`.
2. … `CatalogManager`. `CatalogManager`'s lifetime is tied to the session, so to allow multiple scripts to execute at the same time we would need to e.g. have a map `scriptUUID -> VariableManager`, and to have the `scriptUUID` as a `ThreadLocal` variable in the `CatalogManager`. The drawback of this approach is that the script has to clean up its resources after execution, and also that it's more complicated to e.g. forbid `EXECUTE IMMEDIATE` from accessing local variables.
Currently the second option seems better to me; however, I am open to suggestions on how to approach this.
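The dependency inversion described in these notes (a trait in the lower module, an implementation in the higher one, made visible per script) can be sketched as below. This is a simplified illustration under assumptions, not either option exactly as described: the trait name `VariableManager` comes from the description, but its methods, `CurrentVariableManager` and `ScriptVariableManager` are hypothetical, and the standard library's `DynamicVariable` stands in for whatever thread-local mechanism the PR actually uses.

```scala
import scala.collection.mutable
import scala.util.DynamicVariable

// Would live in the lower-level module (catalyst in the description).
// The method set here is hypothetical.
trait VariableManager {
  def create(name: String, value: Any): Unit
  def get(name: String): Option[Any]
}

// Thread-local holder, also in the lower-level module; empty outside a script.
object CurrentVariableManager {
  private val current = new DynamicVariable[Option[VariableManager]](None)

  def get(): Option[VariableManager] = current.value

  // Installs the manager for the duration of `body`, then restores the old value,
  // so no explicit cleanup is needed when the script finishes.
  def withManager[T](manager: VariableManager)(body: => T): T =
    current.withValue(Some(manager))(body)

  // Hides local variables for a nested block, as EXECUTE IMMEDIATE requires.
  def withoutManager[T](body: => T): T = current.withValue(None)(body)
}

// Would live in the higher-level module (core), one instance per script.
final class ScriptVariableManager extends VariableManager {
  private val vars = mutable.Map.empty[String, Any]
  def create(name: String, value: Any): Unit = vars(name) = value
  def get(name: String): Option[Any] = vars.get(name)
}
```

Code in the lower module only ever sees `Option[VariableManager]`, so it needs no import from the higher module, and the scoped `withValue` calls avoid threading a context parameter through every analyzer method signature.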
Why are the changes needed?
Currently, local variables are simulated using session variables in SQL scripting, which is a temporary solution and bad in many ways.
Does this PR introduce any user-facing change?
Yes, this change introduces multiple new types of errors.
How was this patch tested?
Tests were added to SqlScriptingExecutionSuite and SqlScriptingParserSuite.
Was this patch authored or co-authored using generative AI tooling?
No.