Skip to content

[SPARK-52306][SQL] Add support of ALTER TABLE ... UNSET SERDEPROPERTIES #51017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5837,6 +5837,11 @@
"ALTER TABLE SET SERDE is not supported for table <tableName> created with the datasource API. Consider using an external Hive table or updating the table properties with compatible options for your table format."
]
},
"ALTER_TABLE_UNSET_SERDE_PROPERTIES_FOR_DATASOURCE_TABLE" : {
"message" : [
"ALTER TABLE UNSET SERDEPROPERTIES is not supported for table <tableName> created with the datasource API. Consider using an external Hive table or updating the table properties with compatible options for your table format."
]
},
"ANALYZE_UNCACHED_TEMP_VIEW" : {
"message" : [
"The ANALYZE TABLE FOR COLUMNS command can operate on temporary views that have been cached already. Consider to cache the view <viewName>."
Expand Down
10 changes: 10 additions & 0 deletions docs/sql-ref-syntax-ddl-alter-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,16 @@ ALTER TABLE table_identifier [ partition_spec ] SET SERDE serde_class_name
[ WITH SERDEPROPERTIES ( key1 = val1, key2 = val2, ... ) ]
```

#### UNSET SERDE PROPERTIES

`ALTER TABLE UNSET` command can also be used to drop the SERDE properties for tables and Hive table partitions.

##### Syntax

```sql
ALTER TABLE table_identifier partition_spec UNSET SERDEPROPERTIES [ IF EXISTS ] ( key1, key2, ... )
```

#### SET LOCATION And SET FILE FORMAT

`ALTER TABLE SET` command can also be used for changing the file location and file format for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ statement
SET SERDE stringLit (WITH SERDEPROPERTIES propertyList)? #setTableSerDe
| ALTER TABLE identifierReference (partitionSpec)?
SET SERDEPROPERTIES propertyList #setTableSerDe
| ALTER TABLE identifierReference (partitionSpec)?
UNSET SERDEPROPERTIES (IF EXISTS)? propertyList #unsetTableSerDeProperties
| ALTER (TABLE | VIEW) identifierReference ADD (IF errorCapturingNot EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER TABLE identifierReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6006,6 +6006,26 @@ class AstBuilder extends DataTypeAstBuilder
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create an [[UnsetTableSerDeProperties]]
*
* For example:
* {{{
* ALTER TABLE multi_part_name [PARTITION spec] UNSET SERDEPROPERTIES [IF EXISTS] ('key');
* }}}
*/
override def visitUnsetTableSerDeProperties(
ctx: UnsetTableSerDePropertiesContext): LogicalPlan = withOrigin(ctx) {
val properties = visitPropertyKeys(ctx.propertyList)
val ifExists = ctx.EXISTS != null
UnsetTableSerDeProperties(
createUnresolvedTable(ctx.identifierReference, "ALTER TABLE ... UNSET SERDEPROPERTIES", true),
properties,
ifExists,
// TODO a partition spec is allowed to have optional values. This is currently violated.
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Alter the query of a view. This creates a [[AlterViewAs]]
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,18 @@ case class SetTableSerDeProperties(
copy(child = newChild)
}

/**
* The logical plan of the ALTER TABLE ... UNSET SERDEPROPERTIES command.
*/
case class UnsetTableSerDeProperties(
child: LogicalPlan,
propertyKeys: Seq[String],
ifExists: Boolean,
partitionSpec: Option[TablePartitionSpec]) extends UnaryCommand {
override protected def withNewChildInternal(newChild: LogicalPlan): UnsetTableSerDeProperties =
copy(child = newChild)
}

/**
* The logical plan of the CACHE TABLE command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
notSupportedForV2TablesError("ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]")
}

def alterTableUnsetSerDePropertiesNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("ALTER TABLE ... UNSET SERDEPROPERTIES")
}

def describeAsJsonNotSupportedForV2TablesError(): Throwable = {
notSupportedForV2TablesError("DESCRIBE TABLE AS JSON")
}
Expand Down Expand Up @@ -2855,6 +2859,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
messageParameters = Map("tableName" -> toSQLId(tableName)))
}

def alterTableUnsetSerdePropertiesNotSupportedError(tableName: String): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_FEATURE.ALTER_TABLE_UNSET_SERDE_PROPERTIES_FOR_DATASOURCE_TABLE",
messageParameters = Map("tableName" -> toSQLId(tableName))
)
}

def cmdOnlyWorksOnPartitionedTablesError(
operation: String,
tableIdentWithDB: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
serdeProperties,
partitionSpec)

case UnsetTableSerDeProperties(
ResolvedV1TableIdentifierInSessionCatalog(ident),
propertyKeys,
ifExists,
partitionSpec) =>
AlterTableUnsetSerDePropertiesCommand(ident, propertyKeys, ifExists, partitionSpec)

case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
AlterTableSetLocationCommand(ident, None, location)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,44 @@ case class AlterTableSerDePropertiesCommand(

}

/**
* A command that unsets the serde properties of a table/partition.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table [PARTITION spec] UNSET SERDEPROPERTIES [IF EXISTS] ('key1', 'key2', ...);
* }}}
*/
case class AlterTableUnsetSerDePropertiesCommand(
tableName: TableIdentifier,
propKeys: Seq[String],
ifExists: Boolean,
partSpec: Option[TablePartitionSpec])
extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableRawMetadata(tableName)
// For datasource tables, disallow unsetting partition serde properties
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw QueryCompilationErrors.alterTableUnsetSerdePropertiesNotSupportedError(
table.qualifiedName)
}
if (partSpec.isEmpty) {
val newProperties = table.storage.properties.filter { case (k, _) => !propKeys.contains(k) }
val newTable = table.withNewStorage(properties = newProperties)
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
val part = catalog.getPartition(table.identifier, spec)
val newProperties = part.storage.properties.filter { case (k, _) => !propKeys.contains(k) }
val newPart = part.copy(storage = part.storage.copy(properties = newProperties))
catalog.alterPartitions(table.identifier, Seq(newPart))
}
Seq.empty[Row]
}
}

/**
* Add Partition in ALTER TABLE: add the table partitions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case SetTableSerDeProperties(_: ResolvedTable, _, _, _) =>
throw QueryCompilationErrors.alterTableSerDePropertiesNotSupportedForV2TablesError()

case UnsetTableSerDeProperties(_: ResolvedTable, _, _, _) =>
throw QueryCompilationErrors.alterTableUnsetSerDePropertiesNotSupportedForV2TablesError()

case LoadData(_: ResolvedTable, _, _, _, _) =>
throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.UnsetTableSerDeProperties
import org.apache.spark.sql.test.SharedSparkSession

class AlterTableUnsetSerdePropertiesParserSuite extends AnalysisTest with SharedSparkSession {

private def parseException(sqlText: String): SparkThrowable = {
intercept[ParseException](sql(sqlText).collect())
}

// ALTER TABLE table_name [PARTITION spec] UNSET SERDEPROPERTIES [IF EXISTS] ('key1', 'key2');
test("alter table unset serde properties") {
val sql1 = "ALTER TABLE table_name UNSET SERDEPROPERTIES ('key1', 'key2')"
val sql2 = "ALTER TABLE table_name PARTITION (a=1, b='str') UNSET SERDEPROPERTIES ('key')"

comparePlans(
parsePlan(sql1),
UnsetTableSerDeProperties(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... UNSET SERDEPROPERTIES",
suggestAlternative = true),
Seq("key1", "key2"),
ifExists = false,
partitionSpec = None)
)
comparePlans(
parsePlan(sql2),
UnsetTableSerDeProperties(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... UNSET SERDEPROPERTIES",
suggestAlternative = true),
Seq("key"),
ifExists = false,
partitionSpec = Some(Map("a" -> "1", "b" -> "str"))))
}

test("alter table unset serde properties - property values must NOT be set") {
val sql = "ALTER TABLE my_tab UNSET SERDEPROPERTIES('key_without_value', 'key_with_value'='x')"
checkError(
exception = parseException(sql),
condition = "_LEGACY_ERROR_TEMP_0035",
parameters = Map("message" -> "Values should not be specified for key(s): [key_with_value]"),
context = ExpectedContext(
fragment = sql,
start = 0,
stop = 82))
}

test("alter table unset serde properties - partition values must be full") {
val sql = "ALTER TABLE table_name PARTITION (a=1, b) UNSET SERDEPROPERTIES ('key')"
checkError(
exception = parseException(sql),
condition = "INVALID_SQL_SYNTAX.EMPTY_PARTITION_VALUE",
parameters = Map("partKey" -> "`b`"),
context = ExpectedContext(
fragment = "PARTITION (a=1, b)",
start = 23,
stop = 40))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.QueryTest

/**
* This base suite contains unified tests for the `ALTER TABLE .. UNSET SERDEPROPERTIES`
* command that check V1 and V2 table catalogs. The tests that cannot run for all supported
* catalogs are located in more specific test suites:
*
* - V2 table catalog tests:
* `org.apache.spark.sql.execution.command.v2.AlterTableUnsetSerdePropertiesSuite`
* - V1 table catalog tests:
* `org.apache.spark.sql.execution.command.v1.AlterTableUnsetSerdePropertiesSuiteBase`
* - V1 In-Memory catalog:
* `org.apache.spark.sql.execution.command.v1.AlterTableUnsetSerdePropertiesSuite`
* - V1 Hive External catalog:
* `org.apache.spark.sql.hive.execution.command.AlterTableUnsetSerdePropertiesSuite`
*/
trait AlterTableUnsetSerdePropertiesSuiteBase extends QueryTest with DDLCommandTestUtils {
override val command = "ALTER TABLE ... UNSET SERDEPROPERTIES"
}
Loading