From c09c36f986a1b97a7b2179e7874568108aa9a7a2 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Thu, 2 Jul 2015 11:11:13 -0700 Subject: [PATCH 01/17] single header option to write csv --- .../scala/com/databricks/spark/csv/CsvRelation.scala | 12 ++++++++++-- .../scala/com/databricks/spark/csv/package.scala | 4 +++- .../com/databricks/spark/csv/CsvFastSuite.scala | 10 +++++----- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 2c9f30a..726a690 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan} import org.apache.spark.sql.types.{StringType, StructField, StructType} -import com.databricks.spark.csv.util.{ParserLibs, ParseModes, TypeCast} +import com.databricks.spark.csv.util.{ParseModes, ParserLibs, TypeCast} import com.databricks.spark.sql.readers._ case class CsvRelation protected[spark] ( @@ -135,7 +135,15 @@ case class CsvRelation protected[spark] ( schemaFields: Seq[StructField]) = { // If header is set, make sure firstLine is materialized before sending to executors. val filterLine = if (useHeader) firstLine else null - val dataLines = if(useHeader) file.filter(_ != filterLine) else file + val dataLines = if (useHeader) { + file.mapPartitionsWithIndex({ + case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter + }, true) + } + else { + file + } + val rows = dataLines.mapPartitionsWithIndex({ case (split, iter) => { val escapeVal = if(escape == null) '\\' else escape.charValue() diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 26a1d10..1ac90a6 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -120,6 +120,8 @@ package object csv { "" // There is no need to generate header in this case } + val headerPerPart = parameters.getOrElse("headerPerPart", "true").toBoolean + val strRDD = dataFrame.rdd.mapPartitionsWithIndex { case (index, iter) => val csvFormatBase = CSVFormat.DEFAULT .withDelimiter(delimiterChar) @@ -133,7 +135,7 @@ package object csv { } new Iterator[String] { - var firstRow: Boolean = generateHeader + var firstRow: Boolean = if(headerPerPart) generateHeader else generateHeader && index == 0 override def hasNext = iter.hasNext || firstRow diff --git a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index e2bab82..2d0804f 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -240,7 +240,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile, parserLib = "univocity") - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true")) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/") @@ -255,7 +255,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile, parserLib = "univocity") - cars.saveAsCsvFile(copyFilePath, Map("header" -> 
"true"), classOf[GzipCodec]) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false"), classOf[GzipCodec]) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/") @@ -270,7 +270,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile, parserLib = "univocity") - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "quote" -> "\"")) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") @@ -285,7 +285,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile) - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "quote" -> "!")) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "!")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", quote = '!', parserLib = "univocity") @@ -300,7 +300,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "escape-copy.csv" val escape = TestSQLContext.csvFile(escapeFile, escape='|', quote='"') - escape.saveAsCsvFile(copyFilePath, Map("header" -> "true", "quote" -> "\"")) + escape.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) val escapeCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") From de59f04ea7bff210f38123a73f6bb8f296230674 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Thu, 2 Jul 2015 20:41:00 -0700 Subject: [PATCH 02/17] print error line contents --- .../scala/com/databricks/spark/csv/CsvRelation.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 726a690..f621395 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -133,8 +133,6 @@ case class CsvRelation protected[spark] ( file: RDD[String], header: Seq[String], schemaFields: Seq[StructField]) = { - // If header is set, make sure firstLine is materialized before sending to executors. - val filterLine = if (useHeader) firstLine else null val dataLines = if (useHeader) { file.mapPartitionsWithIndex({ case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter @@ -150,11 +148,14 @@ case class CsvRelation protected[spark] ( new BulkCsvReader(iter, split, headers = header, fieldSep = delimiter, quote = quote, escape = escapeVal).flatMap { tokens => + + lazy val errorDetail = s"${tokens.mkString(delimiter.toString)}" + if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: $tokens") + logger.warn(s"Dropping malformed line: $errorDetail") None } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens") + throw new RuntimeException(s"Malformed line in FAILFAST mode: $errorDetail") } else { var index: Int = 0 val rowArray = new Array[Any](schemaFields.length) @@ -170,7 +171,7 @@ case class CsvRelation protected[spark] ( (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) case NonFatal(e) if !failFast => - logger.error(s"Exception while parsing line: $tokens. 
", e) + logger.error(s"Exception while parsing line: $errorDetail. ", e) None } } From 8079e8b4d310237bc0b9cd0d2d00374dcad6ea1b Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Thu, 2 Jul 2015 20:41:38 -0700 Subject: [PATCH 03/17] incorporate into bigdf submodule --- build.sbt | 58 ++++++++++++++----------------------------------------- 1 file changed, 15 insertions(+), 43 deletions(-) diff --git a/build.sbt b/build.sbt index aa4e300..27467d2 100755 --- a/build.sbt +++ b/build.sbt @@ -1,21 +1,32 @@ name := "spark-csv" -version := "1.1.0" +version := "1.1.0-SNAPSHOT" organization := "com.databricks" -scalaVersion := "2.11.6" +scalaVersion := "2.10.4" parallelExecution in Test := false -crossScalaVersions := Seq("2.10.4", "2.11.6") - libraryDependencies += "org.apache.commons" % "commons-csv" % "1.1" libraryDependencies += "com.univocity" % "univocity-parsers" % "1.5.1" libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5" % "provided" +libraryDependencies ++= Seq( + ("org.apache.spark" %% "spark-sql" % "1.4.0"). +// exclude("org.mortbay.jetty", "servlet-api"). + exclude("commons-beanutils", "commons-beanutils-core"). + exclude("commons-collections", "commons-collections"). + exclude("commons-logging", "commons-logging"). + exclude("org.slf4j", "slf4j-api"). + exclude("org.apache.hadoop", "hadoop-yarn-api"). + exclude("org.apache.hadoop", "hadoop-yarn-common"). + exclude("com.esotericsoftware.minlog", "minlog") +) + + resolvers ++= Seq( "Apache Staging" at "https://repository.apache.org/content/repositories/staging/", "Typesafe" at "http://repo.typesafe.com/typesafe/releases", @@ -24,45 +35,6 @@ resolvers ++= Seq( publishMavenStyle := true -spAppendScalaVersion := true - -spIncludeMaven := true - -publishTo := { - val nexus = "https://oss.sonatype.org/" - if (version.value.endsWith("SNAPSHOT")) - Some("snapshots" at nexus + "content/repositories/snapshots") - else - Some("releases" at nexus + "service/local/staging/deploy/maven2") -} - -pomExtra := ( - https://github.com/databricks/spark-csv - - - Apache License, Verision 2.0 - http://www.apache.org/licenses/LICENSE-2.0.html - repo - - - - git@github.com:databricks/spark-csv.git - scm:git:git@github.com:databricks/spark-csv.git - - - - falaki - Hossein Falaki - http://www.falaki.net - - ) - -spName := "databricks/spark-csv" - -sparkVersion := "1.4.0" - -sparkComponents += "sql" - libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test" libraryDependencies += "com.novocode" % "junit-interface" % "0.9" % "test" From 213aa35c46c1f773eac372252a5e3aa2b3868ca9 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 3 Jul 2015 10:53:39 -0700 Subject: [PATCH 04/17] more parsing options for univocity-based parser --- .../com/databricks/spark/csv/CsvParser.scala | 55 ++++++++---- .../databricks/spark/csv/CsvRelation.scala | 67 ++++++++------ .../databricks/spark/csv/DefaultSource.scala | 14 +-- .../databricks/spark/csv/ParsingOptions.scala | 87 +++++++++++++++++++ .../com/databricks/spark/csv/package.scala | 27 +++--- 5 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/csv/ParsingOptions.scala diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 7d71195..160fdc4 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -16,9 +16,9 @@ package com.databricks.spark.csv -import org.apache.spark.sql.{DataFrame, SQLContext} import 
org.apache.spark.sql.types.StructType -import com.databricks.spark.csv.util.{ParserLibs, ParseModes} +import org.apache.spark.sql.{DataFrame, SQLContext} +import com.databricks.spark.csv.util.{ParseModes, ParserLibs} /** * A collection of static functions for working with CSV files in Spark SQL @@ -26,28 +26,26 @@ import com.databricks.spark.csv.util.{ParserLibs, ParseModes} class CsvParser { private var useHeader: Boolean = false - private var delimiter: Character = ',' - private var quote: Character = '"' - private var escape: Character = null + private var csvParsingOpts: CSVParsingOpts = CSVParsingOpts() + private var lineParsingOpts: LineParsingOpts = LineParsingOpts() + private var numberParsingOpts: NumberParsingOpts = NumberParsingOpts() + private var stringParsingOpts: StringParsingOpts = StringParsingOpts() private var schema: StructType = null private var parseMode: String = ParseModes.DEFAULT - private var ignoreLeadingWhiteSpace: Boolean = false - private var ignoreTrailingWhiteSpace: Boolean = false private var parserLib: String = ParserLibs.DEFAULT - def withUseHeader(flag: Boolean): CsvParser = { this.useHeader = flag this } def withDelimiter(delimiter: Character): CsvParser = { - this.delimiter = delimiter + this.csvParsingOpts.delimiter = delimiter this } def withQuoteChar(quote: Character): CsvParser = { - this.quote = quote + this.csvParsingOpts.quoteChar = quote this } @@ -62,17 +60,17 @@ class CsvParser { } def withEscape(escapeChar: Character): CsvParser = { - this.escape = escapeChar + this.csvParsingOpts.escapeChar = escapeChar this } def withIgnoreLeadingWhiteSpace(ignore: Boolean): CsvParser = { - this.ignoreLeadingWhiteSpace = ignore + this.csvParsingOpts.ignoreLeadingWhitespace = ignore this } def withIgnoreTrailingWhiteSpace(ignore: Boolean): CsvParser = { - this.ignoreTrailingWhiteSpace = ignore + this.csvParsingOpts.ignoreTrailingWhitespace = ignore this } @@ -81,20 +79,39 @@ class CsvParser { this } + def withCsvParsingOpts(csvParsingOpts: CSVParsingOpts) = { + this.csvParsingOpts = csvParsingOpts + this + } + + def withLineParsingOpts(lineParsingOpts: LineParsingOpts) = { + this.lineParsingOpts = lineParsingOpts + this + } + + def withNumberParsingOpts(numberParsingOpts: NumberParsingOpts) = { + this.numberParsingOpts = numberParsingOpts + this + } + + def withStringParsingOpts(stringParsingOpts: StringParsingOpts) = { + this.stringParsingOpts = stringParsingOpts + this + } + /** Returns a Schema RDD for the given CSV path. 
*/ @throws[RuntimeException] def csvFile(sqlContext: SQLContext, path: String): DataFrame = { val relation: CsvRelation = CsvRelation( path, useHeader, - delimiter, - quote, - escape, + csvParsingOpts, parseMode, parserLib, - ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace, - schema)(sqlContext) + schema, + lineParsingOpts, + numberParsingOpts, + stringParsingOpts)(sqlContext) sqlContext.baseRelationToDataFrame(relation) } diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index f621395..9185691 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -34,14 +34,13 @@ import com.databricks.spark.sql.readers._ case class CsvRelation protected[spark] ( location: String, useHeader: Boolean, - delimiter: Char, - quote: Char, - escape: Character, + csvParsingOpts: CSVParsingOpts, parseMode: String, parserLib: String, - ignoreLeadingWhiteSpace: Boolean, - ignoreTrailingWhiteSpace: Boolean, - userSchema: StructType = null)(@transient val sqlContext: SQLContext) + userSchema: StructType = null, + lineExceptionPolicy: LineParsingOpts = LineParsingOpts(), + numberParsingOpts: NumberParsingOpts = NumberParsingOpts(), + stringParsingOpts: StringParsingOpts = StringParsingOpts())(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan with InsertableRelation { private val logger = LoggerFactory.getLogger(CsvRelation.getClass) @@ -51,8 +50,10 @@ case class CsvRelation protected[spark] ( logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") } - if((ignoreLeadingWhiteSpace || ignoreLeadingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) { - logger.warn(s"Ignore white space options may not work with Commons parserLib option") + if ((csvParsingOpts.ignoreLeadingWhitespace || + csvParsingOpts.ignoreTrailingWhitespace) && + ParserLibs.isCommonsLib(parserLib)) { + logger.warn(s"Ignore white space options only supported with univocity parser option") } private val failFast = ParseModes.isFailFastMode(parseMode) @@ -71,9 +72,9 @@ case class CsvRelation protected[spark] ( univocityParseCSV(baseRDD, fieldNames, schema.fields) } else { val csvFormat = CSVFormat.DEFAULT - .withDelimiter(delimiter) - .withQuote(quote) - .withEscape(escape) + .withDelimiter(csvParsingOpts.delimiter) + .withQuote(csvParsingOpts.quoteChar) + .withEscape(csvParsingOpts.escapeChar) .withSkipHeaderRecord(false) .withHeader(fieldNames: _*) @@ -97,14 +98,16 @@ case class CsvRelation protected[spark] ( userSchema } else { val firstRow = if(ParserLibs.isUnivocityLib(parserLib)) { - val escapeVal = if(escape == null) '\\' else escape.charValue() - new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal) + val escapeVal = if (csvParsingOpts.escapeChar == null) '\\' else csvParsingOpts.escapeChar.charValue() + new LineCsvReader(fieldSep = csvParsingOpts.delimiter, + quote = csvParsingOpts.quoteChar, + escape = escapeVal) .parseLine(firstLine) } else { val csvFormat = CSVFormat.DEFAULT - .withDelimiter(delimiter) - .withQuote(quote) - .withEscape(escape) + .withDelimiter(csvParsingOpts.delimiter) + .withQuote(csvParsingOpts.quoteChar) + .withEscape(csvParsingOpts.escapeChar) .withSkipHeaderRecord(false) CSVParser.parse(firstLine, csvFormat).getRecords.head.toArray } @@ -144,17 +147,19 @@ case class CsvRelation protected[spark] ( val rows = dataLines.mapPartitionsWithIndex({ case (split, iter) => { - val escapeVal = 
if(escape == null) '\\' else escape.charValue() + val escapeVal = if(csvParsingOpts.escapeChar == null) '\\' else csvParsingOpts.escapeChar.charValue() new BulkCsvReader(iter, split, - headers = header, fieldSep = delimiter, - quote = quote, escape = escapeVal).flatMap { tokens => + headers = header, fieldSep = csvParsingOpts.delimiter, + quote = csvParsingOpts.quoteChar, escape = escapeVal).flatMap { tokens => - lazy val errorDetail = s"${tokens.mkString(delimiter.toString)}" + lazy val errorDetail = s"${tokens.mkString(csvParsingOpts.delimiter.toString)}" - if (dropMalformed && schemaFields.length != tokens.size) { + if (schemaFields.length != tokens.size && + (dropMalformed || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Ignore)) { logger.warn(s"Dropping malformed line: $errorDetail") None - } else if (failFast && schemaFields.length != tokens.size) { + } else if (schemaFields.length != tokens.size && + (failFast || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Abort)) { throw new RuntimeException(s"Malformed line in FAILFAST mode: $errorDetail") } else { var index: Int = 0 @@ -162,12 +167,24 @@ case class CsvRelation protected[spark] ( try { index = 0 while (index < schemaFields.length) { - rowArray(index) = TypeCast.castTo(tokens(index), schemaFields(index).dataType) + try { + rowArray(index) = TypeCast.castTo(tokens(index), schemaFields(index).dataType) + } catch { + case _ : NumberFormatException if numberParsingOpts.enable => + if(numberParsingOpts.nanStrings.contains(tokens(index))) + rowArray(index) = numberParsingOpts.nanValue + else if(tokens(index).isEmpty) + rowArray(index) = + TypeCast.castTo(numberParsingOpts.emptyStringReplace, schemaFields(index).dataType) + else + rowArray(index) = null + } index = index + 1 } Some(Row.fromSeq(rowArray)) } catch { - case aiob: ArrayIndexOutOfBoundsException if permissive => + case aiob: ArrayIndexOutOfBoundsException + if permissive || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Fill => (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) case NonFatal(e) if !failFast => @@ -236,7 +253,7 @@ case class CsvRelation protected[spark] ( + s" to INSERT OVERWRITE a CSV table:\n${e.toString}") } // Write the data. We assume that schema isn't changed, and we won't update it. 
- data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString)) + data.saveAsCsvFile(location, Map("delimiter" -> csvParsingOpts.delimiter.toString)) } else { sys.error("CSV tables only support INSERT OVERWRITE for now.") } diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala index a3361b0..6dfd8c2 100755 --- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala @@ -16,9 +16,10 @@ package com.databricks.spark.csv import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} + import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import com.databricks.spark.csv.util.{ParserLibs, TypeCast} /** @@ -102,16 +103,17 @@ class DefaultSource throw new Exception("Ignore white space flag can be true or false") } + val csvParsingOpts = CSVParsingOpts(delimiter = delimiter, + quoteChar = quoteChar, + escapeChar = escapeChar, + ignoreLeadingWhitespace = ignoreLeadingWhiteSpaceFlag, + ignoreTrailingWhitespace = ignoreTrailingWhiteSpaceFlag) CsvRelation(path, headerFlag, - delimiter, - quoteChar, - escapeChar, + csvParsingOpts, parseMode, parserLib, - ignoreLeadingWhiteSpaceFlag, - ignoreTrailingWhiteSpaceFlag, schema)(sqlContext) } diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala new file mode 100644 index 0000000..384a647 --- /dev/null +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -0,0 +1,87 @@ +// scalastyle:off +/* + * Copyright 2015 Ayasdi Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// scalastyle:on + +package com.databricks.spark.csv + +import scala.collection.immutable.HashSet + +/** + * Action to take when malformed lines are found in a CSV File + */ +object LineExceptionPolicy { + + sealed trait EnumVal + + /** + * ignore the malformed line and continue + */ + case object Ignore extends EnumVal + + /** + * stop parsing and abort + */ + case object Abort extends EnumVal + + /** + * if fields are missing in a line, fill in the blanks + */ + case object Fill extends EnumVal + +} + +/** + * Options to control parsing of numbers + * @param emptyStringReplace replace empty string with this string + * @param nanStrings these strings are NaNs + * @param nanValue this is the value to use for NaN + * @param enable make this false to stop attempting to parse numbers i.e. 
treat them as strings + */ +case class NumberParsingOpts(var emptyStringReplace: String = "NaN", + var nanStrings: Set[String] = HashSet("NaN", "NULL", "N/A"), + var nanValue: Double = Double.NaN, + var enable: Boolean = true) + +/** + * Options to control parsing of strings + * @param emptyStringReplace replace empty string with this string + */ +case class StringParsingOpts(var emptyStringReplace: String = "", + var nullStrings: Set[String] = HashSet("NULL", "null", "n/a", "N/A")) + +/** + * options to handle exceptions while parsing a line + * @param badLinePolicy abort, ignore line or fill with nulls when a bad line is encountered + * @param fillValue if line exception policy is to fill in the blanks, use this value to fill + */ +case class LineParsingOpts(var badLinePolicy: LineExceptionPolicy.EnumVal = LineExceptionPolicy.Fill, + var fillValue: String = "") + +/** + * CSV parsing options + * @param quoteChar fields containing delimiters, other special chars are quoted using this character + * e.g. "this is a comma ," + * @param escapeChar if a quote character appears in a field, it is escaped using this + * e.g. "this is a quote \"" + * @param ignoreLeadingWhitespace ignore white space before a field + * @param ignoreTrailingWhitespace ignore white space after a field + */ +case class CSVParsingOpts(var delimiter: Character = ',', + var quoteChar: Character = '"', + var escapeChar: Character = '\\', + var ignoreLeadingWhitespace: Boolean = true, + var ignoreTrailingWhitespace: Boolean = true) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 1ac90a6..66dd9c5 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -35,16 +35,17 @@ package object csv { parserLib: String = "COMMONS", ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false) = { + val csvParsingOpts = CSVParsingOpts(delimiter = delimiter, + quoteChar = quote, + escapeChar = escape, + ignoreLeadingWhitespace = ignoreLeadingWhiteSpace, + ignoreTrailingWhitespace = ignoreTrailingWhiteSpace) val csvRelation = CsvRelation( location = filePath, useHeader = useHeader, - delimiter = delimiter, - quote = quote, - escape = escape, + csvParsingOpts = csvParsingOpts, parseMode = mode, - parserLib = parserLib, - ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace)(sqlContext) + parserLib = parserLib)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } @@ -53,16 +54,18 @@ package object csv { parserLib: String = "COMMONS", ignoreLeadingWhiteSpace: Boolean = false, ignoreTrailingWhiteSpace: Boolean = false) = { + val csvParsingOpts = CSVParsingOpts(delimiter = '\t', + quoteChar = '"', + escapeChar = '\\', + ignoreLeadingWhitespace = ignoreLeadingWhiteSpace, + ignoreTrailingWhitespace = ignoreTrailingWhiteSpace) + val csvRelation = CsvRelation( location = filePath, useHeader = useHeader, - delimiter = '\t', - quote = '"', - escape = '\\', + csvParsingOpts = csvParsingOpts, parseMode = "PERMISSIVE", - parserLib = parserLib, - ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace)(sqlContext) + parserLib = parserLib)(sqlContext) sqlContext.baseRelationToDataFrame(csvRelation) } } From faef4bc7ef2465bb8c52276cb4e66fdab5c7f78c Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Tue, 7 Jul 2015 16:46:21 -0700 Subject: [PATCH 05/17] "provided" dependencies 
--- build.sbt | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index 27467d2..5686a8e 100755 --- a/build.sbt +++ b/build.sbt @@ -12,20 +12,9 @@ libraryDependencies += "org.apache.commons" % "commons-csv" % "1.1" libraryDependencies += "com.univocity" % "univocity-parsers" % "1.5.1" -libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5" % "provided" - -libraryDependencies ++= Seq( - ("org.apache.spark" %% "spark-sql" % "1.4.0"). -// exclude("org.mortbay.jetty", "servlet-api"). - exclude("commons-beanutils", "commons-beanutils-core"). - exclude("commons-collections", "commons-collections"). - exclude("commons-logging", "commons-logging"). - exclude("org.slf4j", "slf4j-api"). - exclude("org.apache.hadoop", "hadoop-yarn-api"). - exclude("org.apache.hadoop", "hadoop-yarn-common"). - exclude("com.esotericsoftware.minlog", "minlog") -) +libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5" % Provided +libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.0" % Provided resolvers ++= Seq( "Apache Staging" at "https://repository.apache.org/content/repositories/staging/", From e424401702ef18ffab9e9a4676c29d72600dccd8 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 17 Jul 2015 20:02:38 -0700 Subject: [PATCH 06/17] unnest array --- .../databricks/spark/csv/CsvRelation.scala | 2 +- .../databricks/spark/csv/ParsingOptions.scala | 3 +- .../com/databricks/spark/csv/package.scala | 45 +++++++++++++++++-- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 9185691..5e4165c 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -64,7 +64,7 @@ case class CsvRelation protected[spark] ( // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits. def buildScan = { - val baseRDD = sqlContext.sparkContext.textFile(location) + val baseRDD = sqlContext.sparkContext.textFile(location, csvParsingOpts.numParts) val fieldNames = schema.fieldNames diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala index 384a647..03afdfd 100644 --- a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -84,4 +84,5 @@ case class CSVParsingOpts(var delimiter: Character = ',', var quoteChar: Character = '"', var escapeChar: Character = '\\', var ignoreLeadingWhitespace: Boolean = true, - var ignoreTrailingWhitespace: Boolean = true) + var ignoreTrailingWhitespace: Boolean = true, + var numParts: Int = 0) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 66dd9c5..3c7af6c 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -15,6 +15,9 @@ */ package com.databricks.spark +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import org.apache.commons.csv.CSVFormat import org.apache.hadoop.io.compress.CompressionCodec @@ -76,7 +79,9 @@ package object csv { * Saves DataFrame as csv files. By default uses ',' as delimiter, and includes header line. 
*/ def saveAsCsvFile(path: String, parameters: Map[String, String] = Map(), - compressionCodec: Class[_ <: CompressionCodec] = null): Unit = { + compressionCodec: Class[_ <: CompressionCodec] = null, + sparseColInfo: mutable.Map[String, mutable.Map[String, Int]] = null): Unit = { + // TODO(hossein): For nested types, we may want to perform special work val delimiter = parameters.getOrElse("delimiter", ",") val delimiterChar = if (delimiter.length == 1) { @@ -117,8 +122,32 @@ package object csv { } val generateHeader = parameters.getOrElse("header", "false").toBoolean + + val isSparse: Array[Boolean] = dataFrame.columns.flatMap { colName: String => + if (sparseColInfo.contains(colName)) { + Array.fill(sparseColInfo(colName).size)(true) + } else { + Array(false) + } + } + + def makeHeader : String = { + val hs = dataFrame.columns.flatMap { colName: String => + if (sparseColInfo.contains(colName)) { + require(sparseColInfo.contains(colName)) + sparseColInfo(colName).toSeq.sortBy(_._2).map(_._1) + } else { + Seq(colName) + } + } + csvFormat.format(hs : _*) + } + val header = if (generateHeader) { - csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]):_*) + if (sparseColInfo == null) + csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) + else + makeHeader } else { "" // There is no need to generate header in this case } @@ -144,7 +173,17 @@ package object csv { override def next: String = { if(!iter.isEmpty) { - val row = csvFormat.format(iter.next.toSeq.map(_.asInstanceOf[AnyRef]):_*) + def makeCsvRow(inFields: Seq[Any]) : String = { + val fields = inFields.flatMap { f => + if(isSparse(inFields.indexOf(f))) { + f.asInstanceOf[ArrayBuffer[Any]] + } else { + ArrayBuffer(f) + } + } + csvFormat.format(fields.map(_.asInstanceOf[AnyRef]): _*) + } + val row = makeCsvRow(iter.next.toSeq) if (firstRow) { firstRow = false header + csvFormat.getRecordSeparator() + row From c9a6592172ce476a928f7571a2d0b4b2abe5b128 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 17 Jul 2015 20:22:30 -0700 Subject: [PATCH 07/17] fix npe when no array/sparse column exists --- src/main/scala/com/databricks/spark/csv/package.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 3c7af6c..a9b387b 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -123,8 +123,8 @@ package object csv { val generateHeader = parameters.getOrElse("header", "false").toBoolean - val isSparse: Array[Boolean] = dataFrame.columns.flatMap { colName: String => - if (sparseColInfo.contains(colName)) { + val isSparse: Array[Boolean] = dataFrame.columns.flatMap { colName: String => + if (sparseColInfo != null && sparseColInfo.contains(colName)) { Array.fill(sparseColInfo(colName).size)(true) } else { Array(false) From d553da5213b492dd7266d57ecdadf45604866976 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Tue, 28 Jul 2015 23:00:39 -0700 Subject: [PATCH 08/17] parse exception handling, unit tests --- .../com/databricks/spark/csv/CsvParser.scala | 16 +- .../databricks/spark/csv/CsvRelation.scala | 42 +++- .../databricks/spark/csv/ParsingOptions.scala | 36 ++- .../com/databricks/spark/csv/package.scala | 4 +- src/test/resources/cars-alternative.csv | 6 +- src/test/resources/numbers.csv | 11 + .../databricks/spark/csv/CsvFastSuite.scala | 212 +++++++++++++++--- .../com/databricks/spark/csv/CsvSuite.scala | 2 +- 
.../spark/csv/util/TypeCastSuite.scala | 15 +- 9 files changed, 283 insertions(+), 61 deletions(-) create mode 100644 src/test/resources/numbers.csv diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 160fdc4..25d7950 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -28,7 +28,8 @@ class CsvParser { private var useHeader: Boolean = false private var csvParsingOpts: CSVParsingOpts = CSVParsingOpts() private var lineParsingOpts: LineParsingOpts = LineParsingOpts() - private var numberParsingOpts: NumberParsingOpts = NumberParsingOpts() + private var realNumberParsingOpts: RealNumberParsingOpts = RealNumberParsingOpts() + private var intNumberParsingOpts: IntNumberParsingOpts = IntNumberParsingOpts() private var stringParsingOpts: StringParsingOpts = StringParsingOpts() private var schema: StructType = null private var parseMode: String = ParseModes.DEFAULT @@ -89,11 +90,17 @@ class CsvParser { this } - def withNumberParsingOpts(numberParsingOpts: NumberParsingOpts) = { - this.numberParsingOpts = numberParsingOpts + def withRealNumberParsingOpts(numberParsingOpts: RealNumberParsingOpts) = { + this.realNumberParsingOpts = numberParsingOpts this } + def withIntNumberParsingOpts(numberParsingOpts: IntNumberParsingOpts) = { + this.intNumberParsingOpts = numberParsingOpts + this + } + + def withStringParsingOpts(stringParsingOpts: StringParsingOpts) = { this.stringParsingOpts = stringParsingOpts this @@ -110,7 +117,8 @@ class CsvParser { parserLib, schema, lineParsingOpts, - numberParsingOpts, + realNumberParsingOpts, + intNumberParsingOpts, stringParsingOpts)(sqlContext) sqlContext.baseRelationToDataFrame(relation) } diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 5e4165c..eb8a731 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan} -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types._ import com.databricks.spark.csv.util.{ParseModes, ParserLibs, TypeCast} import com.databricks.spark.sql.readers._ @@ -39,7 +39,8 @@ case class CsvRelation protected[spark] ( parserLib: String, userSchema: StructType = null, lineExceptionPolicy: LineParsingOpts = LineParsingOpts(), - numberParsingOpts: NumberParsingOpts = NumberParsingOpts(), + realNumOpts: RealNumberParsingOpts = RealNumberParsingOpts(), + intNumOpts: IntNumberParsingOpts = IntNumberParsingOpts(), stringParsingOpts: StringParsingOpts = StringParsingOpts())(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan with InsertableRelation { @@ -138,7 +139,7 @@ case class CsvRelation protected[spark] ( schemaFields: Seq[StructField]) = { val dataLines = if (useHeader) { file.mapPartitionsWithIndex({ - case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter + case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter //FIXME: empty partition? 
}, true) } else { @@ -150,7 +151,9 @@ case class CsvRelation protected[spark] ( val escapeVal = if(csvParsingOpts.escapeChar == null) '\\' else csvParsingOpts.escapeChar.charValue() new BulkCsvReader(iter, split, headers = header, fieldSep = csvParsingOpts.delimiter, - quote = csvParsingOpts.quoteChar, escape = escapeVal).flatMap { tokens => + quote = csvParsingOpts.quoteChar, escape = escapeVal, + ignoreLeadingSpace = csvParsingOpts.ignoreLeadingWhitespace, + ignoreTrailingSpace = csvParsingOpts.ignoreTrailingWhitespace).flatMap { tokens => lazy val errorDetail = s"${tokens.mkString(csvParsingOpts.delimiter.toString)}" @@ -170,14 +173,27 @@ case class CsvRelation protected[spark] ( try { rowArray(index) = TypeCast.castTo(tokens(index), schemaFields(index).dataType) } catch { - case _ : NumberFormatException if numberParsingOpts.enable => - if(numberParsingOpts.nanStrings.contains(tokens(index))) - rowArray(index) = numberParsingOpts.nanValue - else if(tokens(index).isEmpty) - rowArray(index) = - TypeCast.castTo(numberParsingOpts.emptyStringReplace, schemaFields(index).dataType) + case e : NumberFormatException if realNumOpts.enable && + (schemaFields(index).dataType == DoubleType || schemaFields(index).dataType == FloatType) => + + rowArray(index) = if (realNumOpts.nullStrings.contains(tokens(index))) + null + else if (realNumOpts.nanStrings.contains(tokens(index))) + TypeCast.castTo("NaN", schemaFields(index).dataType) + else if (realNumOpts.infPosStrings.contains(tokens(index))) + TypeCast.castTo("Infinity", schemaFields(index).dataType) + else if (realNumOpts.infNegStrings.contains(tokens(index))) + TypeCast.castTo("-Infinity", schemaFields(index).dataType) else - rowArray(index) = null + throw new IllegalStateException(s"Failed to parse as double/float number ${tokens(index)}") + + case _ : NumberFormatException if intNumOpts.enable && + (schemaFields(index).dataType == IntegerType || schemaFields(index).dataType == LongType) => + + rowArray(index) = if (intNumOpts.nullStrings.contains(tokens(index))) + null + else + throw new IllegalStateException(s"Failed to parse as int/long number ${tokens(index)}") } index = index + 1 } @@ -185,7 +201,9 @@ case class CsvRelation protected[spark] ( } catch { case aiob: ArrayIndexOutOfBoundsException if permissive || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Fill => - (index until schemaFields.length).foreach(ind => rowArray(ind) = null) + (index until schemaFields.length).foreach { ind => + rowArray(ind) = TypeCast.castTo(lineExceptionPolicy.fillValue, schemaFields(index).dataType) + } Some(Row.fromSeq(rowArray)) case NonFatal(e) if !failFast => logger.error(s"Exception while parsing line: $errorDetail. ", e) diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala index 03afdfd..7577f8a 100644 --- a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -44,24 +44,37 @@ object LineExceptionPolicy { } +object ParsingOptions { + val defaultNullStrings = HashSet("", "NULL", "N/A", "null", "n/a") + val defaultNaNStrings = HashSet("NaN", "nan") + val defaultInfPosString = HashSet("+Inf", "Inf", "Infinity", "+Infinity", "inf", "+inf") + val defaultInfNegString = HashSet("-Inf", "-inf", "-Infinity") +} + /** - * Options to control parsing of numbers - * @param emptyStringReplace replace empty string with this string + * Options to control parsing of real numbers e.g. 
the types Float and Double * @param nanStrings these strings are NaNs - * @param nanValue this is the value to use for NaN * @param enable make this false to stop attempting to parse numbers i.e. treat them as strings */ -case class NumberParsingOpts(var emptyStringReplace: String = "NaN", - var nanStrings: Set[String] = HashSet("NaN", "NULL", "N/A"), - var nanValue: Double = Double.NaN, - var enable: Boolean = true) +case class RealNumberParsingOpts(var nanStrings: Set[String] = ParsingOptions.defaultNaNStrings, + var infPosStrings: Set[String] = ParsingOptions.defaultInfPosString, + var infNegStrings: Set[String] = ParsingOptions.defaultInfNegString, + var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, + var enable: Boolean = true) /** - * Options to control parsing of strings - * @param emptyStringReplace replace empty string with this string - */ + * Options to control parsing of integral numbers e.g. the types Int and Long + * @param enable make this false to stop attempting to parse numbers i.e. treat them as strings + */ +case class IntNumberParsingOpts(var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, + var enable: Boolean = true) + +/** + * Options to control parsing of strings + * @param emptyStringReplace replace empty string with this string + */ case class StringParsingOpts(var emptyStringReplace: String = "", - var nullStrings: Set[String] = HashSet("NULL", "null", "n/a", "N/A")) + var nullStrings: Set[String] = ParsingOptions.defaultNullStrings) /** * options to handle exceptions while parsing a line @@ -79,6 +92,7 @@ case class LineParsingOpts(var badLinePolicy: LineExceptionPolicy.EnumVal = Line * e.g. "this is a quote \"" * @param ignoreLeadingWhitespace ignore white space before a field * @param ignoreTrailingWhitespace ignore white space after a field + * @param numParts number of partitions to use in sc.textFile() */ case class CSVParsingOpts(var delimiter: Character = ',', var quoteChar: Character = '"', diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index a9b387b..b9dfa74 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -36,8 +36,8 @@ package object csv { escape: Character = null, mode: String = "PERMISSIVE", parserLib: String = "COMMONS", - ignoreLeadingWhiteSpace: Boolean = false, - ignoreTrailingWhiteSpace: Boolean = false) = { + ignoreLeadingWhiteSpace: Boolean = true, + ignoreTrailingWhiteSpace: Boolean = true) = { val csvParsingOpts = CSVParsingOpts(delimiter = delimiter, quoteChar = quote, escapeChar = escape, diff --git a/src/test/resources/cars-alternative.csv b/src/test/resources/cars-alternative.csv index 2c1285a..baff5bc 100644 --- a/src/test/resources/cars-alternative.csv +++ b/src/test/resources/cars-alternative.csv @@ -1,5 +1,5 @@ year|make|model|comment -'2012'|'Tesla'|'S'| 'No comment' + '2012' |'Tesla'|'S'| 'No comment' -1997|Ford|E350|'Go get one now they are going fast' -2015|Chevy|Volt + 1997|Ford|E350|'Go get one now they are going fast' +2015 |Chevy|Volt diff --git a/src/test/resources/numbers.csv b/src/test/resources/numbers.csv new file mode 100644 index 0000000..072247d --- /dev/null +++ b/src/test/resources/numbers.csv @@ -0,0 +1,11 @@ +double, float, int, long +1.0, 1.0, 1, 1 + , , , +NaN, NaN, 3, 3 +NULL, null, N/A, n/a +Inf, Inf, 5, 5 +-Inff, -Inff, 6, 6 +Infinity, Infinity, 7, 7 + + + diff --git 
a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index 2d0804f..2bdf14d 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -18,13 +18,15 @@ package com.databricks.spark.csv import java.io.File import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.sql.test._ +import org.scalatest.FunSuite + import org.apache.spark.SparkException +import org.apache.spark.sql.test._ import org.apache.spark.sql.types._ -import org.scalatest.FunSuite /* Implicits */ -import TestSQLContext._ + +import org.apache.spark.sql.test.TestSQLContext._ class CsvFastSuite extends FunSuite { val carsFile = "src/test/resources/cars.csv" @@ -32,6 +34,7 @@ class CsvFastSuite extends FunSuite { val carsAltFile = "src/test/resources/cars-alternative.csv" val emptyFile = "src/test/resources/empty.csv" val escapeFile = "src/test/resources/escape.csv" + val numbersFile = "src/test/resources/numbers.csv" val tempEmptyDir = "target/test/empty2/" val numCars = 3 @@ -72,7 +75,7 @@ class CsvFastSuite extends FunSuite { s""" |CREATE TEMPORARY TABLE carsTable |(yearMade double, makeName string, modelName string, priceTag decimal, - | comments string, grp string) + |comments string, grp string) |USING com.databricks.spark.csv |OPTIONS (path "$carsTsvFile", header "true", delimiter "\t", parserLib "univocity") """.stripMargin.replaceAll("\n", " ")) @@ -81,6 +84,162 @@ class CsvFastSuite extends FunSuite { assert(sql("SELECT makeName FROM carsTable where priceTag > 60000").collect().size === 1) } + test("DSL test for Line exception policy") { + val results = new CsvParser() + .withUseHeader(true) + .withLineParsingOpts(LineParsingOpts(badLinePolicy = LineExceptionPolicy.Fill, fillValue = "fill")) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsFile) + .collect() + val volt = results.filter(row => row.get(2).asInstanceOf[String] == "Volt").head + assert(volt.get(3).asInstanceOf[String] === "fill") + assert(volt.get(4).asInstanceOf[String] === "fill") + } + + test("DSL test for CSV Parsing Opts: delimiter") { + val results = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(delimiter = '\t')) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsTsvFile) + .select("year") + .collect() + + assert(results.size === numCars) + } + + test("DSL test for CSV Parsing Opts: numParts") { + val rdd = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(numParts = 1, delimiter = '\t')) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsTsvFile) + .select("year") + .rdd + + assert(rdd.partitions.size === 1) + } + + test("DSL test for CSV Parsing Opts: quote") { + val results = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(quoteChar = '\'', delimiter = '|', numParts = 1)) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsAltFile) + .select("year") + .collect() + + assert(results.size === numCars) + + val years = results.map(_.get(0).asInstanceOf[String]) + assert(years === Array("2012", "1997", "2015")) + } + + test("DSL test for CSV Parsing Opts: whitespace") { + var results = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(delimiter = '|', numParts = 1, ignoreLeadingWhitespace = false)) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsAltFile) + .select("year") + .collect() + + assert(results.size === numCars) + + var 
years = results.map(_.get(0).asInstanceOf[String]) + assert(years === Array(" \'2012\'", " 1997", "2015")) + + results = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(delimiter = '|', numParts = 1, ignoreLeadingWhitespace = false)) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsAltFile) + .select("year") + .collect() + + assert(results.size === numCars) + + years = results.map(_.get(0).asInstanceOf[String]) + assert(years === Array(" \'2012\'", " 1997", "2015")) + + results = new CsvParser() + .withUseHeader(true) + .withCsvParsingOpts(CSVParsingOpts(delimiter = '|', numParts = 1, ignoreTrailingWhitespace = false)) + .withParserLib("univocity") + .csvFile(TestSQLContext, carsAltFile) + .select("year") + .collect() + + assert(results.size === numCars) + + years = results.map(_.get(0).asInstanceOf[String]) + assert(years === Array("\'2012\' ", "1997", "2015 ")) + } + + test("DSL test for CSV Parsing Opts: special") { + var results = new CsvParser() + .withUseHeader(true) + .withSchema(StructType(Seq(StructField("double", DoubleType), + StructField("float", FloatType), + StructField("int", IntegerType), + StructField("long", LongType)))) + .withCsvParsingOpts(CSVParsingOpts(numParts = 1)) + .withParserLib("univocity") + .csvFile(TestSQLContext, numbersFile) + .collect() + + assert(results.size === 6) + + var doubles = results.map(_.get(0)) + assert(doubles.count(_.asInstanceOf[Double].isNaN) === 1) + assert(doubles.count(_.asInstanceOf[Double].isInfinite) === 2) + assert(doubles.count(_.asInstanceOf[Double] == 0.0) === 2) + + var floats = results.map(_.get(1)) + assert(floats.count(_.asInstanceOf[Float].isNaN) === 1) + assert(floats.count(_.asInstanceOf[Float].isInfinite) === 2) + assert(floats.count(_.asInstanceOf[Float] == 0.0) === 2) + + var ints = results.map(_.get(2)) + assert(ints.count(_.asInstanceOf[Int] == 0) === 2) + + var longs = results.map(_.get(3)) + assert(longs.count(_.asInstanceOf[Long] == 0) === 2) + + results = new CsvParser() + .withUseHeader(true) + .withSchema(StructType(Seq(StructField("double", DoubleType), + StructField("float", FloatType), + StructField("int", IntegerType), + StructField("long", LongType)))) + .withCsvParsingOpts(CSVParsingOpts(numParts = 1)) + .withRealNumberParsingOpts(RealNumberParsingOpts( + infNegStrings = ParsingOptions.defaultInfNegString + "-Inff")) + .withParserLib("univocity") + .csvFile(TestSQLContext, numbersFile) + .collect() + + assert(results.size === 7) + + doubles = results.map(_.get(0)) + assert(doubles.count(_.asInstanceOf[Double].isNaN) === 1) + assert(doubles.count(_.asInstanceOf[Double].isInfinite) === 3) + assert(doubles.count(_.asInstanceOf[Double] == 0.0) === 2) + + floats = results.map(_.get(1)) + assert(floats.count(_.asInstanceOf[Float].isNaN) === 1) + assert(floats.count(_.asInstanceOf[Float].isInfinite) === 3) + assert(floats.count(_.asInstanceOf[Float] == 0.0) === 2) + + ints = results.map(_.get(2)) + assert(ints.count(_.asInstanceOf[Int] == 0) === 2) + + longs = results.map(_.get(3)) + assert(longs.count(_.asInstanceOf[Long] == 0) === 2) + + } + + test("DSL test for DROPMALFORMED parsing mode") { val results = new CsvParser() .withParseMode("DROPMALFORMED") @@ -99,7 +258,7 @@ class CsvFastSuite extends FunSuite { .withUseHeader(true) .withParserLib("univocity") - val exception = intercept[SparkException]{ + val exception = intercept[SparkException] { parser.csvFile(TestSQLContext, carsFile) .select("year") .collect() @@ -108,7 +267,6 @@ class CsvFastSuite extends FunSuite { 
assert(exception.getMessage.contains("Malformed line in FAILFAST mode")) } - test("DSL test with alternative delimiter and quote") { val results = new CsvParser() .withDelimiter('|') @@ -132,7 +290,7 @@ class CsvFastSuite extends FunSuite { } test("Expect parsing error with wrong delimiter settting using sparkContext.csvFile") { - intercept[ org.apache.spark.sql.AnalysisException] { + intercept[org.apache.spark.sql.AnalysisException] { TestSQLContext.csvFile(carsAltFile, useHeader = true, delimiter = ',', quote = '\'', parserLib = "univocity") .select("year") .collect() @@ -173,27 +331,27 @@ class CsvFastSuite extends FunSuite { } test("DDL test with empty file") { - sql(s""" - |CREATE TEMPORARY TABLE carsTable - |(yearMade double, makeName string, modelName string, comments string, grp string) - |USING com.databricks.spark.csv - |OPTIONS (path "$emptyFile", header "false", parserLib "univocity") + sql( s""" + |CREATE TEMPORARY TABLE carsTable + |(yearMade double, makeName string, modelName string, comments string, grp string) + |USING com.databricks.spark.csv + |OPTIONS (path "$emptyFile", header "false", parserLib "univocity") """.stripMargin.replaceAll("\n", " ")) assert(sql("SELECT count(*) FROM carsTable").collect().head(0) === 0) } test("DDL test with schema") { - sql(s""" - |CREATE TEMPORARY TABLE carsTable - |(yearMade double, makeName string, modelName string, comments string, grp string) - |USING com.databricks.spark.csv - |OPTIONS (path "$carsFile", header "true", parserLib "univocity") + sql( s""" + |CREATE TEMPORARY TABLE carsTable + |(yearMade double, makeName string, modelName string, comments string, grp string) + |USING com.databricks.spark.csv + |OPTIONS (path "$carsFile", header "true", parserLib "univocity") """.stripMargin.replaceAll("\n", " ")) assert(sql("SELECT makeName FROM carsTable").collect().size === numCars) assert(sql("SELECT avg(yearMade) FROM carsTable where grp = '' group by grp") - .collect().head(0) === 2004.5) + .collect().head(0) === 2008) } test("DSL column names test") { @@ -215,11 +373,11 @@ class CsvFastSuite extends FunSuite { |USING com.databricks.spark.csv |OPTIONS (path "$carsFile", header "true", parserLib "univocity") """.stripMargin.replaceAll("\n", " ")) - sql(s""" - |CREATE TEMPORARY TABLE carsTableEmpty - |(yearMade double, makeName string, modelName string, comments string, grp string) - |USING com.databricks.spark.csv - |OPTIONS (path "$tempEmptyDir", header "false", parserLib "univocity") + sql( s""" + |CREATE TEMPORARY TABLE carsTableEmpty + |(yearMade double, makeName string, modelName string, comments string, grp string) + |USING com.databricks.spark.csv + |OPTIONS (path "$tempEmptyDir", header "false", parserLib "univocity") """.stripMargin.replaceAll("\n", " ")) assert(sql("SELECT * FROM carsTableIO").collect().size === numCars) @@ -270,7 +428,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile, parserLib = "univocity") - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") @@ -285,7 +443,7 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile) - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", 
"quote" -> "!")) + cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "!")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", quote = '!', parserLib = "univocity") @@ -299,8 +457,8 @@ class CsvFastSuite extends FunSuite { new File(tempEmptyDir).mkdirs() val copyFilePath = tempEmptyDir + "escape-copy.csv" - val escape = TestSQLContext.csvFile(escapeFile, escape='|', quote='"') - escape.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) + val escape = TestSQLContext.csvFile(escapeFile, escape = '|', quote = '"') + escape.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) val escapeCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index bef1b74..ebb0022 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -145,7 +145,7 @@ class CsvSuite extends FunSuite { .collect() assert(results.slice(0, numCars).toSeq.map(_(0).asInstanceOf[String]) == - Seq("'2012'", "1997", "2015")) + Seq(" '2012' ", " 1997", "2015 ")) } test("DDL test with alternative delimiter and quote") { diff --git a/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala b/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala index dca2abd..4e53927 100644 --- a/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/util/TypeCastSuite.scala @@ -19,7 +19,7 @@ import java.math.BigDecimal import org.scalatest.FunSuite -import org.apache.spark.sql.types.DecimalType +import org.apache.spark.sql.types.{FloatType, DoubleType, DecimalType} class TypeCastSuite extends FunSuite { @@ -33,6 +33,19 @@ class TypeCastSuite extends FunSuite { } } + test("Can parse special") { + val strValues = Seq("NaN", "Infinity", "-Infinity") + val doubleChecks: Seq[Double => Boolean] = Seq(x => x.isNaN, x => x.isPosInfinity, x => x.isNegInfinity) + val floatChecks: Seq[Float => Boolean] = Seq(x => x.isNaN, x => x.isPosInfinity, x => x.isNegInfinity) + + strValues.zip(doubleChecks).foreach { case (strVal, checker) => + assert(checker(TypeCast.castTo(strVal, DoubleType).asInstanceOf[Double])) + } + strValues.zip(floatChecks).foreach { case (strVal, checker) => + assert(checker(TypeCast.castTo(strVal, FloatType).asInstanceOf[Float])) + } + } + test("Can parse escaped characters") { assert(TypeCast.toChar("""\t""") === '\t') assert(TypeCast.toChar("""\r""") === '\r') From 5d3c906fae32806436a59bd849f5a08dc9937371 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Tue, 28 Jul 2015 23:27:50 -0700 Subject: [PATCH 09/17] scala style fixes --- .../databricks/spark/csv/CsvRelation.scala | 44 ++++++++++++------- .../databricks/spark/csv/ParsingOptions.scala | 20 +++++---- .../com/databricks/spark/csv/package.scala | 5 ++- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index eb8a731..a40062b 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -41,7 +41,8 @@ case class CsvRelation protected[spark] ( lineExceptionPolicy: LineParsingOpts = LineParsingOpts(), realNumOpts: RealNumberParsingOpts = RealNumberParsingOpts(), 
intNumOpts: IntNumberParsingOpts = IntNumberParsingOpts(), - stringParsingOpts: StringParsingOpts = StringParsingOpts())(@transient val sqlContext: SQLContext) + stringParsingOpts: StringParsingOpts = StringParsingOpts()) + (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan with InsertableRelation { private val logger = LoggerFactory.getLogger(CsvRelation.getClass) @@ -99,7 +100,8 @@ case class CsvRelation protected[spark] ( userSchema } else { val firstRow = if(ParserLibs.isUnivocityLib(parserLib)) { - val escapeVal = if (csvParsingOpts.escapeChar == null) '\\' else csvParsingOpts.escapeChar.charValue() + val escapeVal = if (csvParsingOpts.escapeChar == null) '\\' + else csvParsingOpts.escapeChar.charValue() new LineCsvReader(fieldSep = csvParsingOpts.delimiter, quote = csvParsingOpts.quoteChar, escape = escapeVal) @@ -139,7 +141,7 @@ case class CsvRelation protected[spark] ( schemaFields: Seq[StructField]) = { val dataLines = if (useHeader) { file.mapPartitionsWithIndex({ - case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter //FIXME: empty partition? + case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter }, true) } else { @@ -148,7 +150,8 @@ case class CsvRelation protected[spark] ( val rows = dataLines.mapPartitionsWithIndex({ case (split, iter) => { - val escapeVal = if(csvParsingOpts.escapeChar == null) '\\' else csvParsingOpts.escapeChar.charValue() + val escapeVal = if(csvParsingOpts.escapeChar == null) '\\' + else csvParsingOpts.escapeChar.charValue() new BulkCsvReader(iter, split, headers = header, fieldSep = csvParsingOpts.delimiter, quote = csvParsingOpts.quoteChar, escape = escapeVal, @@ -173,27 +176,33 @@ case class CsvRelation protected[spark] ( try { rowArray(index) = TypeCast.castTo(tokens(index), schemaFields(index).dataType) } catch { - case e : NumberFormatException if realNumOpts.enable && - (schemaFields(index).dataType == DoubleType || schemaFields(index).dataType == FloatType) => + case e: NumberFormatException if realNumOpts.enable && + (schemaFields(index).dataType == DoubleType || + schemaFields(index).dataType == FloatType) => - rowArray(index) = if (realNumOpts.nullStrings.contains(tokens(index))) + rowArray(index) = if (realNumOpts.nullStrings.contains(tokens(index))) { null - else if (realNumOpts.nanStrings.contains(tokens(index))) + } else if (realNumOpts.nanStrings.contains(tokens(index))) { TypeCast.castTo("NaN", schemaFields(index).dataType) - else if (realNumOpts.infPosStrings.contains(tokens(index))) + } else if (realNumOpts.infPosStrings.contains(tokens(index))) { TypeCast.castTo("Infinity", schemaFields(index).dataType) - else if (realNumOpts.infNegStrings.contains(tokens(index))) + } else if (realNumOpts.infNegStrings.contains(tokens(index))) { TypeCast.castTo("-Infinity", schemaFields(index).dataType) - else - throw new IllegalStateException(s"Failed to parse as double/float number ${tokens(index)}") + } else { + throw new IllegalStateException( + s"Failed to parse as double/float number ${tokens(index)}") + } case _ : NumberFormatException if intNumOpts.enable && - (schemaFields(index).dataType == IntegerType || schemaFields(index).dataType == LongType) => + (schemaFields(index).dataType == IntegerType || + schemaFields(index).dataType == LongType) => - rowArray(index) = if (intNumOpts.nullStrings.contains(tokens(index))) + rowArray(index) = if (intNumOpts.nullStrings.contains(tokens(index))) { null - else - throw new IllegalStateException(s"Failed to parse as int/long 
number ${tokens(index)}") + } else { + throw new IllegalStateException( + s"Failed to parse as int/long number ${tokens(index)}") + } } index = index + 1 } @@ -202,7 +211,8 @@ case class CsvRelation protected[spark] ( case aiob: ArrayIndexOutOfBoundsException if permissive || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Fill => (index until schemaFields.length).foreach { ind => - rowArray(ind) = TypeCast.castTo(lineExceptionPolicy.fillValue, schemaFields(index).dataType) + rowArray(ind) = TypeCast.castTo(lineExceptionPolicy.fillValue, + schemaFields(index).dataType) } Some(Row.fromSeq(rowArray)) case NonFatal(e) if !failFast => diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala index 7577f8a..8bb292c 100644 --- a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -56,11 +56,12 @@ object ParsingOptions { * @param nanStrings these strings are NaNs * @param enable make this false to stop attempting to parse numbers i.e. treat them as strings */ -case class RealNumberParsingOpts(var nanStrings: Set[String] = ParsingOptions.defaultNaNStrings, - var infPosStrings: Set[String] = ParsingOptions.defaultInfPosString, - var infNegStrings: Set[String] = ParsingOptions.defaultInfNegString, - var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, - var enable: Boolean = true) +case class RealNumberParsingOpts( + var nanStrings: Set[String] = ParsingOptions.defaultNaNStrings, + var infPosStrings: Set[String] = ParsingOptions.defaultInfPosString, + var infNegStrings: Set[String] = ParsingOptions.defaultInfNegString, + var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, + var enable: Boolean = true) /** * Options to control parsing of integral numbers e.g. the types Int and Long @@ -81,13 +82,14 @@ case class StringParsingOpts(var emptyStringReplace: String = "", * @param badLinePolicy abort, ignore line or fill with nulls when a bad line is encountered * @param fillValue if line exception policy is to fill in the blanks, use this value to fill */ -case class LineParsingOpts(var badLinePolicy: LineExceptionPolicy.EnumVal = LineExceptionPolicy.Fill, - var fillValue: String = "") +case class LineParsingOpts( + var badLinePolicy: LineExceptionPolicy.EnumVal = LineExceptionPolicy.Fill, + var fillValue: String = "") /** * CSV parsing options - * @param quoteChar fields containing delimiters, other special chars are quoted using this character - * e.g. "this is a comma ," + * @param quoteChar fields containing delimiters, other special chars are quoted using this + * character e.g. "this is a comma ," * @param escapeChar if a quote character appears in a field, it is escaped using this * e.g. 
"this is a quote \"" * @param ignoreLeadingWhitespace ignore white space before a field diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index b9dfa74..aef025a 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -144,10 +144,11 @@ package object csv { } val header = if (generateHeader) { - if (sparseColInfo == null) + if (sparseColInfo == null) { csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) - else + } else { makeHeader + } } else { "" // There is no need to generate header in this case } From beab9c2bbde3a72b602cd7e440b6d6681a795006 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Thu, 30 Jul 2015 15:55:32 -0700 Subject: [PATCH 10/17] easier interface for options --- .../com/databricks/spark/csv/CsvParser.scala | 9 + .../databricks/spark/csv/ParsingOptions.scala | 161 ++++++++++++++++-- .../com/databricks/spark/csv/CsvSuite.scala | 14 ++ .../databricks/spark/csv/OptionsSuite.scala | 89 ++++++++++ 4 files changed, 261 insertions(+), 12 deletions(-) create mode 100644 src/test/scala/com/databricks/spark/csv/OptionsSuite.scala diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala index 25d7950..16d8c77 100644 --- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala @@ -106,6 +106,15 @@ class CsvParser { this } + def withOpts(optMap: Map[String, String]) = { + this.stringParsingOpts = StringParsingOpts(optMap) + this.lineParsingOpts = LineParsingOpts(optMap) + this.realNumberParsingOpts = RealNumberParsingOpts(optMap) + this.intNumberParsingOpts = IntNumberParsingOpts(optMap) + this.csvParsingOpts = CSVParsingOpts(optMap) + this + } + /** Returns a Schema RDD for the given CSV path. */ @throws[RuntimeException] def csvFile(sqlContext: SQLContext, path: String): DataFrame = { diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala index 8bb292c..938bef7 100644 --- a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -49,42 +49,51 @@ object ParsingOptions { val defaultNaNStrings = HashSet("NaN", "nan") val defaultInfPosString = HashSet("+Inf", "Inf", "Infinity", "+Infinity", "inf", "+inf") val defaultInfNegString = HashSet("-Inf", "-inf", "-Infinity") + + private[csv] def delimitedStringToSet(str: String) = { + str.split(",").toSet + } } /** * Options to control parsing of real numbers e.g. the types Float and Double - * @param nanStrings these strings are NaNs - * @param enable make this false to stop attempting to parse numbers i.e. 
treat them as strings + * @param nanStrings NaNs + * @param nullStrings nulls + * @param infNegStrings negative infinity + * @param infPosStrings positive infinity + * @param enable false to not apply these options, default true */ case class RealNumberParsingOpts( - var nanStrings: Set[String] = ParsingOptions.defaultNaNStrings, - var infPosStrings: Set[String] = ParsingOptions.defaultInfPosString, - var infNegStrings: Set[String] = ParsingOptions.defaultInfNegString, - var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, - var enable: Boolean = true) + var nanStrings: Set[String] = ParsingOptions.defaultNaNStrings, + var infPosStrings: Set[String] = ParsingOptions.defaultInfPosString, + var infNegStrings: Set[String] = ParsingOptions.defaultInfNegString, + var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, + var enable: Boolean = true) /** * Options to control parsing of integral numbers e.g. the types Int and Long - * @param enable make this false to stop attempting to parse numbers i.e. treat them as strings + * @param nullStrings nulls + * @param enable false to not apply these options, default true */ case class IntNumberParsingOpts(var nullStrings: Set[String] = ParsingOptions.defaultNullStrings, var enable: Boolean = true) /** * Options to control parsing of strings + * @param nullStrings nulls * @param emptyStringReplace replace empty string with this string */ case class StringParsingOpts(var emptyStringReplace: String = "", - var nullStrings: Set[String] = ParsingOptions.defaultNullStrings) + var nullStrings: Set[String] = ParsingOptions.defaultNullStrings) /** * options to handle exceptions while parsing a line - * @param badLinePolicy abort, ignore line or fill with nulls when a bad line is encountered + * @param badLinePolicy abort, ignore line or fill with fillValue when not enough fields are parsed * @param fillValue if line exception policy is to fill in the blanks, use this value to fill */ case class LineParsingOpts( - var badLinePolicy: LineExceptionPolicy.EnumVal = LineExceptionPolicy.Fill, - var fillValue: String = "") + var badLinePolicy: LineExceptionPolicy.EnumVal = LineExceptionPolicy.Fill, + var fillValue: String = "") /** * CSV parsing options @@ -102,3 +111,131 @@ case class CSVParsingOpts(var delimiter: Character = ',', var ignoreLeadingWhitespace: Boolean = true, var ignoreTrailingWhitespace: Boolean = true, var numParts: Int = 0) + +/** + * builds a [[RealNumberParsingOpts]] instance from "text" + * realNumParsingOpts.{nans, infs, -infs, nulls, enable} are supported + */ +object RealNumberParsingOpts { + val prefix = "realNumParsingOpts." 
+ val build = RealNumberParsingOpts() + + def apply(opts: Map[String, String]): RealNumberParsingOpts = { + for (opt <- opts if opt._1.startsWith(prefix)) { + (opt._1.stripPrefix(prefix), opt._2) match { + case ("nans", value: String) => + build.nanStrings = ParsingOptions.delimitedStringToSet(value) + case ("infs", value: String) => + build.infPosStrings = ParsingOptions.delimitedStringToSet(value) + case ("-infs", value: String) => + build.infNegStrings = ParsingOptions.delimitedStringToSet(value) + case ("nulls", value: String) => + build.nullStrings = ParsingOptions.delimitedStringToSet(value) + case ("enable", value: String) => build.enable = value.toBoolean + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + } + + build + } +} + +/** + * builds a [[IntNumberParsingOpts]] instance from "text" + * intNumParsingOpts.{nulls, enable} are supported + */ +object IntNumberParsingOpts { + val prefix = "intNumParsingOpts." + val build = IntNumberParsingOpts() + + def apply(opts: Map[String, String]): IntNumberParsingOpts = { + for (opt <- opts if opt._1.startsWith(prefix)) { + (opt._1.stripPrefix(prefix), opt._2) match { + case ("nulls", value: String) => + build.nullStrings = ParsingOptions.delimitedStringToSet(value) + case ("enable", value: String) => build.enable = value.toBoolean + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + } + + build + } +} + +/** + * builds a [[StringParsingOpts]] instance from "text" + * stringParsingOpts.{nulls, emptyStringReplace} are supported + */ +object StringParsingOpts { + val prefix = "stringParsingOpts." + val build = StringParsingOpts() + + def apply(opts: Map[String, String]): StringParsingOpts = { + for (opt <- opts if opt._1.startsWith(prefix)) { + (opt._1.stripPrefix(prefix), opt._2) match { + case ("nulls", value: String) => + build.nullStrings = ParsingOptions.delimitedStringToSet(value) + case ("emptyStringReplace", value: String) => build.emptyStringReplace = value + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + } + + build + } +} + +/** + * builds a [[LineParsingOpts]] instance from "text" + * lineParsingOpts.{badLinePolicy, fillValue} are supported + * lineParsingOpts.badLinePolicy can be one of fill, ignore or abort + */ +object LineParsingOpts { + val prefix = "lineParsingOpts." + val build = LineParsingOpts() + + def apply(opts: Map[String, String]): LineParsingOpts = { + for (opt <- opts if opt._1.startsWith(prefix)) { + (opt._1.stripPrefix(prefix), opt._2) match { + case ("badLinePolicy", value: String) => + build.badLinePolicy = value.toLowerCase match { + case "fill" => LineExceptionPolicy.Fill + case "ignore" => LineExceptionPolicy.Ignore + case "abort" => LineExceptionPolicy.Abort + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + case ("fillValue", value: String) => build.fillValue = value + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + } + + build + } +} + +/** + * builds a [[CSVParsingOpts]] instance from "text" + * csvParsingOpts.{delimiter, quote, escape, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, + * numParts} are supported + */ +object CSVParsingOpts { + val prefix = "csvParsingOpts." 
+ val build = CSVParsingOpts() + + def apply(opts: Map[String, String]): CSVParsingOpts = { + for (opt <- opts if opt._1.startsWith(prefix)) { + (opt._1.stripPrefix(prefix), opt._2) match { + case ("delimiter", value: String) => build.delimiter = value.charAt(0); + case ("quote", value: String) => build.quoteChar = value.charAt(0) + case ("escape", value: String) => build.escapeChar = value.charAt(0) + case ("ignoreLeadingSpace", value: String) => + build.ignoreLeadingWhitespace = value.toBoolean + case ("ignoreTrailingSpace", value: String) => + build.ignoreTrailingWhitespace = value.toBoolean + case ("numParts", value: String) => build.numParts = value.toInt + case _ => throw new IllegalArgumentException(s"Unknown option $opt") + } + } + + build + } +} diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index ebb0022..ad5ef08 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -120,6 +120,20 @@ class CsvSuite extends FunSuite { assert(results.size === numCars) } + test("DSL test with alternative delimiter and quote using simple options API") { + val optMap = Map("csvParsingOpts.quote" -> "'", + "csvParsingOpts.delimiter" -> "|" + ) + + val results = new CsvParser().withOpts(optMap) + .withUseHeader(true) + .csvFile(TestSQLContext, carsAltFile) + .select("year") + .collect() + + assert(results.size === numCars) + } + test("DSL test with alternative delimiter and quote using sparkContext.csvFile") { val results = TestSQLContext.csvFile(carsAltFile, useHeader = true, delimiter = '|', quote = '\'') diff --git a/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala b/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala new file mode 100644 index 0000000..3dfdb77 --- /dev/null +++ b/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala @@ -0,0 +1,89 @@ +// scalastyle:off +/* + * Copyright 2015 Ayasdi Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +// scalastyle:on +package com.databricks.spark.csv + +import scala.collection.immutable.HashSet + +import org.scalatest.FunSuite + +class OptionsSuite extends FunSuite { + test("csv opts") { + val optMap = Map("csvParsingOpts.delimiter" -> "|", + "csvParsingOpts.quote" -> "[", + "csvParsingOpts.ignoreLeadingSpace" -> "false", + "csvParsingOpts.ignoreTrailingSpace" -> "true", + "csvParsingOpts.escape" -> ":", + "csvParsingOpts.numParts" -> "5") + println(optMap) + val opts = CSVParsingOpts(optMap) + + assert(opts.delimiter === '|') + assert(opts.escapeChar === ':') + assert(opts.ignoreLeadingWhitespace === false) + assert(opts.ignoreTrailingWhitespace === true) + assert(opts.numParts === 5) + assert(opts.quoteChar === '[') + } + + test("line opts") { + val optMap = Map("lineParsingOpts.badLinePolicy" -> "abort", + "lineParsingOpts.fillValue" -> "duh") + println(optMap) + val opts = LineParsingOpts(optMap) + + assert(opts.fillValue === "duh") + assert(opts.badLinePolicy === LineExceptionPolicy.Abort) + } + + test("string opts") { + val optMap = Map("stringParsingOpts.nulls" -> "abcd,efg", + "stringParsingOpts.emptyStringReplace" -> "") + println(optMap) + val opts = StringParsingOpts(optMap) + + assert(opts.nullStrings === HashSet("abcd", "efg")) + assert(opts.emptyStringReplace === "") + } + + test("int opts") { + val optMap = Map("intNumParsingOpts.nulls" -> "abcd,efg", + "intNumParsingOpts.enable" -> "false") + println(optMap) + val opts = IntNumberParsingOpts(optMap) + + assert(opts.nullStrings === HashSet("abcd", "efg")) + assert(opts.enable === false) + } + + test("real opts") { + val optMap = Map("realNumParsingOpts.nulls" -> "abcd,efg", + "realNumParsingOpts.enable" -> "false", + "realNumParsingOpts.nans" -> "NaN,nan", + "realNumParsingOpts.infs" -> "iinnff,IINNFF", + "realNumParsingOpts.-infs" -> "minusInf") + println(optMap) + val opts = RealNumberParsingOpts(optMap) + + assert(opts.nullStrings === HashSet("abcd", "efg")) + assert(opts.nanStrings === HashSet("NaN", "nan")) + assert(opts.infPosStrings === HashSet("iinnff", "IINNFF")) + assert(opts.infNegStrings === HashSet("minusInf")) + assert(opts.enable === false) + } + +} From df6914e8443bb4b07302e75cc01a2ab8a84e81b8 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 31 Jul 2015 10:18:53 -0700 Subject: [PATCH 11/17] fix unit test breakage --- .../databricks/spark/csv/CsvRelation.scala | 26 +++++++++---------- .../databricks/spark/csv/ParsingOptions.scala | 10 +++---- .../databricks/spark/csv/util/TextFile.scala | 5 ++-- .../databricks/spark/csv/CsvFastSuite.scala | 10 ++++--- .../com/databricks/spark/csv/CsvSuite.scala | 2 +- 5 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index e70f702..c728f43 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -32,18 +32,18 @@ import com.databricks.spark.csv.util._ import com.databricks.spark.sql.readers._ case class CsvRelation protected[spark]( - location: String, - useHeader: Boolean, - csvParsingOpts: CSVParsingOpts, - parseMode: String, - parserLib: String, - userSchema: StructType = null, - lineExceptionPolicy: LineParsingOpts = LineParsingOpts(), - realNumOpts: RealNumberParsingOpts = RealNumberParsingOpts(), - intNumOpts: IntNumberParsingOpts = IntNumberParsingOpts(), - stringParsingOpts: StringParsingOpts = StringParsingOpts(), - charset: String = 
TextFile.DEFAULT_CHARSET.name(), - inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext) + location: String, + useHeader: Boolean, + csvParsingOpts: CSVParsingOpts, + parseMode: String, + parserLib: String, + userSchema: StructType = null, + lineExceptionPolicy: LineParsingOpts = LineParsingOpts(), + realNumOpts: RealNumberParsingOpts = RealNumberParsingOpts(), + intNumOpts: IntNumberParsingOpts = IntNumberParsingOpts(), + stringParsingOpts: StringParsingOpts = StringParsingOpts(), + charset: String = TextFile.DEFAULT_CHARSET.name(), + inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext) extends BaseRelation with TableScan with InsertableRelation { private val logger = LoggerFactory.getLogger(CsvRelation.getClass) @@ -106,7 +106,7 @@ case class CsvRelation protected[spark]( None } else if (schemaFields.length != tokens.size && (failFast || lineExceptionPolicy.badLinePolicy == LineExceptionPolicy.Abort)) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: $errorDetail") + throw new RuntimeException(s"Malformed line in FAILFAST or Abort mode: $errorDetail") } else { var index: Int = 0 val rowArray = new Array[Any](schemaFields.length) diff --git a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala index 4e464a6..b0849fc 100644 --- a/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala +++ b/src/main/scala/com/databricks/spark/csv/ParsingOptions.scala @@ -118,9 +118,9 @@ case class CSVParsingOpts(var delimiter: Character = ',', */ object RealNumberParsingOpts { val prefix = "realNumParsingOpts." - val build = RealNumberParsingOpts() def apply(opts: Map[String, String]): RealNumberParsingOpts = { + val build = RealNumberParsingOpts() for (opt <- opts if opt._1.startsWith(prefix)) { (opt._1.stripPrefix(prefix), opt._2) match { case ("nans", value: String) => @@ -146,9 +146,9 @@ object RealNumberParsingOpts { */ object IntNumberParsingOpts { val prefix = "intNumParsingOpts." - val build = IntNumberParsingOpts() def apply(opts: Map[String, String]): IntNumberParsingOpts = { + val build = IntNumberParsingOpts() for (opt <- opts if opt._1.startsWith(prefix)) { (opt._1.stripPrefix(prefix), opt._2) match { case ("nulls", value: String) => @@ -168,9 +168,9 @@ object IntNumberParsingOpts { */ object StringParsingOpts { val prefix = "stringParsingOpts." - val build = StringParsingOpts() def apply(opts: Map[String, String]): StringParsingOpts = { + val build = StringParsingOpts() for (opt <- opts if opt._1.startsWith(prefix)) { (opt._1.stripPrefix(prefix), opt._2) match { case ("nulls", value: String) => @@ -191,9 +191,9 @@ object StringParsingOpts { */ object LineParsingOpts { val prefix = "lineParsingOpts." - val build = LineParsingOpts() def apply(opts: Map[String, String]): LineParsingOpts = { + val build = LineParsingOpts() for (opt <- opts if opt._1.startsWith(prefix)) { (opt._1.stripPrefix(prefix), opt._2) match { case ("badLinePolicy", value: String) => @@ -219,9 +219,9 @@ object LineParsingOpts { */ object CSVParsingOpts { val prefix = "csvParsingOpts." 
- val build = CSVParsingOpts() def apply(opts: Map[String, String]): CSVParsingOpts = { + val build = CSVParsingOpts() for (opt <- opts if opt._1.startsWith(prefix)) { (opt._1.stripPrefix(prefix), opt._2) match { case ("delimiter", value: String) => build.delimiter = value.charAt(0); diff --git a/src/main/scala/com/databricks/spark/csv/util/TextFile.scala b/src/main/scala/com/databricks/spark/csv/util/TextFile.scala index 944cb42..ab4f2e3 100644 --- a/src/main/scala/com/databricks/spark/csv/util/TextFile.scala +++ b/src/main/scala/com/databricks/spark/csv/util/TextFile.scala @@ -29,10 +29,11 @@ private[csv] object TextFile { def withCharset(context: SparkContext, location: String, charset: String, numParts: Int = 0): RDD[String] = { if (Charset.forName(charset) == DEFAULT_CHARSET) { - if (numParts == 0) + if (numParts == 0) { context.textFile(location) - else + } else { context.textFile(location, numParts) + } } else { // can't pass a Charset object here cause its not serializable // TODO: maybe use mapPartitions instead? diff --git a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index 5c5e1e9..6ad536b 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -299,7 +299,7 @@ class CsvFastSuite extends FunSuite { .collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode")) + assert(exception.getMessage.contains("Malformed line in FAILFAST or Abort mode")) } test("DSL test with alternative delimiter and quote") { @@ -451,7 +451,7 @@ class CsvFastSuite extends FunSuite { val cars = TestSQLContext.csvFile(carsFile, parserLib = "univocity") cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false")) - val carsCopy = TestSQLContext.csvFile(copyFilePath + "/") + val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") assert(carsCopy.count == cars.count) assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet) @@ -494,7 +494,8 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "cars-copy.csv" val cars = TestSQLContext.csvFile(carsFile) - cars.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "!")) + cars.saveAsCsvFile(copyFilePath, + Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "!")) val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", quote = '!', parserLib = "univocity") @@ -509,7 +510,8 @@ class CsvFastSuite extends FunSuite { val copyFilePath = tempEmptyDir + "escape-copy.csv" val escape = TestSQLContext.csvFile(escapeFile, escape = '|', quote = '"') - escape.saveAsCsvFile(copyFilePath, Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) + escape.saveAsCsvFile(copyFilePath, + Map("header" -> "true", "headerPerPart" -> "false", "quote" -> "\"")) val escapeCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 7163059..6f1eeff 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -134,7 +134,7 @@ class CsvSuite extends FunSuite { .collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode")) + assert(exception.getMessage.contains("Malformed line in FAILFAST or Abort mode")) } From 
72a91fc5ca0b7fceb633f9274b2c993749d1db05 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 31 Jul 2015 20:46:46 -0700 Subject: [PATCH 12/17] add options to ddl --- .../databricks/spark/csv/DefaultSource.scala | 58 ++++++++++++------- .../com/databricks/spark/csv/CsvSuite.scala | 11 ++++ 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala index bd2f8e1..46004af 100755 --- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala +++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala @@ -19,7 +19,6 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType - import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast} @@ -47,9 +46,9 @@ class DefaultSource * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header' */ override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String], - schema: StructType) = { + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType) = { val path = checkPath(parameters) val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ",")) @@ -82,10 +81,10 @@ class DefaultSource val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT) val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false") - val ignoreLeadingWhiteSpaceFlag = if(ignoreLeadingWhiteSpace == "false") { + val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") { false - } else if(ignoreLeadingWhiteSpace == "true") { - if(!ParserLibs.isUnivocityLib(parserLib)) { + } else if (ignoreLeadingWhiteSpace == "true") { + if (!ParserLibs.isUnivocityLib(parserLib)) { throw new Exception("Ignore whitesspace supported for Univocity parser only") } true @@ -93,10 +92,10 @@ class DefaultSource throw new Exception("Ignore white space flag can be true or false") } val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false") - val ignoreTrailingWhiteSpaceFlag = if(ignoreTrailingWhiteSpace == "false") { + val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") { false - } else if(ignoreTrailingWhiteSpace == "true") { - if(!ParserLibs.isUnivocityLib(parserLib)) { + } else if (ignoreTrailingWhiteSpace == "true") { + if (!ParserLibs.isUnivocityLib(parserLib)) { throw new Exception("Ignore whitespace supported for the Univocity parser only") } true @@ -104,26 +103,41 @@ class DefaultSource throw new Exception("Ignore white space flag can be true or false") } - val csvParsingOpts = CSVParsingOpts(delimiter = delimiter, - quoteChar = quoteChar, - escapeChar = escapeChar, - ignoreLeadingWhitespace = ignoreLeadingWhiteSpaceFlag, - ignoreTrailingWhitespace = ignoreTrailingWhiteSpaceFlag) - val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name()) // TODO validate charset? 
val inferSchema = parameters.getOrElse("inferSchema", "false") - val inferSchemaFlag = if(inferSchema == "false") { + val inferSchemaFlag = if (inferSchema == "false") { false - } else if(inferSchema == "true") { + } else if (inferSchema == "true") { true } else { throw new Exception("Infer schema flag can be true or false") } + val lineParsingOpts = LineParsingOpts(parameters) + val realNumParsingOpts = RealNumberParsingOpts(parameters) + val intNumParsingOpts = IntNumberParsingOpts(parameters) + val stringParsingOpts = StringParsingOpts(parameters) + + val csvParsingOpts = if (!parameters.exists { case (k, v) => + k.startsWith("csvParsingOpts.") + }) { + CSVParsingOpts(delimiter = delimiter, + quoteChar = quoteChar, + escapeChar = escapeChar, + ignoreLeadingWhitespace = ignoreLeadingWhiteSpaceFlag, + ignoreTrailingWhitespace = ignoreTrailingWhiteSpaceFlag) + } else { + CSVParsingOpts(parameters) + } + CsvRelation(path, useHeader = headerFlag, csvParsingOpts = csvParsingOpts, + lineExceptionPolicy = lineParsingOpts, + realNumOpts = realNumParsingOpts, + intNumOpts = intNumParsingOpts, + stringParsingOpts = stringParsingOpts, parseMode = parseMode, parserLib = parserLib, userSchema = schema, @@ -132,10 +146,10 @@ class DefaultSource } override def createRelation( - sqlContext: SQLContext, - mode: SaveMode, - parameters: Map[String, String], - data: DataFrame): BaseRelation = { + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { val path = checkPath(parameters) val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 6f1eeff..36193a8 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -97,6 +97,17 @@ class CsvSuite extends FunSuite { assert(sql("SELECT year FROM carsTable").collect().size === numCars) } + test("DDL test with tab separated file, using newer options") { + sql( + s""" + |CREATE TEMPORARY TABLE carsTable + |USING com.databricks.spark.csv + |OPTIONS (path "$carsTsvFile", header "true", "csvParsingOpts.delimiter" "\t") + """.stripMargin.replaceAll("\n", " ")) + + assert(sql("SELECT year FROM carsTable").collect().size === numCars) + } + test("DDL test parsing decimal type") { sql( s""" From be8ab4d20b4e95ec6bae8995ca488b3c3dcc8f1a Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 31 Jul 2015 23:19:26 -0700 Subject: [PATCH 13/17] trying to debug travis test --- src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index 6a41d2e..90cbf17 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -456,6 +456,9 @@ class CsvFastSuite extends FunSuite { val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") + //remove --- debug travis test + TestSQLContext.sparkContext.textFile(copyFilePath + "/").foreach(println) + assert(carsCopy.count == cars.count) assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet) } From e414e07028aa5cb425afbcef457dea87948e2b93 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Fri, 31 Jul 2015 23:38:37 -0700 
Subject: [PATCH 14/17] suspect partitions out of order --- src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index 90cbf17..f57f110 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -457,7 +457,9 @@ class CsvFastSuite extends FunSuite { val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") //remove --- debug travis test - TestSQLContext.sparkContext.textFile(copyFilePath + "/").foreach(println) + val nParts = TestSQLContext.sparkContext.textFile(copyFilePath + "/").partitions.length + for(i <- 0 to nParts) + TestSQLContext.sparkContext.textFile(copyFilePath + "/" + s"part0000$i").foreach(println) assert(carsCopy.count == cars.count) assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet) From dcb1df1d25a2c97e2d02d3d44124f6bbabb08f87 Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Sat, 1 Aug 2015 00:11:12 -0700 Subject: [PATCH 15/17] revert headerPerPart option this works everywhere but travis! the partitions are re-ordered somehow methinks. leaving this for another time --- .../scala/com/databricks/spark/csv/CsvRelation.scala | 11 +++-------- src/main/scala/com/databricks/spark/csv/package.scala | 4 +--- .../scala/com/databricks/spark/csv/CsvFastSuite.scala | 5 ----- .../scala/com/databricks/spark/csv/OptionsSuite.scala | 5 ----- 4 files changed, 4 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index d072f6b..a02f04a 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -224,14 +224,9 @@ case class CsvRelation protected[spark]( } private def univocityParseCSV(file: RDD[String], header: Seq[String]) = { - val dataLines = if (useHeader) { - file.mapPartitionsWithIndex({ - case (partitionIndex, iter) => if (partitionIndex == 0) iter.drop(1) else iter - }, true) - } - else { - file - } + // If header is set, make sure firstLine is materialized before sending to executors. 
+ val filterLine = if (useHeader) firstLine else null + val dataLines = if(useHeader) file.filter(_ != filterLine) else file val rows = dataLines.mapPartitionsWithIndex({ case (split, iter) => { val escapeVal = if (csvParsingOpts.escapeChar == null) '\\' diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index bd3d44b..414f8c0 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -165,8 +165,6 @@ package object csv { "" // There is no need to generate header in this case } - val headerPerPart = parameters.getOrElse("headerPerPart", "true").toBoolean - val strRDD = dataFrame.rdd.mapPartitionsWithIndex { case (index, iter) => val csvFormatBase = CSVFormat.DEFAULT .withDelimiter(delimiterChar) @@ -180,7 +178,7 @@ package object csv { } new Iterator[String] { - var firstRow: Boolean = if(headerPerPart) generateHeader else generateHeader && index == 0 + var firstRow: Boolean = generateHeader override def hasNext = iter.hasNext || firstRow diff --git a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala index f57f110..6a41d2e 100644 --- a/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvFastSuite.scala @@ -456,11 +456,6 @@ class CsvFastSuite extends FunSuite { val carsCopy = TestSQLContext.csvFile(copyFilePath + "/", parserLib = "univocity") - //remove --- debug travis test - val nParts = TestSQLContext.sparkContext.textFile(copyFilePath + "/").partitions.length - for(i <- 0 to nParts) - TestSQLContext.sparkContext.textFile(copyFilePath + "/" + s"part0000$i").foreach(println) - assert(carsCopy.count == cars.count) assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet) } diff --git a/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala b/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala index 3dfdb77..fc0bee2 100644 --- a/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/OptionsSuite.scala @@ -29,7 +29,6 @@ class OptionsSuite extends FunSuite { "csvParsingOpts.ignoreTrailingSpace" -> "true", "csvParsingOpts.escape" -> ":", "csvParsingOpts.numParts" -> "5") - println(optMap) val opts = CSVParsingOpts(optMap) assert(opts.delimiter === '|') @@ -43,7 +42,6 @@ class OptionsSuite extends FunSuite { test("line opts") { val optMap = Map("lineParsingOpts.badLinePolicy" -> "abort", "lineParsingOpts.fillValue" -> "duh") - println(optMap) val opts = LineParsingOpts(optMap) assert(opts.fillValue === "duh") @@ -53,7 +51,6 @@ class OptionsSuite extends FunSuite { test("string opts") { val optMap = Map("stringParsingOpts.nulls" -> "abcd,efg", "stringParsingOpts.emptyStringReplace" -> "") - println(optMap) val opts = StringParsingOpts(optMap) assert(opts.nullStrings === HashSet("abcd", "efg")) @@ -63,7 +60,6 @@ class OptionsSuite extends FunSuite { test("int opts") { val optMap = Map("intNumParsingOpts.nulls" -> "abcd,efg", "intNumParsingOpts.enable" -> "false") - println(optMap) val opts = IntNumberParsingOpts(optMap) assert(opts.nullStrings === HashSet("abcd", "efg")) @@ -76,7 +72,6 @@ class OptionsSuite extends FunSuite { "realNumParsingOpts.nans" -> "NaN,nan", "realNumParsingOpts.infs" -> "iinnff,IINNFF", "realNumParsingOpts.-infs" -> "minusInf") - println(optMap) val opts = RealNumberParsingOpts(optMap) assert(opts.nullStrings === 
HashSet("abcd", "efg")) From df3f7b987d41ce2fabc41f87d18864bf8158070d Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Mon, 3 Aug 2015 11:43:25 -0700 Subject: [PATCH 16/17] workaround for scoverage as specified in their readme --- project/plugins.sbt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/plugins.sbt b/project/plugins.sbt index b26f1ca..c87a2ee 100755 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,6 +8,8 @@ resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositori resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven" +resolvers += Resolver.url("scoverage-bintray", url("https://dl.bintray.com/sksamuel/sbt-plugins/"))(Resolver.ivyStylePatterns) + addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0") From 1438a4119179f70edfc56756acb5e7aad29fe0ed Mon Sep 17 00:00:00 2001 From: Mohit Jaggi Date: Tue, 13 Oct 2015 14:37:21 -0700 Subject: [PATCH 17/17] moving up to spark 1.4.1 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index b3f87fe..99f7526 100755 --- a/build.sbt +++ b/build.sbt @@ -59,7 +59,7 @@ pomExtra := ( spName := "databricks/spark-csv" -sparkVersion := "1.4.0" +sparkVersion := "1.4.1" sparkComponents += "sql"
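
Usage sketch (illustrative only, not part of any patch above): the series adds a string-keyed options API alongside the typed *ParsingOpts case classes. Assuming the CsvParser.withOpts entry point and the prefixed option keys introduced in PATCH 10/17, and a hypothetical SQLContext named sqlContext reading a hypothetical file "cars.csv", the new API might be driven like this:

    import com.databricks.spark.csv._

    // Keys are namespaced by the option group they configure. Within a group, a key
    // whose prefix is recognized but whose suffix is unknown makes that group's
    // apply(Map) builder throw IllegalArgumentException.
    val optMap = Map(
      "csvParsingOpts.delimiter"      -> "|",      // field separator
      "csvParsingOpts.quote"          -> "'",      // quote character
      "lineParsingOpts.badLinePolicy" -> "fill",   // fill, ignore or abort on short lines
      "lineParsingOpts.fillValue"     -> "",       // value used when filling short lines
      "realNumParsingOpts.nans"       -> "NaN,nan" // comma-delimited NaN spellings
    )

    val cars = new CsvParser()
      .withOpts(optMap)        // builds all the *ParsingOpts groups from the prefixed keys
      .withUseHeader(true)
      .csvFile(sqlContext, "cars.csv")

The same keys are accepted through SQL DDL per PATCH 12/17, e.g.
OPTIONS (path "cars.tsv", header "true", "csvParsingOpts.delimiter" "\t").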