Add Spark conf to gate the structural schema matching
andyl-db committed Jan 24, 2025
1 parent 047c1e6 commit 2f58fd1
Showing 2 changed files with 18 additions and 39 deletions.
6 changes: 6 additions & 0 deletions client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala

@@ -95,6 +95,9 @@ object ConfUtils {
   val NEVER_USE_HTTPS = "spark.delta.sharing.network.never.use.https"
   val NEVER_USE_HTTPS_DEFAULT = "false"
 
+  val STRUCTURAL_SCHEMA_MATCH_CONF = "spark.delta.sharing.client.useStructuralSchemaMatch"
+  val STRUCTURAL_SCHEMA_MATCH_DEFAULT = "false"
+
   def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
     val proxyHost = conf.get(PROXY_HOST, null)
     val proxyPortAsString = conf.get(PROXY_PORT, null)
@@ -286,6 +289,9 @@ object ConfUtils {
     maxDur
   }
 
+  def structuralSchemaMatchingEnabled(conf: SQLConf): Boolean =
+    conf.getConfString(STRUCTURAL_SCHEMA_MATCH_CONF, STRUCTURAL_SCHEMA_MATCH_DEFAULT).toBoolean
+
   private def toTimeInSeconds(timeStr: String, conf: String): Int = {
     val timeInSeconds = JavaUtils.timeStringAs(timeStr, TimeUnit.SECONDS)
     validateNonNeg(timeInSeconds, conf)
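For context (not part of the commit): the new flag is a plain Spark SQL conf, so a session opts in before any Delta Sharing DataFrame is created, and structuralSchemaMatchingEnabled then reads it with the "false" default. A minimal sketch, assuming a local session; everything except the conf key and its default is illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative session setup; only the conf key and its default come from this commit.
val spark = SparkSession.builder()
  .appName("delta-sharing-structural-match-demo")
  .master("local[*]")
  // Opt in to structural schema matching (the shipped default is "false").
  .config("spark.delta.sharing.client.useStructuralSchemaMatch", "true")
  .getOrCreate()

// With the conf set, ConfUtils.structuralSchemaMatchingEnabled(spark.sessionState.conf)
// returns true and the schema check below takes the structural path.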
51 changes: 12 additions & 39 deletions client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
@@ -50,6 +50,7 @@ import io.delta.sharing.client.model.{
   Table => DeltaSharingTable
 }
 import io.delta.sharing.client.util.ConfUtils
+import io.delta.sharing.spark.util.SchemaUtils
 import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown
 
 /**
@@ -227,52 +228,24 @@ class RemoteSnapshot(
   }
 
   private def checkSchemaNotChange(newMetadata: Metadata): Unit = {
-    val newSchema = DataType.fromJson(newMetadata.schemaString).asInstanceOf[StructType]
-    val currentSchema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
-
-    val newSchemaFields = newSchema.fields.map(field => {
-      field.name -> field.dataType
-    }).toMap
-
-    val currentSchemaFields = currentSchema.fields.map(field => {
-      field.name -> field.dataType
-    }).toMap
-
     val schemaChangedException = new SparkException(
       s"""The schema or partition columns of your Delta table has changed since your
          |DataFrame was created. Please redefine your DataFrame""".stripMargin)
 
-    // Verify partition columns are the same
-    if (metadata.partitionColumns != newMetadata.partitionColumns) {
-      throw schemaChangedException
-    }
-
-    val newSchemaFieldNames = newSchemaFields.keySet
-    val currentSchemaFieldNames = currentSchemaFields.keySet
-
-    // Ensure that all the current schema field names are a subset of new schema field names
-    if (!currentSchemaFieldNames.subsetOf(newSchemaFieldNames)) {
-      throw schemaChangedException
-    }
+    if (ConfUtils.structuralSchemaMatchingEnabled(spark.sessionState.conf)) {
+      val newSchema = DataType.fromJson(newMetadata.schemaString).asInstanceOf[StructType]
+      val currentSchema = DataType.fromJson(metadata.schemaString).asInstanceOf[StructType]
 
-    // Ensure the shared fields are the structually the same
-    currentSchemaFieldNames.intersect(newSchemaFieldNames).foreach(fieldName => {
-      val newSchemaDatatype = newSchemaFields.getOrElse(
-        fieldName,
-        throw new SparkException(
-          s"$fieldName as a field should exist in new metadata"
-        )
-      )
-      val currentSchemaDatatype = currentSchemaFields.getOrElse(
-        fieldName,
-        throw new SparkException(
-          s"$fieldName as a field should exist in current metadata"
-        )
-      )
-      if (!DataType.equalsStructurally(newSchemaDatatype, currentSchemaDatatype)) {
+      if (
+        metadata.partitionColumns != newMetadata.partitionColumns ||
+        !SchemaUtils.isReadCompatible(currentSchema, newSchema)
+      ) {
         throw schemaChangedException
       }
-    })
+    } else if (newMetadata.schemaString != metadata.schemaString ||
+      newMetadata.partitionColumns != metadata.partitionColumns) {
+      throw schemaChangedException
+    }
   }
 
   def filesForScan(
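To see why the gate matters (a sketch, not part of the commit): the legacy else-if branch compares raw schemaString JSON, so any serialized difference, even one confined to field metadata such as a column comment, is reported as a schema change, while a structure-based comparison ignores it. SchemaUtils.isReadCompatible is not shown in this diff; the example below instead uses Spark's built-in DataType.equalsStructurally, which the removed code relied on, to illustrate the distinction:

import org.apache.spark.sql.types._

val current = new StructType()
  .add("id", LongType)
  .add("value", StringType)

// Same shape, but "value" now carries a comment in its field metadata.
val withComment = new StructType()
  .add("id", LongType)
  .add(StructField("value", StringType,
    metadata = new MetadataBuilder().putString("comment", "user value").build()))

// Exact-string matching (the legacy path) flags this as a schema change:
assert(current.json != withComment.json)

// Structural matching treats the two schemas as equivalent:
assert(DataType.equalsStructurally(current, withComment))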
