-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48530][SQL] Support for local variables in SQL Scripting #49445
base: master
Are you sure you want to change the base?
Changes from all commits
73cb01b
813d282
1c08f57
18da02f
cee5f1a
47934ab
399d4e8
6efe764
769607d
6225956
241fc05
068e1ec
60335db
65b69d3
fe5dc7b
4f8d2c1
ba5b8d2
33f0aac
be6052f
4b1e8e1
90b106b
7ba0923
cd4e932
fdf3c5a
52cbd17
c134fd4
3ea762d
8e9352a
78042e3
40ffa83
4a546a4
15d5554
a2b20c5
e3077a4
6ce8f9c
ccab52c
370bf65
0cea838
9895c69
cd888dd
4fe7ab5
8a6b536
db573c1
dadd517
680e5d7
7d3008e
901aa6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.util | ||
|
||
/** | ||
* Helper trait for defining thread locals with lexical scoping. With this helper, the thread local | ||
* is private and can only be set by the [[Handle]]. The [[Handle]] only exposes the thread local | ||
* value to functions passed into its [[runWith]] method. This pattern allows for the lifetime of | ||
* the thread local value to be strictly controlled. | ||
* | ||
* Rather than calling `tl.set(...)` and `tl.remove()` you would get a handle and execute your code | ||
* in `handle.runWith { ... }`. | ||
* | ||
* Example: | ||
* {{{ | ||
* object Credentials extends LexicalThreadLocal[Map[String, String]] { | ||
* def create(creds: Map[String, String]) = createHandle(Some(creds)) | ||
* } | ||
* ... | ||
* val handle = Credentials.create(Map("key" -> "value")) | ||
* assert(Credentials.get() == None) | ||
* handle.runWith { | ||
* assert(Credentials.get() == Some(Map("key" -> "value"))) | ||
* } | ||
* }}} | ||
*/ | ||
trait LexicalThreadLocal[T] { | ||
// Backing thread local. It is private, so it is only reachable through the members below. | ||
private val tl = new ThreadLocal[T] | ||
|
||
// Installs `opt` on the current thread: Some(x) stores x, None clears the slot via remove(). | ||
private def set(opt: Option[T]): Unit = { | ||
opt match { | ||
case Some(x) => tl.set(x) | ||
case None => tl.remove() | ||
} | ||
} | ||
|
||
// Protected factory for handles; the Handle constructor itself is private[LexicalThreadLocal]. | ||
protected def createHandle(opt: Option[T]): Handle = new Handle(opt) | ||
|
||
// Returns the value visible on the current thread; Option(...) turns a null result into None. | ||
def get(): Option[T] = Option(tl.get) | ||
|
||
/** Final class representing a handle to a thread local value. */ | ||
final class Handle private[LexicalThreadLocal] (private val opt: Option[T]) { | ||
// Evaluates the by-name block `f` with this handle's value installed. The value that was | ||
// present before the call is captured first and put back in `finally`, so it is restored | ||
// even when `f` throws. | ||
def runWith[R](f: => R): R = { | ||
val old = get() | ||
set(opt) | ||
try f finally { | ||
set(old) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst | ||
|
||
import org.apache.spark.sql.catalyst.catalog.VariableManager | ||
import org.apache.spark.util.LexicalThreadLocal | ||
|
||
// Lexically scoped thread local holding a VariableManager. Callers obtain a Handle through | ||
// `create` and read the current value with the inherited `get()`. | ||
object SqlScriptingLocalVariableManager extends LexicalThreadLocal[VariableManager] { | ||
// Wraps `variableManager` with Option(...), so a null argument yields a handle carrying None. | ||
def create(variableManager: VariableManager): Handle = createHandle(Option(variableManager)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import scala.collection.mutable | |
|
||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.SqlScriptingLocalVariableManager | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.expressions.SubExprUtils.wrapOuterReference | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
|
@@ -251,6 +252,14 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { | |
} | ||
} | ||
|
||
/** | ||
* Look up variable by nameParts. | ||
* If in SQL Script, first check local variables, unless in EXECUTE IMMEDIATE | ||
* (EXECUTE IMMEDIATE generated query cannot access local variables). | ||
* If not found, fall back to session variables. | ||
* @param nameParts NameParts of the variable. | ||
* @return Reference to the variable. | ||
*/ | ||
def lookupVariable(nameParts: Seq[String]): Option[VariableReference] = { | ||
// The temp variables live in `SYSTEM.SESSION`, and the name can be qualified or not. | ||
def maybeTempVariableName(nameParts: Seq[String]): Boolean = { | ||
|
@@ -266,22 +275,48 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { | |
} | ||
} | ||
|
||
if (maybeTempVariableName(nameParts)) { | ||
val variableName = if (conf.caseSensitiveAnalysis) { | ||
nameParts.last | ||
} else { | ||
nameParts.last.toLowerCase(Locale.ROOT) | ||
} | ||
catalogManager.tempVariableManager.get(variableName).map { varDef => | ||
val namePartsCaseAdjusted = if (conf.caseSensitiveAnalysis) { | ||
nameParts | ||
} else { | ||
nameParts.map(_.toLowerCase(Locale.ROOT)) | ||
} | ||
|
||
SqlScriptingLocalVariableManager.get() | ||
// Inside EXECUTE IMMEDIATE local variables are not visible, so look up only session variables.
.filterNot(_ => AnalysisContext.get.isExecuteImmediate) | ||
// If variable name is qualified with system.session.<varName> treat it as a session variable. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is fine to leave the explicit checks here, because it's more performant this way as local variable lookup will iterate through all frames and scopes. There's no reason to do that if we have session or system.session. Also if we have it explicitly it will be safer if we make changes in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. How about this
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. local variable can only be qualified by one label, right? see https://github.com/apache/spark/pull/49445/files#r1913307900 |
||
.filterNot(_ => | ||
nameParts.length == 3 | ||
&& nameParts.take(2).map(_.toLowerCase(Locale.ROOT)) == Seq("system", "session")) | ||
// If variable name is qualified with session.<varName> treat it as a session variable. | ||
.filterNot(_ => | ||
nameParts.length == 2 | ||
&& nameParts.head.toLowerCase(Locale.ROOT) == "session") | ||
// Local variable must be in format <varName> or <label>.<varName> | ||
.filter(_ => namePartsCaseAdjusted.nonEmpty && namePartsCaseAdjusted.length <= 2) | ||
dusantism-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.flatMap(_.get(namePartsCaseAdjusted)) | ||
.map { varDef => | ||
VariableReference( | ||
nameParts, | ||
FakeSystemCatalog, | ||
Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), variableName), | ||
Identifier.of(Array(varDef.identifier.namespace().last), namePartsCaseAdjusted.last), | ||
varDef) | ||
} | ||
} else { | ||
None | ||
} | ||
.orElse( | ||
if (maybeTempVariableName(nameParts)) { | ||
catalogManager.tempVariableManager | ||
.get(namePartsCaseAdjusted) | ||
.map { varDef => | ||
VariableReference( | ||
nameParts, | ||
FakeSystemCatalog, | ||
Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), namePartsCaseAdjusted.last), | ||
varDef | ||
)} | ||
} else { | ||
None | ||
} | ||
) | ||
} | ||
|
||
// Resolves `UnresolvedAttribute` to its value. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,12 @@ package org.apache.spark.sql.catalyst.analysis | |
|
||
import scala.jdk.CollectionConverters._ | ||
|
||
import org.apache.spark.sql.AnalysisException | ||
import org.apache.spark.sql.catalyst.SqlScriptingLocalVariableManager | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces} | ||
import org.apache.spark.sql.errors.DataTypeErrors.toSQLId | ||
import org.apache.spark.sql.errors.QueryCompilationErrors | ||
import org.apache.spark.util.ArrayImplicits._ | ||
|
||
|
@@ -34,11 +37,30 @@ class ResolveCatalogs(val catalogManager: CatalogManager) | |
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { | ||
// We only support temp variables for now and the system catalog is not properly implemented | ||
// yet. We need to resolve `UnresolvedIdentifier` for variable commands specially. | ||
case c @ CreateVariable(UnresolvedIdentifier(nameParts, _), _, _) => | ||
val resolved = resolveVariableName(nameParts) | ||
c.copy(name = resolved) | ||
case c @ CreateVariable(UnresolvedIdentifier(nameParts, _), _, _, _) => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What Spark does here is to determine where to create the variable (catalog and namespace), and then turn
We can create a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use the same idea for |
||
// From scripts we can only create local variables, which must be unqualified, | ||
// and must not be DECLARE OR REPLACE. | ||
if (SqlScriptingLocalVariableManager.get().isDefined && | ||
!AnalysisContext.get.isExecuteImmediate) { | ||
// TODO [SPARK-50785]: Uncomment this when For Statement starts properly using local vars. | ||
// if (c.replace) { | ||
// throw new AnalysisException( | ||
// "INVALID_VARIABLE_DECLARATION.REPLACE_LOCAL_VARIABLE", | ||
// Map("varName" -> toSQLId(nameParts)) | ||
// ) | ||
// } | ||
|
||
if (nameParts.length != 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, added check we're not in EXECUTE IMMEDIATE if throwing this error. |
||
throw new AnalysisException( | ||
"INVALID_VARIABLE_DECLARATION.QUALIFIED_LOCAL_VARIABLE", | ||
Map("varName" -> toSQLId(nameParts))) | ||
} | ||
} | ||
|
||
val resolved = resolveCreateVariableName(nameParts) | ||
c.copy(name = resolved, sessionVariablesOnly = AnalysisContext.get.isExecuteImmediate) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
case d @ DropVariable(UnresolvedIdentifier(nameParts, _), _) => | ||
val resolved = resolveVariableName(nameParts) | ||
val resolved = resolveDropVariableName(nameParts) | ||
d.copy(name = resolved) | ||
|
||
case UnresolvedIdentifier(nameParts, allowTemp) => | ||
|
@@ -73,28 +95,40 @@ class ResolveCatalogs(val catalogManager: CatalogManager) | |
} | ||
} | ||
|
||
private def resolveVariableName(nameParts: Seq[String]): ResolvedIdentifier = { | ||
def ident: Identifier = Identifier.of(Array(CatalogManager.SESSION_NAMESPACE), nameParts.last) | ||
if (nameParts.length == 1) { | ||
private def resolveCreateVariableName(nameParts: Seq[String]): ResolvedIdentifier = { | ||
val ident = SqlScriptingLocalVariableManager.get() | ||
.filterNot(_ => AnalysisContext.get.isExecuteImmediate) | ||
.getOrElse(catalogManager.tempVariableManager) | ||
.createIdentifier(nameParts.last) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, what do you mean already created? Here we create the identifier, which is dependent on scripting context in the case of local variables, and then we return it in |
||
|
||
resolveVariableName(nameParts, ident) | ||
} | ||
|
||
private def resolveDropVariableName(nameParts: Seq[String]): ResolvedIdentifier = { | ||
// Only session variables can be dropped, so SqlScriptingLocalVariableManager | ||
// is not checked in the case of DropVariable. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so even if there is name conflict between session and local variables, DROP VARIABLE always drop session variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, DROP will never consider local variables. It only works on session variables, per spec. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @srielau can you chime in here? My current understanding is that we don't want to allow drop of local variables - and that makes total sense. However, the current agreement/implementation completely ignores local variables when resolving variable references in the input:
output: 3 input:
result: session var will be dropped (x = 1) Here, we can see that the meaning of Anyways, I think that the second example should throw an exception stating that the local variables cannot be dropped (because Simple example why this might be important - customer may want to drop local var (not aware that it's not allowed, or by mistake) and instead of getting an exception, the session variable would be silently dropped. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 to fail explicitly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @srielau Could you provide your input here? |
||
val ident = catalogManager.tempVariableManager.createIdentifier(nameParts.last) | ||
resolveVariableName(nameParts, ident) | ||
} | ||
|
||
private def resolveVariableName( | ||
nameParts: Seq[String], | ||
ident: Identifier): ResolvedIdentifier = nameParts.length match { | ||
case 1 => ResolvedIdentifier(FakeSystemCatalog, ident) | ||
|
||
// On declare variable, local variables support only unqualified names. | ||
// On drop variable, local variables are not supported at all. | ||
case 2 if nameParts.head.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) => | ||
ResolvedIdentifier(FakeSystemCatalog, ident) | ||
} else if (nameParts.length == 2) { | ||
if (nameParts.head.equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE)) { | ||
ResolvedIdentifier(FakeSystemCatalog, ident) | ||
} else { | ||
throw QueryCompilationErrors.unresolvedVariableError( | ||
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE)) | ||
} | ||
} else if (nameParts.length == 3) { | ||
if (nameParts(0).equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && | ||
nameParts(1).equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE)) { | ||
ResolvedIdentifier(FakeSystemCatalog, ident) | ||
} else { | ||
throw QueryCompilationErrors.unresolvedVariableError( | ||
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE)) | ||
} | ||
} else { | ||
|
||
// When there are 3 nameParts the variable must be a fully qualified session variable | ||
// i.e. "system.session.<varName>" | ||
case 3 if nameParts(0).equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && | ||
nameParts(1).equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) => | ||
ResolvedIdentifier(FakeSystemCatalog, ident) | ||
|
||
case _ => | ||
throw QueryCompilationErrors.unresolvedVariableError( | ||
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, CatalogManager.SESSION_NAMESPACE)) | ||
} | ||
nameParts, Seq(CatalogManager.SYSTEM_CATALOG_NAME, ident.namespace().head)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's check if it makes sense to throw error like this (with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Local variables can only be qualified by a label. Labels themselves are not qualified. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand this EXECUTE IMMEDIATE hack. Can you explain it in detail with examples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EXECUTE IMMEDIATE has 3 parts - SQL string, INTO clause and USING clause. INTO (set variables) and USING (capture variables) should be able to access local variables, however the query generated by the SQL string should not. It should be run as if it's not in a script.
We add `isExecuteImmediate` to `AnalysisContext` to know if we are in a plan generated by EXECUTE IMMEDIATE. If we are, we cannot access local variables.
The USING clause is resolved before the SQL string; at this point `isExecuteImmediate` is not set and we access local variables normally.
The INTO clause is not resolved before the SQL string, so we have to make an exception for it. Since it internally uses `SetVariable`, we add a flag `isExecuteImmediateIntoClause` to `SetVariable`, which allows access to local variables if set to `true`
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Examples:
Should work:
Should work:
Should not work:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation! Instead of hacking the `AnalysisContext`, how about we move the hack into the scripting itself? e.g. in `SingleStatementExec#buildDataFrame`, if the parsed plan is `ExecuteImmediateQuery`, we do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh nvm, the INTO clause can access local variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that's why I did it this way, we need a way to make an exception for INTO clause.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`AnalysisContext` is a singleton instance and it's very hacky and fragile to update a global bool flag to implement this fix. I think the key problem here is still passing around state, and I have a new idea:
In `SubstituteExecuteImmediate`, after we parse the query body, we tag all `UnresolvedAttribute`s within the query to indicate that they are inside EXECUTE IMMEDIATE and they should not be resolved to local variables. Something like this
In the places that match `UnresolvedAttribute` and try to look up variables, we skip local variables if the tag is present.
In the new single-pass analyzer, we can have a better way to pass around states: when we top-down traverse the plan tree and see `ExecuteImmediateQuery`, we set a flag in the scope to indicate it's under `ExecuteImmediateQuery` and keep traversing.
cc @vladimirg-db
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tags are fragile as well, because you lose them after `.copy`, but I guess it's better than `AnalysisContext`
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, tags are fragile, but it's less of an issue with `UnresolvedAttribute` as I can't think of any rule that copies `UnresolvedAttribute`.
Anyway, I don't have a better idea now unless we completely move to the new single-pass analyzer in the future.