Skip to content
This repository was archived by the owner on Feb 27, 2025. It is now read-only.

Commit a22c30a

Browse files
committed
remove dfAutoColCount() and make columns set
1 parent 8b7aa57 commit a22c30a

File tree

2 files changed

+10
-41
lines changed

2 files changed

+10
-41
lines changed

src/main/scala/com/microsoft/sqlserver/jdbc/spark/SQLServerBulkJdbcOptions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String])
7474

7575
// user input column names array to match dataframe
7676
val columnsToWrite =
77-
params.getOrElse("columnsToWrite", Array[String]())
77+
params.getOrElse("columnsToWrite", Array[String]()).toSet
7878

7979
// Not a feature
8080
// Only used for internally testing data idempotency

src/main/scala/com/microsoft/sqlserver/jdbc/spark/utils/BulkCopyUtils.scala

Lines changed: 9 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -205,29 +205,6 @@ object BulkCopyUtils extends Logging {
205205
autoCols.toList
206206
}
207207

208-
/**
209-
* dfAutoColCount
210-
* utility function to get number of auto columns in dataframe.
211-
* Use number of auto columns in dataframe to get number of non auto columns in df,
212-
* and compare with the number of non auto columns in sql table
213-
*/
214-
private[spark] def dfAutoColCount(
215-
dfColNames: List[String],
216-
autoCols: List[String],
217-
dfColCaseMap: Map[String, String],
218-
isCaseSensitive: Boolean): Int ={
219-
var dfAutoColCt = 0
220-
for (j <- 0 to autoCols.length-1){
221-
if (isCaseSensitive && dfColNames.contains(autoCols(j)) ||
222-
!isCaseSensitive && dfColCaseMap.contains(autoCols(j).toLowerCase())
223-
&& dfColCaseMap(autoCols(j).toLowerCase()) == autoCols(j)) {
224-
dfAutoColCt += 1
225-
}
226-
}
227-
dfAutoColCt
228-
}
229-
230-
231208
/**
232209
* getColMetadataMap
233210
* Utility function convert result set meta data to array.
@@ -297,7 +274,7 @@ object BulkCopyUtils extends Logging {
297274
* @param url: String,
298275
* @param isCaseSensitive: Boolean
299276
* @param strictSchemaCheck: Boolean
300-
* @param columnsToWrite: Array[String]
277+
* @param columnsToWrite: Set[String]
301278
*/
302279
private[spark] def matchSchemas(
303280
conn: Connection,
@@ -307,7 +284,7 @@ object BulkCopyUtils extends Logging {
307284
url: String,
308285
isCaseSensitive: Boolean,
309286
strictSchemaCheck: Boolean,
310-
columnsToWrite: Array[String]): Array[ColumnMetadata]= {
287+
columnsToWrite: Set[String]): Array[ColumnMetadata]= {
311288
val dfColCaseMap = (df.schema.fieldNames.map(item => item.toLowerCase)
312289
zip df.schema.fieldNames.toList).toMap
313290
val dfCols = df.schema
@@ -317,18 +294,9 @@ object BulkCopyUtils extends Logging {
317294

318295
val prefix = "Spark Dataframe and SQL Server table have differing"
319296

320-
if (autoCols.length == 0) {
321-
assertIfCheckEnabled(dfCols.length == tableCols.length, strictSchemaCheck,
322-
s"${prefix} numbers of columns")
323-
} else if (strictSchemaCheck) {
324-
val dfColNames = df.schema.fieldNames.toList
325-
val dfAutoColCt = dfAutoColCount(dfColNames, autoCols, dfColCaseMap, isCaseSensitive)
326-
// if df has auto column(s), check column length using non auto column in df and table.
327-
// non auto column number in df: dfCols.length - dfAutoColCt
328-
// non auto column number in table: tableCols.length - autoCols.length
329-
assertIfCheckEnabled(dfCols.length-dfAutoColCt == tableCols.length-autoCols.length, strictSchemaCheck,
330-
s"${prefix} numbers of columns")
331-
}
297+
// auto columns should not exist in df
298+
assertIfCheckEnabled(dfCols.length + autoCols.length == tableCols.length, strictSchemaCheck,
299+
s"${prefix} numbers of columns")
332300

333301
if (columnsToWrite.isEmpty()) {
334302
val result = new Array[ColumnMetadata](tableCols.length - autoCols.length)
@@ -341,10 +309,11 @@ object BulkCopyUtils extends Logging {
341309
for (i <- 0 to tableCols.length-1) {
342310
val tableColName = tableCols(i).name
343311
var dfFieldIndex = -1
344-
// set dfFieldIndex = -1 for all auto columns to skip ColumnMetadata
345-
if (!columnsToWrite.isEmpty() && !columnsToWrite.contain(tableColName)) {
346-
logDebug(s"skipping col index $i col name $tableColName, not provided in columnsToWrite list")
312+
if (!columnsToWrite.isEmpty() && !columnsToWrite.contains(tableColName)) {
313+
// if columnsToWrite provided, and column name not in it, skip column mapping and ColumnMetadata
314+
logDebug(s"skipping col index $i col name $tableColName, user not provided in columnsToWrite list")
347315
} else if (autoCols.contains(tableColName)) {
316+
// if auto columns, skip column mapping and ColumnMetadata
348317
logDebug(s"skipping auto generated col index $i col name $tableColName dfFieldIndex $dfFieldIndex")
349318
}else{
350319
var dfColName:String = ""

0 commit comments

Comments (0)