diff --git a/build.sbt b/build.sbt index f65a7a8..2a44280 100755 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,8 @@ libraryDependencies ++= Seq( "com.univocity" % "univocity-parsers" % "1.5.1", "org.slf4j" % "slf4j-api" % "1.7.5" % "provided", "org.scalatest" %% "scalatest" % "2.2.1" % "test", - "com.novocode" % "junit-interface" % "0.9" % "test" + "com.novocode" % "junit-interface" % "0.9" % "test", + "org.specs2" %% "specs2-core" % "3.7" % "test" ) libraryDependencies ++= Seq( diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 7efc6bd..243afac 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -20,38 +20,37 @@ import java.text.SimpleDateFormat import scala.collection.JavaConversions._ import scala.util.control.NonFatal - import org.apache.commons.csv._ import org.apache.hadoop.fs.Path import org.slf4j.LoggerFactory - import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.sources.{PrunedScan, BaseRelation, InsertableRelation, TableScan} +import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, PrunedScan, TableScan} import org.apache.spark.sql.types._ -import com.databricks.spark.csv.readers.{BulkCsvReader, LineCsvReader} +import com.databricks.spark.csv.readers._ import com.databricks.spark.csv.util._ -case class CsvRelation protected[spark] ( - baseRDD: () => RDD[String], - location: Option[String], - useHeader: Boolean, - delimiter: Char, - quote: Character, - escape: Character, - comment: Character, - parseMode: String, - parserLib: String, - ignoreLeadingWhiteSpace: Boolean, - ignoreTrailingWhiteSpace: Boolean, - treatEmptyValuesAsNulls: Boolean, - userSchema: StructType = null, - inferCsvSchema: Boolean, - codec: String = null, - nullValue: String = "", - dateFormat: String = null, - maxCharsPerCol: Int = 100000)(@transient val sqlContext: SQLContext) - extends BaseRelation with TableScan with PrunedScan with InsertableRelation { +sealed trait Relation extends BaseRelation with TableScan with PrunedScan with InsertableRelation { + def baseRDD: () => RDD[String] + val location: Option[String] + val useHeader: Boolean + val delimiter: Char + val quote: Character + val escape: Character + val comment: Character + val parseMode: String + val parserLib: String + val ignoreLeadingWhiteSpace: Boolean + val ignoreTrailingWhiteSpace: Boolean + val treatEmptyValuesAsNulls: Boolean + val userSchema: StructType + val inferCsvSchema: Boolean + val codec: String + val nullValue: String + val dateFormat: String + val maxCharsPerCol: Int + def getLineReader: LineReader + def getBulkReader(header: Seq[String], iter: Iterator[String], split: Int): BulkReader // Share date format object as it is expensive to parse date pattern. private val dateFormatter = if (dateFormat != null) new SimpleDateFormat(dateFormat) else null @@ -143,11 +142,11 @@ case class CsvRelation protected[spark] ( /** - * This supports to eliminate unneeded columns before producing an RDD - * containing all of its tuples as Row objects. This reads all the tokens of each line - * and then drop unneeded tokens without casting and type-checking by mapping - * both the indices produced by `requiredColumns` and the ones of tokens. - */ + * This supports to eliminate unneeded columns before producing an RDD + * containing all of its tuples as Row objects. 
This reads all the tokens of each line + * and then drop unneeded tokens without casting and type-checking by mapping + * both the indices produced by `requiredColumns` and the ones of tokens. + */ override def buildScan(requiredColumns: Array[String]): RDD[Row] = { val simpleDateFormatter = dateFormatter val schemaFields = schema.fields @@ -225,14 +224,7 @@ case class CsvRelation protected[spark] ( userSchema } else { val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) { - val escapeVal = if (escape == null) '\\' else escape.charValue() - val commentChar: Char = if (comment == null) '\0' else comment - val quoteChar: Char = if (quote == null) '\0' else quote - new LineCsvReader( - fieldSep = delimiter, - quote = quoteChar, - escape = escapeVal, - commentMarker = commentChar).parseLine(firstLine) + getLineReader.parseLine(firstLine) } else { val csvFormat = defaultCsvFormat .withDelimiter(delimiter) @@ -260,8 +252,8 @@ case class CsvRelation protected[spark] ( } /** - * Returns the first line of the first non-empty file in path - */ + * Returns the first line of the first non-empty file in path + */ private lazy val firstLine = { if (comment != null) { baseRDD().filter { line => @@ -275,30 +267,21 @@ case class CsvRelation protected[spark] ( } private def univocityParseCSV( - file: RDD[String], - header: Seq[String]): RDD[Array[String]] = { + file: RDD[String], + header: Seq[String]): RDD[Array[String]] = { // If header is set, make sure firstLine is materialized before sending to executors. val filterLine = if (useHeader) firstLine else null val dataLines = if (useHeader) file.filter(_ != filterLine) else file val rows = dataLines.mapPartitionsWithIndex({ - case (split, iter) => { - val escapeVal = if (escape == null) '\\' else escape.charValue() - val commentChar: Char = if (comment == null) '\0' else comment - val quoteChar: Char = if (quote == null) '\0' else quote - - new BulkCsvReader(iter, split, - headers = header, fieldSep = delimiter, - quote = quoteChar, escape = escapeVal, - commentMarker = commentChar, maxCharsPerCol = maxCharsPerCol) - } + case (split, iter) => getBulkReader(header, iter, split) }, true) rows } private def parseCSV( - iter: Iterator[String], - csvFormat: CSVFormat): Iterator[Array[String]] = { + iter: Iterator[String], + csvFormat: CSVFormat): Iterator[Array[String]] = { iter.flatMap { line => try { val records = CSVParser.parse(line, csvFormat).getRecords @@ -346,3 +329,89 @@ case class CsvRelation protected[spark] ( } } } + +case class CsvRelation protected[spark] ( + baseRDD: () => RDD[String], + location: Option[String], + useHeader: Boolean, + delimiter: Char, + quote: Character, + escape: Character, + comment: Character, + parseMode: String, + parserLib: String, + ignoreLeadingWhiteSpace: Boolean, + ignoreTrailingWhiteSpace: Boolean, + treatEmptyValuesAsNulls: Boolean, + userSchema: StructType = null, + inferCsvSchema: Boolean, + codec: String = null, + nullValue: String = "", + dateFormat: String = null, + maxCharsPerCol: Int = 100000)(@transient val sqlContext: SQLContext) + extends Relation { + + override def getLineReader: LineReader = { + val escapeVal = if (escape == null) '\\' else escape.charValue() + val commentChar: Char = if (comment == null) '\0' else comment + val quoteChar: Char = if (quote == null) '\0' else quote + + new LineCsvReader( + fieldSep = delimiter, + quote = quoteChar, + escape = escapeVal, + commentMarker = commentChar) + } + + override def getBulkReader(header: Seq[String], + iter: Iterator[String], split: Int): 
BulkReader = { + val escapeVal = if (escape == null) '\\' else escape.charValue() + val commentChar: Char = if (comment == null) '\0' else comment + val quoteChar: Char = if (quote == null) '\0' else quote + + new BulkCsvReader(iter, split, + headers = header, fieldSep = delimiter, + quote = quoteChar, escape = escapeVal, + commentMarker = commentChar) + } +} + +case class FixedWidthRelation protected[spark] ( + baseRDD: () => RDD[String], + fixedWidths: Array[Int], + location: Option[String], + useHeader: Boolean, + parseMode: String, + comment: Character, + ignoreLeadingWhiteSpace: Boolean, + ignoreTrailingWhiteSpace: Boolean, + treatEmptyValuesAsNulls: Boolean, + userSchema: StructType, + inferSchema: Boolean, + codec: String = null, + nullValue: String = "", + dateFormat: String = null, + maxCharsPerCol: Int = 100000, + escape: Character = null, + quote: Character = null, + delimiter: Char = '\0', + inferCsvSchema: Boolean = true, + parserLib: String = "UNIVOCITY")(@transient override val sqlContext: SQLContext) + extends Relation { + + override def getLineReader: LineReader = { + val commentChar: Char = if (comment == null) '\0' else comment + new LineFixedWidthReader(fixedWidths, commentMarker = commentChar, + ignoreLeadingSpace = ignoreLeadingWhiteSpace, + ignoreTrailingSpace = ignoreTrailingWhiteSpace) + } + + override def getBulkReader(header: Seq[String], iter: Iterator[String], + split: Int): BulkReader = { + val commentChar: Char = if (comment == null) '\0' else comment + new BulkFixedwidthReader(iter, split, fixedWidths, + headers = header, commentMarker = commentChar, + ignoreLeadingSpace = ignoreLeadingWhiteSpace, + ignoreTrailingSpace = ignoreTrailingWhiteSpace) + } +} diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala index 2cce4af..8050e02 100755 --- a/src/main/scala/com/databricks/spark/csv/package.scala +++ b/src/main/scala/com/databricks/spark/csv/package.scala @@ -16,12 +16,11 @@ package com.databricks.spark import java.text.SimpleDateFormat -import java.sql.{Timestamp, Date} +import java.sql.{Date, Timestamp} import org.apache.commons.csv.{CSVFormat, QuoteMode} import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.spark.sql.types.{DateType, TimestampType} - +import org.apache.spark.sql.types.{DateType, StructType, TimestampType} import org.apache.spark.sql.{DataFrame, SQLContext} import com.databricks.spark.csv.util.TextFile @@ -211,4 +210,35 @@ package object csv { } } } + + /** + * Adds a method, `fixedWidthFile`, to SQLContext that allows reading Fixed-Width data. 
+ */ + implicit class FixedWidthContext(sqlContext: SQLContext) extends Serializable { + def fixedWidthFile( + filePath: String, + fixedWidths: Array[Int], + schema: StructType = null, + useHeader: Boolean = true, + mode: String = "PERMISSIVE", + comment: Character = null, + ignoreLeadingWhiteSpace: Boolean = true, + ignoreTrailingWhiteSpace: Boolean = true, + charset: String = TextFile.DEFAULT_CHARSET.name(), + inferSchema: Boolean = false): DataFrame = { + val fixedWidthRelation = FixedWidthRelation( + () => TextFile.withCharset(sqlContext.sparkContext, filePath, charset), + location = Some(filePath), + useHeader = useHeader, + comment = comment, + parseMode = mode, + fixedWidths = fixedWidths, + ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace, + userSchema = schema, + inferSchema = inferSchema, + treatEmptyValuesAsNulls = false)(sqlContext) + sqlContext.baseRelationToDataFrame(fixedWidthRelation) + } + } } diff --git a/src/main/scala/com/databricks/spark/csv/readers/readers.scala b/src/main/scala/com/databricks/spark/csv/readers/readers.scala index c236ef6..c2f4e8a 100644 --- a/src/main/scala/com/databricks/spark/csv/readers/readers.scala +++ b/src/main/scala/com/databricks/spark/csv/readers/readers.scala @@ -21,6 +21,16 @@ package com.databricks.spark.csv.readers import java.io.StringReader import com.univocity.parsers.csv._ +import com.univocity.parsers.fixed.{FixedWidthFieldLengths, FixedWidthParser, FixedWidthParserSettings} + +sealed trait BulkReader extends Iterator[Array[String]] { + protected def reader(iter: Iterator[String]) = new StringIteratorReader(iter) +} + +sealed trait LineReader { + protected def reader(line: String) = new StringReader(line) + def parseLine(line: String): Array[String] +} /** * Read and parse CSV-like input @@ -67,6 +77,49 @@ private[readers] abstract class CsvReader( } } +/** + * Read and parse Fixed-width-like input + * + * @param fixedWidths the fixed widths of the fields + * @param lineSep the delimiter used to separate lines + * @param commentMarker Ignore lines starting with this char + * @param ignoreLeadingSpace ignore white space before a field + * @param ignoreTrailingSpace ignore white space after a field + * @param headers headers for the columns + * @param inputBufSize size of buffer to use for parsing input, tune for performance + * @param maxCols maximum number of columns allowed, for safety against bad inputs + */ +private[readers] abstract class FixedWidthReader( + fixedWidths: Array[Int], + lineSep: String = "\n", + commentMarker: Char = '#', + ignoreLeadingSpace: Boolean = true, + ignoreTrailingSpace: Boolean = true, + headers: Seq[String], + inputBufSize: Int = 128, + maxCols: Int = 20480) { + protected lazy val parser: FixedWidthParser = { + val settings = new FixedWidthParserSettings(new FixedWidthFieldLengths(fixedWidths: _*)) + val format = settings.getFormat + format.setLineSeparator(lineSep) + format.setComment(commentMarker) + settings.setIgnoreLeadingWhitespaces(ignoreLeadingSpace) + settings.setIgnoreTrailingWhitespaces(ignoreTrailingSpace) + settings.setReadInputOnSeparateThread(false) + settings.setInputBufferSize(inputBufSize) + settings.setMaxColumns(maxCols) + settings.setNullValue("") + settings.setMaxCharsPerColumn(100000) + if (headers != null) settings.setHeaders(headers: _*) + // TODO: configurable? 
+ settings.setSkipTrailingCharsUntilNewline(true) + settings.setRecordEndsOnNewline(true) + + new FixedWidthParser(settings) + } +} + + /** * Parser for parsing a line at a time. Not efficient for bulk data. * @param fieldSep the delimiter used to separate fields in a line @@ -100,14 +153,56 @@ private[csv] class LineCsvReader( null, inputBufSize, maxCols, - maxCharsPerCol) { + maxCharsPerCol) +with LineReader{ /** * parse a line * @param line a String with no newline at the end * @return array of strings where each string is a field in the CSV record */ def parseLine(line: String): Array[String] = { - parser.beginParsing(new StringReader(line)) + parser.beginParsing(reader(line)) + val parsed = parser.parseNext() + parser.stopParsing() + parsed + } +} + +/** + * Read and parse a single line of Fixed-width-like input. Inefficient for bulk data. + * @param fixedWidths the fixed widths of the fields + * @param lineSep the delimiter used to separate lines + * @param commentMarker Ignore lines starting with this char + * @param ignoreLeadingSpace ignore white space before a field + * @param ignoreTrailingSpace ignore white space after a field + * @param inputBufSize size of buffer to use for parsing input, tune for performance + * @param maxCols maximum number of columns allowed, for safety against bad inputs + */ +private[csv] class LineFixedWidthReader( + fixedWidths: Array[Int], + lineSep: String = "\n", + commentMarker: Char = '#', + ignoreLeadingSpace: Boolean = true, + ignoreTrailingSpace: Boolean = true, + inputBufSize: Int = 128, + maxCols: Int = 20480) + extends FixedWidthReader( + fixedWidths, + lineSep, + commentMarker, + ignoreLeadingSpace, + ignoreTrailingSpace, + null, + inputBufSize, + maxCols) + with LineReader { + /** + * parse a line + * @param line a String with no newline at the end + * @return array of strings where each string is a field in the CSV record + */ + def parseLine(line: String): Array[String] = { + parser.beginParsing(reader(line)) val parsed = parser.parseNext() parser.stopParsing() parsed @@ -153,10 +248,9 @@ private[csv] class BulkCsvReader( inputBufSize, maxCols, maxCharsPerCol) - with Iterator[Array[String]] { + with Iterator[Array[String]] with BulkReader { - private val reader = new StringIteratorReader(iter) - parser.beginParsing(reader) + parser.beginParsing(reader(iter)) private var nextRecord = parser.parseNext() /** @@ -177,13 +271,68 @@ private[csv] class BulkCsvReader( } +/** + * Read and parse Fixed-width-like input + * + * @param fixedWidths the fixed widths of the fields + * @param lineSep the delimiter used to separate lines + * @param commentMarker Ignore lines starting with this char + * @param ignoreLeadingSpace ignore white space before a field + * @param ignoreTrailingSpace ignore white space after a field + * @param headers headers for the columns + * @param inputBufSize size of buffer to use for parsing input, tune for performance + * @param maxCols maximum number of columns allowed, for safety against bad inputs + */ +private[csv] class BulkFixedwidthReader( + iter: Iterator[String], + split: Int, // for debugging + fixedWidths: Array[Int], + lineSep: String = "\n", + commentMarker: Char = '#', + ignoreLeadingSpace: Boolean = true, + ignoreTrailingSpace: Boolean = true, + headers: Seq[String], + inputBufSize: Int = 128, + maxCols: Int = 20480) + extends FixedWidthReader( + fixedWidths, + lineSep, + commentMarker, + ignoreLeadingSpace, + ignoreTrailingSpace, + headers, + inputBufSize, + maxCols + ) with BulkReader { + + 
parser.beginParsing(reader(iter)) + private var nextRecord = parser.parseNext() + + /** + * get the next parsed line. + * + * @return array of strings where each string is a field in the fixed-width record + */ + override def next(): Array[String] = { + val curRecord = nextRecord + if(curRecord != null) { + nextRecord = parser.parseNext() + } else { + throw new NoSuchElementException("next record is null") + } + curRecord + } + + override def hasNext: Boolean = nextRecord != null +} + /** * A Reader that "reads" from a sequence of lines. Spark's textFile method removes newlines at * end of each line Univocity parser requires a Reader that provides access to the data to be * parsed and needs the newlines to be present * @param iter iterator over RDD[String] */ -private class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader { +private[readers] class StringIteratorReader(val iter: Iterator[String]) extends java.io.Reader { private var next: Long = 0 private var length: Long = 0 // length of input so far diff --git a/src/test/resources/fruit__fixedwidth.txt b/src/test/resources/fruit__fixedwidth.txt new file mode 100644 index 0000000..2307c6f --- /dev/null +++ b/src/test/resources/fruit__fixedwidth.txt @@ -0,0 +1,7 @@ +56 apple TRUE 0.56 +45 pear FALSE1.34 +34 raspberry TRUE 2.43 +34 plum TRUE 1.31 +53 cherry TRUE 1.4 +23 orange FALSE2.34 +56 persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/resources/fruit_comments_fixedwidth.txt b/src/test/resources/fruit_comments_fixedwidth.txt new file mode 100644 index 0000000..92c2773 --- /dev/null +++ b/src/test/resources/fruit_comments_fixedwidth.txt @@ -0,0 +1,11 @@ +// Fruit for sale +// 2014-10-02 +AMTNAME AVAILCOST +56 apple TRUE 0.56 +45 pear FALSE1.34 +34 raspberry TRUE 2.43 +34 plum TRUE 1.31 +53 cherry TRUE 1.4 +23 orange FALSE2.34 +56 persimmon FALSE23.2 +// No more fruit to show \ No newline at end of file diff --git a/src/test/resources/fruit_malformed_fixedwidth.txt b/src/test/resources/fruit_malformed_fixedwidth.txt new file mode 100644 index 0000000..be94253 --- /dev/null +++ b/src/test/resources/fruit_malformed_fixedwidth.txt @@ -0,0 +1,7 @@ +23 apple TRUE 1.44 +45 cherry FALSE1.33 +34 raspberry TRUE BLR +34 +ewfergerfgrefergregreg +hi pear +56 persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/resources/fruit_overflow_fixedwidth.txt b/src/test/resources/fruit_overflow_fixedwidth.txt new file mode 100644 index 0000000..015e345 --- /dev/null +++ b/src/test/resources/fruit_overflow_fixedwidth.txt @@ -0,0 +1,7 @@ +56 apple TRUE 0.56 +45 pear FALSE1.34 +34 raspberry TRUE 2.436565 +34 plum TRUE 1.31 +53 cherry TRUE 1.4 +23 orange FALSE2.3466 +56 persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/resources/fruit_underflow_fixedwidth.txt b/src/test/resources/fruit_underflow_fixedwidth.txt new file mode 100644 index 0000000..00b4e16 --- /dev/null +++ b/src/test/resources/fruit_underflow_fixedwidth.txt @@ -0,0 +1,7 @@ +56 apple TRUE 0.56 +45 pear FALSE1.34 +34 raspberry TRUE 2 +34 plum TRUE 1.31 +53 cherry TRUE 1 +23 orange FALSE2.34 +56 persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/resources/fruit_w_headers_fixedwidth.txt b/src/test/resources/fruit_w_headers_fixedwidth.txt new file mode 100644 index 0000000..8970a5f --- /dev/null +++ b/src/test/resources/fruit_w_headers_fixedwidth.txt @@ -0,0 +1,8 @@ +AMTNAME SALE COST +56 apple TRUE 0.56 +45 pear FALSE1.34 +34 raspberry TRUE 2.43 +34 plum TRUE 1.31 +53 cherry TRUE 1.4 +23 orange FALSE2.34 
+56 persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/resources/fruit_wrong_schema_fixedwidth.txt b/src/test/resources/fruit_wrong_schema_fixedwidth.txt new file mode 100644 index 0000000..51cf76c --- /dev/null +++ b/src/test/resources/fruit_wrong_schema_fixedwidth.txt @@ -0,0 +1,7 @@ +so apple TRUE 0.56 +on pear FALSE1.34 +hi raspberry TRUE 2.43 +yo plum TRUE 1.31 +ni cherry TRUE 1.4 +po orange FALSE2.34 +no persimmon FALSE23.2 \ No newline at end of file diff --git a/src/test/scala/com/databricks/spark/csv/FixedWidthSuite.scala b/src/test/scala/com/databricks/spark/csv/FixedWidthSuite.scala new file mode 100644 index 0000000..132f33b --- /dev/null +++ b/src/test/scala/com/databricks/spark/csv/FixedWidthSuite.scala @@ -0,0 +1,124 @@ +package com.databricks.spark.csv + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.specs2.mutable.Specification +import org.specs2.specification.After + +trait FixedWidthSetup extends After { + protected def fruit_resource(name: String = ""): String = + s"src/test/resources/fruit_${name}_fixedwidth.txt" + + protected val fruitWidths = Array(3, 10, 5, 4) + protected val fruitSize = 7 + protected val malformedFruitSize = 5 + protected val fruitFirstRow = Seq(56, "apple", true, 0.56) + + protected val fruitSchema = StructType(Seq( + StructField("val", IntegerType), + StructField("name", StringType), + StructField("avail", BooleanType), + StructField("cost", DoubleType) + )) + + val sqlContext: SQLContext = new SQLContext( + new SparkContext(master = "local[2]", appName = "FixedwidthSuite") + ) + + def after: Unit = sqlContext.sparkContext.stop() +} + +class FixedWidthSpec extends Specification with FixedWidthSetup { + + protected def sanityChecks(resultSet: DataFrame) = { + resultSet.show() + resultSet.collect().length mustEqual fruitSize + + val head = resultSet.head() + head.length mustEqual fruitWidths.length + head.toSeq mustEqual fruitFirstRow + } + + "FixedwidthParser" should { + "Parse a basic fixed width file, successfully" in { + val result = sqlContext.fixedWidthFile(fruit_resource(), fruitWidths, fruitSchema, + useHeader = false, ignoreLeadingWhiteSpace = true, ignoreTrailingWhiteSpace = true) + sanityChecks(result) + } + + "Parse a fw file with headers, and ignore them" in { + val result = sqlContext.fixedWidthFile(fruit_resource("w_headers"), fruitWidths, + fruitSchema, useHeader = true) + sanityChecks(result) + } + + "Parse a fw file with overflowing lines, and ignore the overflow" in { + val result = sqlContext.fixedWidthFile(fruit_resource("overflow"), fruitWidths, + fruitSchema, useHeader = false) + sanityChecks(result) + } + + "Parse a fw file with underflowing lines, successfully " in { + val result = sqlContext.fixedWidthFile(fruit_resource("underflow"), fruitWidths, + fruitSchema, useHeader = false) + sanityChecks(result) + } + + "Parse a basic fw file without schema and without inferring types, successfully" in { + val result = sqlContext.fixedWidthFile(fruit_resource(), fruitWidths, + useHeader = false, inferSchema = false) + sanityChecks(result) + } + + "Parse a basic fw file without schema, and infer the schema" in { + val result = sqlContext.fixedWidthFile(fruit_resource(), fruitWidths, + useHeader = false, inferSchema = true) + sanityChecks(result) + } + + "Parse a fw file with headers but without schema and without inferrence, succesfully" in { + val result = 
sqlContext.fixedWidthFile(fruit_resource("w_headers"), fruitWidths, + useHeader = true, inferSchema = true) + sanityChecks(result) + } + + "Parse a fw file with comments, and ignore those lines" in { + val result = sqlContext.fixedWidthFile(fruit_resource("comments"), fruitWidths, + useHeader = true, inferSchema = true, comment = '/') + sanityChecks(result) + } + + "Parse a malformed fw and schemaless file in PERMISSIVE mode, successfully" in { + val result = sqlContext.fixedWidthFile(fruit_resource("malformed"), fruitWidths, + useHeader = false, mode = "PERMISSIVE") + result.show() + result.collect().length mustEqual fruitSize + } + + "Parse a malformed and schemaless fw file in DROPMALFORMED mode, " + + "successfully dropping bad lines" in { + val result = sqlContext.fixedWidthFile(fruit_resource("malformed"), fruitWidths, + useHeader = false, mode = "DROPMALFORMED") + result.show() + result.collect().length mustEqual malformedFruitSize + } + + "FAIL to parse a malformed fw file with schema in FAILFAST mode" in { + def fail: Array[Row] = { + sqlContext.fixedWidthFile(fruit_resource("malformed"), fruitWidths, + fruitSchema, useHeader = false, mode = "FAILFAST").collect() + } + fail must throwA[SparkException] + } + + "FAIL to parse a fw file with the wrong format" in { + def fail: Array[Row] = { + sqlContext.fixedWidthFile(fruit_resource("wrong_schema"), fruitWidths, + fruitSchema, useHeader = false).collect() + } + fail must throwA[SparkException] + } + } + +}
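
For reference, a minimal usage sketch (not part of the patch itself) of the fixedWidthFile API introduced above, assuming the implicit FixedWidthContext is brought into scope via `import com.databricks.spark.csv._` and reusing the column widths, schema, and fruit fixture from FixedWidthSuite; the object name, local master, and application name are illustrative placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

import com.databricks.spark.csv._  // brings the implicit FixedWidthContext into scope

object FixedWidthExample {
  def main(args: Array[String]): Unit = {
    // Local context for illustration only; any existing SQLContext works the same way.
    val sc = new SparkContext(master = "local[2]", appName = "FixedWidthExample")
    val sqlContext = new SQLContext(sc)

    // Column widths and schema mirror the fruit fixtures used in FixedWidthSuite.
    val widths = Array(3, 10, 5, 4)
    val schema = StructType(Seq(
      StructField("val", IntegerType),
      StructField("name", StringType),
      StructField("avail", BooleanType),
      StructField("cost", DoubleType)))

    // Read one of the fixed-width test fixtures added by this patch into a DataFrame.
    val df = sqlContext.fixedWidthFile(
      "src/test/resources/fruit__fixedwidth.txt",
      fixedWidths = widths,
      schema = schema,
      useHeader = false)

    df.show()
    sc.stop()
  }
}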