Automatic type widening in MERGE (#2764)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This change is part of the type widening table feature.
- Type widening feature request: #2622
- Type widening protocol RFC: #2624

It adds automatic type widening as part of schema evolution in MERGE INTO:
- During resolution of the `DeltaMergeInto` plan, when merging the target and source schemas to compute the schema after evolution, the wider source type is kept when type widening is enabled on the table (see the sketch after this list).
- When updating the table schema at the beginning of MERGE execution, metadata is added to the schema to record the type changes.
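Below is a minimal sketch of the schema merging step, assuming `SchemaMergingUtils` lives in the `org.apache.spark.sql.delta.schema` package (the full file path is not shown in this diff); the schemas mirror the example at the end of this description:

```scala
// Sketch only: the new allowTypeWidening flag added to SchemaMergingUtils.mergeSchemas.
import org.apache.spark.sql.delta.schema.SchemaMergingUtils // assumed package path
import org.apache.spark.sql.types._

val targetSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", ShortType)))
val sourceSchema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", IntegerType)))

// With allowTypeWidening = true the merged schema keeps the wider source type for
// `value` (short -> int); with false, the existing target type is kept instead.
val merged = SchemaMergingUtils.mergeSchemas(
  targetSchema,
  sourceSchema,
  allowImplicitConversions = true,
  allowTypeWidening = true)
```

The recorded type-change metadata itself is added separately via `TypeWideningMetadata.addTypeWideningMetadata` (see the `ImplicitMetadataOperation` hunk below).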

## How was this patch tested?
- A new test suite `DeltaTypeWideningSchemaEvolutionSuite` is added to cover type widening during schema evolution in MERGE.

## This PR introduces the following *user-facing* changes
The table feature is available in testing only; there are no user-facing changes for now.

When automatic schema evolution is enabled in MERGE and the source
schema contains a type that is wider than the target schema:

With type widening disabled, the type in the target schema is not changed. The ingestion behavior follows the `storeAssignmentPolicy` configuration (a sketch follows this list):
- LEGACY: source values that overflow the target type are stored as `null`.
- ANSI: a runtime check is injected and fails on source values that overflow the target type.
- STRICT: the MERGE operation fails during analysis.
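As a hedged illustration of the disabled case, using the `target`/`source` tables from the example further down (Spark's standard `spark.sql.storeAssignmentPolicy` setting is the only knob assumed here):

```scala
// Sketch only: type widening NOT enabled on the target table.
// target: (key int, value short), source: (key int, value int).
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql(
  """MERGE INTO target
    |USING source
    |ON target.key = source.key
    |WHEN MATCHED THEN UPDATE SET *""".stripMargin)
// ANSI:   the injected runtime check fails the MERGE if a source value overflows SHORT.
// LEGACY: overflowing source values are written as null instead of failing.
// STRICT: the MERGE is rejected during analysis, before any data is written.
// In all three cases the target schema keeps `value short`.
```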

With type widening enabled, the type in the target schema is updated to the wider source type, and the MERGE operation always succeeds:
```
-- target: key int, value short
-- source: key int, value int
MERGE INTO target
USING source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET *
```
After the MERGE operation, the target schema is `key int, value int`.
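For completeness, a hedged end-to-end sketch; the table property name `delta.enableTypeWidening` and the schema-evolution flag `spark.databricks.delta.schema.autoMerge.enabled` are assumptions rather than part of this PR, since the feature is only exposed in testing:

```scala
// Sketch only: automatic widening during MERGE once the table feature is enabled.
// The property and config names below are assumptions, not guaranteed by this PR.
spark.sql("ALTER TABLE target SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") // MERGE schema evolution

spark.sql(
  """MERGE INTO target
    |USING source
    |ON target.key = source.key
    |WHEN MATCHED THEN UPDATE SET *""".stripMargin)

// `value` is widened from short to int; the type change is recorded in the column
// metadata via TypeWideningMetadata.addTypeWideningMetadata (see the diff below).
spark.table("target").printSchema() // key: integer, value: integer
```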
johanl-db authored Mar 22, 2024
1 parent cb07092 commit 90b98e3
Showing 11 changed files with 507 additions and 67 deletions.
@@ -234,7 +234,7 @@ object ResolveDeltaMergeInto {
// schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE
// clauses since these can't by definition reference source columns and thus can't introduce
// new columns in the target schema.
val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions)
val actions = (resolvedMatchedClauses ++ resolvedNotMatchedClauses).flatMap(_.actions)
val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts }
val containsStarAction = actions.exists {
case _: UnresolvedStar => true
@@ -278,14 +278,22 @@ object ResolveDeltaMergeInto {
})

val migrationSchema = filterSchema(source.schema, Seq.empty)
val allowTypeWidening = target.exists {
case DeltaTable(fileIndex) =>
TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata)
case _ => false
}

// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
// enforces Parquet-level write compatibility, which would mean an INT source can't be merged
// into a LONG target.
SchemaMergingUtils.mergeSchemas(
target.schema,
migrationSchema,
allowImplicitConversions = true)
allowImplicitConversions = true,
allowTypeWidening = allowTypeWidening
)
} else {
target.schema
}
27 changes: 27 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -47,6 +47,20 @@ object TypeWidening {
isEnabled
}

/**
* Checks that the type widening table property wasn't disabled or enabled between the two given
* states, throws an error if it was.
*/
def ensureFeatureConsistentlyEnabled(
protocol: Protocol,
metadata: Metadata,
otherProtocol: Protocol,
otherMetadata: Metadata): Unit = {
if (isEnabled(protocol, metadata) != isEnabled(otherProtocol, otherMetadata)) {
throw DeltaErrors.metadataChangedException(None)
}
}

/**
* Returns whether the given type change is eligible for widening. This only checks atomic types.
* It is the responsibility of the caller to recurse into structs, maps and arrays.
@@ -62,6 +76,19 @@ object TypeWidening {
case _ => false
}

/**
* Returns whether the given type change can be applied during schema evolution. Only a
* subset of supported type changes are considered for schema evolution.
*/
def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean =
(fromType, toType) match {
case (from, to) if from == to => true
case (from, to) if !isTypeChangeSupported(from, to) => false
case (ByteType, ShortType) => true
case (ByteType | ShortType, IntegerType) => true
case _ => false
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
@@ -91,6 +91,15 @@ case class MergeIntoCommand(
atAnalysis = target.schema, latestSchema = deltaTxn.metadata.schema)
}

// Check that type widening wasn't enabled/disabled between analysis and the start of the
// transaction.
TypeWidening.ensureFeatureConsistentlyEnabled(
protocol = targetFileIndex.protocol,
metadata = targetFileIndex.metadata,
otherProtocol = deltaTxn.protocol,
otherMetadata = deltaTxn.metadata
)

if (canMergeSchema) {
updateMetadata(
spark, deltaTxn, migratedSchema.getOrElse(target.schema),
@@ -122,7 +122,14 @@ trait ImplicitMetadataOperation extends DeltaLogging {
if (rearrangeOnly) {
throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema")
}
txn.updateMetadata(txn.metadata.copy(schemaString = mergedSchema.json

val schemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata(
txn,
schema = mergedSchema,
oldSchema = txn.metadata.schema
)

txn.updateMetadata(txn.metadata.copy(schemaString = schemaWithTypeWideningMetadata.json
))
} else if (isNewSchema || isNewPartitioning
) {
@@ -201,7 +208,8 @@ object ImplicitMetadataOperation {
SchemaMergingUtils.mergeSchemas(
txn.metadata.schema,
dataSchema,
fixedTypeColumns = fixedTypeColumns)
fixedTypeColumns = fixedTypeColumns,
allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata))
}
}

@@ -20,14 +20,13 @@ import java.util.Locale

import scala.util.control.NonFatal

import org.apache.spark.sql.delta.DeltaAnalysisException
import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening}

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, DecimalType, IntegerType, MapType, NullType, ShortType, StructField, StructType}
import org.apache.spark.sql.types._

/**
* Utils to merge table schema with data schema.
@@ -168,6 +167,7 @@ object SchemaMergingUtils {
dataSchema: StructType,
allowImplicitConversions: Boolean = false,
keepExistingType: Boolean = false,
allowTypeWidening: Boolean = false,
fixedTypeColumns: Set[String] = Set.empty,
caseSensitive: Boolean = false): StructType = {
checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive)
@@ -232,6 +232,9 @@
// Simply keeps the existing type for primitive types
case (current, update) if keepExistingType => current

case (current: AtomicType, update: AtomicType) if allowTypeWidening &&
TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update

// If implicit conversions are allowed, that means we can use any valid implicit cast to
// perform the merge.
case (current, update)
@@ -1432,7 +1432,12 @@ trait DeltaErrorsSuiteBase
val e = intercept[DeltaAnalysisException] {
val s1 = StructType(Seq(StructField("c0", IntegerType, true)))
val s2 = StructType(Seq(StructField("c0", StringType, false)))
SchemaMergingUtils.mergeSchemas(s1, s2, false, false, Set("c0"))
SchemaMergingUtils.mergeSchemas(s1, s2,
allowImplicitConversions = false,
keepExistingType = false,
allowTypeWidening = false,
Set("c0")
)
}
checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"),
Some("Column c0 is a generated column or a column used by a generated " +
@@ -42,10 +42,11 @@ import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDScanExec, SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.util.Utils

@@ -475,6 +476,8 @@ trait DeltaDMLTestUtils
with BeforeAndAfterEach {
self: SharedSparkSession =>

import testImplicits._

protected var tempDir: File = _

protected var deltaLog: DeltaLog = _
@@ -523,6 +526,23 @@
}
}

/**
* Parse the input JSON data into a dataframe, one row per input element.
* Throws an exception on malformed inputs or records that don't comply with the provided schema.
*/
protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = {
if (schema != null) {
spark.read
.schema(schema)
.option("mode", FailFastMode.name)
.json(data.toDS)
} else {
spark.read
.option("mode", FailFastMode.name)
.json(data.toDS)
}
}

protected def readDeltaTable(path: String): DataFrame = {
spark.read.format("delta").load(path)
}