
Commit 91bfbba

mihailoale-db authored and dongjoon-hyun committed
[SPARK-53348][SQL][4.0] Always persist ANSI value when creating a view or assume it when querying if not stored
### What changes were proposed in this pull request?

I propose that we always store the ANSI value when creating a view, because otherwise users can be affected by unwanted behavior. For example, a user who creates a view on a version where ANSI = false by default expects the following not to fail:

```
CREATE VIEW view AS SELECT CAST('abc' AS INT) AS a;
SELECT * FROM view;
```

But if the user queries the view on a version where ANSI = true by default, the query above fails (when the value isn't stored, and we only store it when it is explicitly set, we fall back to the default). The number of such use cases is huge, because the ANSI impact area is huge, so I propose that we always store the value.

If the value is not stored, I propose that we use the `createVersion` field to determine whether the ANSI value should be true (Spark 4.0.0 and above) or false (lower than Spark 4.0.0). If the `createVersion` field wasn't stored during view creation, I propose that we assume ANSI = false, because the number of such views is incomparably larger than the number of views expecting ANSI = true.

### Why are the changes needed?

To improve user experience.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added suite.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52147 from mihailoale-db/ansi40backport.

Authored-by: mihailoale-db <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
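The fallback rule described above reduces to a small decision: prefer the persisted `spark.sql.ansi.enabled` value; otherwise infer it from the Spark version that created the view or function, defaulting to `false`. A minimal standalone sketch in Scala (the `storedConfigs` map and the helper name are illustrative, not Spark APIs):

```
object AnsiFallbackSketch {
  // Decide the effective spark.sql.ansi.enabled value for a view or SQL UDF,
  // mirroring the rule proposed in this PR.
  def effectiveAnsiEnabled(
      storedConfigs: Map[String, String],
      createVersion: String): Boolean = {
    storedConfigs.get("spark.sql.ansi.enabled") match {
      case Some(value) => value.toBoolean          // persisted at creation time, always wins
      case None => createVersion.startsWith("4.")  // 4.x assumed true, older or unknown false
    }
  }

  def main(args: Array[String]): Unit = {
    println(effectiveAnsiEnabled(Map.empty, "3.5.1"))                                // false
    println(effectiveAnsiEnabled(Map.empty, "4.0.0"))                                // true
    println(effectiveAnsiEnabled(Map("spark.sql.ansi.enabled" -> "false"), "4.0.0")) // false
    println(effectiveAnsiEnabled(Map.empty, ""))                                     // false: version not stored
  }
}
```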
1 parent 9c0c843 commit 91bfbba

File tree

8 files changed: +225 -13 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 23 additions & 1 deletion
@@ -258,7 +258,10 @@ object Analyzer {
     "spark.sql.expressionTreeChangeLog.level"
   )
 
-  def retainResolutionConfigsForAnalysis(newConf: SQLConf, existingConf: SQLConf): Unit = {
+  def retainResolutionConfigsForAnalysis(
+      newConf: SQLConf,
+      existingConf: SQLConf,
+      createSparkVersion: String = ""): Unit = {
     val retainedConfigs = existingConf.getAllConfs.filter { case (key, _) =>
       // Also apply catalog configs
       RETAINED_ANALYSIS_FLAGS.contains(key) || key.startsWith("spark.sql.catalog.")
@@ -267,6 +270,25 @@ object Analyzer {
     retainedConfigs.foreach { case (k, v) =>
       newConf.settings.put(k, v)
     }
+
+    trySetAnsiValue(newConf, createSparkVersion)
+  }
+
+  /**
+   * In case ANSI value wasn't persisted for a view or a UDF, we set it to `true` in case Spark
+   * version used to create the view is 4.0.0 or higher. We set it to `false` in case Spark version
+   * is lower than 4.0.0 or if the Spark version wasn't stored (in that case we assume that the
+   * value is `false`)
+   */
+  def trySetAnsiValue(sqlConf: SQLConf, createSparkVersion: String = ""): Unit = {
+    if (conf.getConf(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED) &&
+        !sqlConf.settings.containsKey(SQLConf.ANSI_ENABLED.key)) {
+      if (createSparkVersion.startsWith("4.")) {
+        sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "true")
+      } else {
+        sqlConf.settings.put(SQLConf.ANSI_ENABLED.key, "false")
+      }
+    }
   }
 }
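Reviewer note: in effect, `retainResolutionConfigsForAnalysis` now layers three sources of configuration. A rough standalone sketch with plain maps instead of `SQLConf` (the retained flag below is just one example key; the real allow-list is `RETAINED_ANALYSIS_FLAGS`):

```
object RetainConfigsSketch {
  // Example stand-in for RETAINED_ANALYSIS_FLAGS; the real list has more entries.
  private val retainedFlags = Set("spark.sql.expressionTreeChangeLog.level")

  def buildViewConf(
      capturedConfigs: Map[String, String],   // configs persisted with the view
      sessionConfigs: Map[String, String],    // configs of the current session
      createSparkVersion: String): Map[String, String] = {
    // 1. Start from what the view captured at creation time.
    var conf = capturedConfigs
    // 2. Retain a small allow-list of session configs plus all catalog configs.
    conf ++= sessionConfigs.filter { case (key, _) =>
      retainedFlags.contains(key) || key.startsWith("spark.sql.catalog.")
    }
    // 3. If ANSI was never persisted, derive it from the creating Spark version.
    if (!conf.contains("spark.sql.ansi.enabled")) {
      conf += ("spark.sql.ansi.enabled" -> createSparkVersion.startsWith("4.").toString)
    }
    conf
  }
}
```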

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewResolution.scala

Lines changed: 7 additions & 1 deletion
@@ -41,7 +41,13 @@ object ViewResolution {
         view
       )
     }
-    SQLConf.withExistingConf(View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)) {
+    SQLConf.withExistingConf(
+      View.effectiveSQLConf(
+        configs = view.desc.viewSQLConfigs,
+        isTempView = view.isTempView,
+        createSparkVersion = view.desc.createVersion
+      )
+    ) {
       resolveChild(view.child)
     }
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 13 additions & 2 deletions
@@ -982,7 +982,13 @@ class SessionCatalog(
       objectType = Some("VIEW"),
       objectName = Some(metadata.qualifiedName)
     )
-    val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
+    val parsedPlan = SQLConf.withExistingConf(
+      View.effectiveSQLConf(
+        configs = viewConfigs,
+        isTempView = isTempView,
+        createSparkVersion = metadata.createVersion
+      )
+    ) {
       CurrentOrigin.withOrigin(origin) {
         parser.parseQuery(viewText)
       }
@@ -1010,7 +1016,11 @@ class SessionCatalog(
     // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
     // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
     // number of duplications, and pick the corresponding attribute by ordinal.
-    val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
+    val viewConf = View.effectiveSQLConf(
+      configs = metadata.viewSQLConfigs,
+      isTempView = isTempView,
+      createSparkVersion = metadata.createVersion
+    )
     val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
       identity
     } else {
@@ -1617,6 +1627,7 @@ class SessionCatalog(
     // Use captured SQL configs when parsing a SQL function.
     val conf = new SQLConf()
     function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
+    Analyzer.trySetAnsiValue(conf)
     SQLConf.withExistingConf(conf) {
       val inputParam = function.inputParam
       val returnType = function.getScalarFuncReturnType
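Reviewer note: the SQL function path calls `Analyzer.trySetAnsiValue(conf)` without a version argument, so it falls into the default `createSparkVersion = ""` branch. A tiny illustration of why that always yields `false` when ANSI wasn't captured (hypothetical `decide` helper, mirroring only the version check):

```
object DefaultVersionSketch {
  def decide(createSparkVersion: String = ""): Boolean =
    createSparkVersion.startsWith("4.")

  def main(args: Array[String]): Unit = {
    println(decide())        // false: no create version is recorded for SQL functions
    println(decide("4.0.0")) // true
  }
}
```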

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 2 deletions
@@ -835,7 +835,10 @@ case class View(
 }
 
 object View {
-  def effectiveSQLConf(configs: Map[String, String], isTempView: Boolean): SQLConf = {
+  def effectiveSQLConf(
+      configs: Map[String, String],
+      isTempView: Boolean,
+      createSparkVersion: String = ""): SQLConf = {
     val activeConf = SQLConf.get
     // For temporary view, we always use captured sql configs
     if (activeConf.useCurrentSQLConfigsForView && !isTempView) return activeConf
@@ -844,7 +847,12 @@ object View {
     for ((k, v) <- configs) {
       sqlConf.settings.put(k, v)
     }
-    Analyzer.retainResolutionConfigsForAnalysis(newConf = sqlConf, existingConf = activeConf)
+    Analyzer.retainResolutionConfigsForAnalysis(
+      newConf = sqlConf,
+      existingConf = activeConf,
+      createSparkVersion = createSparkVersion
+    )
+
     sqlConf
   }
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
@@ -5594,6 +5594,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ASSUME_ANSI_FALSE_IF_NOT_PERSISTED =
+    buildConf("spark.sql.assumeAnsiFalseIfNotPersisted.enabled")
+      .internal()
+      .doc("If enabled, assume ANSI mode is false if not persisted during view or UDF " +
+        "creation. Otherwise use the default value.")
+      .version("4.0.1")
+      .booleanConf
+      .createWithDefault(true)
+
   /**
    * Holds information about keys that have been deprecated.
    *
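The new flag defaults to `true` and is internal, so the fallback is on unless switched off explicitly. A minimal sketch of toggling it through `SparkConf`, mirroring what the new test suite does in its `sparkConf` override (setting it to `false` is shown only to illustrate the escape hatch):

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf

object AnsiFallbackFlagExample {
  def main(args: Array[String]): Unit = {
    // "false" restores the old behavior of using the session default when no ANSI
    // value was persisted with the view or function; "true" (the default) enables
    // the createVersion-based fallback introduced in this commit.
    val conf = new SparkConf()
      .set(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED.key, "false")
    println(conf.get(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED.key))
  }
}
```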

sql/core/src/main/scala/org/apache/spark/sql/execution/command/CreateUserDefinedFunctionCommand.scala

Lines changed: 18 additions & 1 deletion
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.catalog.{LanguageSQL, RoutineLanguage, UserDefinedFunctionErrors}
@@ -87,10 +89,25 @@ object CreateUserDefinedFunctionCommand {
   * [[org.apache.spark.sql.catalyst.expressions.ExpressionInfo]], all SQL configs and other
   * function properties (such as the function parameters and the function return type)
   * are saved together in a property map.
+   *
+   * Here we only capture the SQL configs that are modifiable and should be captured, i.e. not in
+   * the denyList and in the allowList. Besides mentioned ones we also capture `ANSI_ENABLED`.
+   *
+   * We need to always capture them to make sure we apply the same configs when querying the
+   * function.
    */
   def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
     val modifiedConfs = ViewHelper.getModifiedConf(conf)
-    modifiedConfs.map { case (key, value) => s"$SQL_CONFIG_PREFIX$key" -> value }
+
+    val alwaysCaptured = Seq(SQLConf.ANSI_ENABLED)
+      .filter(c => !modifiedConfs.contains(c.key))
+      .map(c => (c.key, conf.getConf(c).toString))
+
+    val props = new mutable.HashMap[String, String]
+    for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
+      props.put(s"$SQL_CONFIG_PREFIX$key", value)
+    }
+    props.toMap
   }
 
   /**
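Reviewer note: with this change `sqlConfigsToProps` always emits an entry for `spark.sql.ansi.enabled`, even when the session never touched it. A rough standalone illustration with plain maps (the `sqlConfig.` prefix matches the property key read back by the new suite; everything else is simplified):

```
object UdfConfigCaptureSketch {
  private val SQL_CONFIG_PREFIX = "sqlConfig."

  // modifiedConfs: configs the user changed in the session;
  // ansiEnabled: the session's effective spark.sql.ansi.enabled at CREATE FUNCTION time.
  def sqlConfigsToProps(
      modifiedConfs: Map[String, String],
      ansiEnabled: Boolean): Map[String, String] = {
    val alwaysCaptured =
      if (modifiedConfs.contains("spark.sql.ansi.enabled")) Nil
      else Seq("spark.sql.ansi.enabled" -> ansiEnabled.toString)
    (modifiedConfs ++ alwaysCaptured).map { case (k, v) => s"$SQL_CONFIG_PREFIX$k" -> v }
  }

  def main(args: Array[String]): Unit = {
    // ANSI untouched by the user: the session default still gets persisted.
    println(sqlConfigsToProps(Map.empty, ansiEnabled = true))
    // ANSI explicitly set: the explicit value wins.
    println(sqlConfigsToProps(Map("spark.sql.ansi.enabled" -> "false"), ansiEnabled = true))
  }
}
```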

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 9 additions & 6 deletions
@@ -481,16 +481,19 @@ object ViewHelper extends SQLConfHelper with Logging {
   }
 
   /**
-   * Convert the view SQL configs to `properties`.
+   * Convert the view SQL configs to `properties`. Here we only capture the SQL configs that are
+   * modifiable and should be captured, i.e. not in the denyList and in the allowList. We also
+   * capture `SESSION_LOCAL_TIMEZONE` whose default value relies on the JVM system timezone and
+   * the `ANSI_ENABLED` value.
+   *
+   * We need to always capture them to make sure we apply the same configs when querying the view.
    */
   private def sqlConfigsToProps(conf: SQLConf): Map[String, String] = {
     val modifiedConfs = getModifiedConf(conf)
-    // Some configs have dynamic default values, such as SESSION_LOCAL_TIMEZONE whose
-    // default value relies on the JVM system timezone. We need to always capture them to
-    // to make sure we apply the same configs when reading the view.
-    val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE)
+
+    val alwaysCaptured = Seq(SQLConf.SESSION_LOCAL_TIMEZONE, SQLConf.ANSI_ENABLED)
       .filter(c => !modifiedConfs.contains(c.key))
-      .map(c => (c.key, conf.getConf(c)))
+      .map(c => (c.key, conf.getConf(c).toString))
 
     val props = new mutable.HashMap[String, String]
     for ((key, value) <- modifiedConfs ++ alwaysCaptured) {
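Reviewer note: the practical effect is that a freshly created view always carries `view.sqlConfig.spark.sql.ansi.enabled` in its catalog properties, which is exactly what the new suite asserts. A hedged snippet for inspecting this interactively (for example in spark-shell, assuming a running `SparkSession` named `spark`; the view name is arbitrary):

```
import org.apache.spark.sql.catalyst.TableIdentifier

// Create a view without setting spark.sql.ansi.enabled, then check what was persisted.
spark.sql("CREATE VIEW ansi_probe AS SELECT CAST('abc' AS INT) AS a")

val props = spark.sessionState.catalog
  .getTableMetadata(TableIdentifier("ansi_probe"))
  .properties

// Expected to be Some("true") or Some("false"), never None, after this change.
println(props.get("view.sqlConfig.spark.sql.ansi.enabled"))

spark.sql("DROP VIEW ansi_probe")
```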
New file: test suite `DefaultANSIValueSuite` (package `org.apache.spark.sql`)

Lines changed: 136 additions & 0 deletions

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.SQLScalarFunction
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SQLFunction}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, View}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This suite tests if default ANSI value is persisted for views and functions if not explicitly
+ * set.
+ */
+class DefaultANSIValueSuite extends QueryTest with SharedSparkSession {
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
+      implicit pos: Position): Unit = {
+    if (!sys.env.get("SPARK_ANSI_SQL_MODE").contains("false")) {
+      super.test(testName, testTags: _*)(testFun)
+    }
+  }
+
+  protected override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(SQLConf.ASSUME_ANSI_FALSE_IF_NOT_PERSISTED.key, "true")
+  }
+
+  private val testViewName = "test_view"
+  private val testFunctionName = "test_function"
+
+  test("Default ANSI value is stored for views") {
+    withView(testViewName) {
+      testView(expectedAnsiValue = true)
+    }
+  }
+
+  test("Explicitly set ANSI value is respected over default one for views") {
+    withView(testViewName) {
+      withSQLConf("spark.sql.ansi.enabled" -> "false") {
+        testView(expectedAnsiValue = false)
+      }
+    }
+
+    withView(testViewName) {
+      withSQLConf("spark.sql.ansi.enabled" -> "true") {
+        testView(expectedAnsiValue = true)
+      }
+    }
+  }
+
+  test("Default ANSI value is stored for functions") {
+    withUserDefinedFunction(testFunctionName -> false) {
+      testFunction(expectedAnsiValue = true)
+    }
+  }
+
+  test("Explicitly set ANSI value is respected over default one for functions") {
+    withUserDefinedFunction(testFunctionName -> false) {
+      withSQLConf("spark.sql.ansi.enabled" -> "false") {
+        testFunction(expectedAnsiValue = false)
+      }
+    }
+
+    withUserDefinedFunction(testFunctionName -> false) {
+      withSQLConf("spark.sql.ansi.enabled" -> "true") {
+        testFunction(expectedAnsiValue = true)
+      }
+    }
+  }
+
+  test("ANSI value is set to false if not persisted for views") {
+    val catalogTable = new CatalogTable(
+      identifier = TableIdentifier(testViewName),
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = new StructType(),
+      properties = Map.empty[String, String]
+    )
+    val view = View(desc = catalogTable, isTempView = false, child = OneRowRelation())
+
+    val sqlConf = View.effectiveSQLConf(view.desc.viewSQLConfigs, view.isTempView)
+
+    assert(sqlConf.settings.get("spark.sql.ansi.enabled") == "false")
+  }
+
+  private def testView(expectedAnsiValue: Boolean): Unit = {
+    sql(s"CREATE VIEW $testViewName AS SELECT CAST('string' AS BIGINT) AS alias")
+
+    val viewMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testViewName))
+
+    assert(
+      viewMetadata.properties("view.sqlConfig.spark.sql.ansi.enabled") == expectedAnsiValue.toString
+    )
+  }
+
+  private def testFunction(expectedAnsiValue: Boolean): Unit = {
+    sql(
+      s"""
+        |CREATE OR REPLACE FUNCTION $testFunctionName()
+        |RETURN SELECT CAST('string' AS BIGINT) AS alias
+        |""".stripMargin)
+
+    val df = sql(s"select $testFunctionName()")
+
+    assert(
+      df.queryExecution.analyzed.asInstanceOf[Project]
+        .projectList.head.asInstanceOf[Alias]
+        .child.asInstanceOf[SQLScalarFunction]
+        .function.asInstanceOf[SQLFunction]
+        .properties.get("sqlConfig.spark.sql.ansi.enabled").get == expectedAnsiValue.toString
+    )
+  }
+}
