Fixed width support. #391

Status: Open. Wants to merge 2 commits into master.
3 changes: 2 additions & 1 deletion build.sbt
@@ -23,7 +23,8 @@ libraryDependencies ++= Seq(
"com.univocity" % "univocity-parsers" % "1.5.1",
"org.slf4j" % "slf4j-api" % "1.7.5" % "provided",
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"com.novocode" % "junit-interface" % "0.9" % "test"
"com.novocode" % "junit-interface" % "0.9" % "test",
"org.specs2" %% "specs2-core" % "3.7" % "test"
)

libraryDependencies ++= Seq(
175 changes: 122 additions & 53 deletions src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -20,38 +20,37 @@ import java.text.SimpleDateFormat

import scala.collection.JavaConversions._
import scala.util.control.NonFatal

import org.apache.commons.csv._
import org.apache.hadoop.fs.Path
import org.slf4j.LoggerFactory

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{PrunedScan, BaseRelation, InsertableRelation, TableScan}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, PrunedScan, TableScan}
import org.apache.spark.sql.types._
import com.databricks.spark.csv.readers.{BulkCsvReader, LineCsvReader}
import com.databricks.spark.csv.readers._
import com.databricks.spark.csv.util._

case class CsvRelation protected[spark] (
baseRDD: () => RDD[String],
location: Option[String],
useHeader: Boolean,
delimiter: Char,
quote: Character,
escape: Character,
comment: Character,
parseMode: String,
parserLib: String,
ignoreLeadingWhiteSpace: Boolean,
ignoreTrailingWhiteSpace: Boolean,
treatEmptyValuesAsNulls: Boolean,
userSchema: StructType = null,
inferCsvSchema: Boolean,
codec: String = null,
nullValue: String = "",
dateFormat: String = null,
maxCharsPerCol: Int = 100000)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with PrunedScan with InsertableRelation {
sealed trait Relation extends BaseRelation with TableScan with PrunedScan with InsertableRelation {
def baseRDD: () => RDD[String]
val location: Option[String]
val useHeader: Boolean
val delimiter: Char
val quote: Character
val escape: Character
val comment: Character
val parseMode: String
val parserLib: String
val ignoreLeadingWhiteSpace: Boolean
val ignoreTrailingWhiteSpace: Boolean
val treatEmptyValuesAsNulls: Boolean
val userSchema: StructType
val inferCsvSchema: Boolean
val codec: String
val nullValue: String
val dateFormat: String
val maxCharsPerCol: Int
def getLineReader: LineReader
def getBulkReader(header: Seq[String], iter: Iterator[String], split: Int): BulkReader

// Share date format object as it is expensive to parse date pattern.
private val dateFormatter = if (dateFormat != null) new SimpleDateFormat(dateFormat) else null
@@ -143,11 +142,11 @@ case class CsvRelation protected[spark] (


/**
* This supports to eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects. This reads all the tokens of each line
* and then drop unneeded tokens without casting and type-checking by mapping
* both the indices produced by `requiredColumns` and the ones of tokens.
*/
* This supports to eliminate unneeded columns before producing an RDD
* containing all of its tuples as Row objects. This reads all the tokens of each line
* and then drop unneeded tokens without casting and type-checking by mapping
* both the indices produced by `requiredColumns` and the ones of tokens.
*/
override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
val simpleDateFormatter = dateFormatter
val schemaFields = schema.fields
@@ -225,14 +224,7 @@ case class CsvRelation protected[spark] (
userSchema
} else {
val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
val escapeVal = if (escape == null) '\\' else escape.charValue()
val commentChar: Char = if (comment == null) '\0' else comment
val quoteChar: Char = if (quote == null) '\0' else quote
new LineCsvReader(
fieldSep = delimiter,
quote = quoteChar,
escape = escapeVal,
commentMarker = commentChar).parseLine(firstLine)
getLineReader.parseLine(firstLine)
} else {
val csvFormat = defaultCsvFormat
.withDelimiter(delimiter)
@@ -260,8 +252,8 @@ }
}

/**
* Returns the first line of the first non-empty file in path
*/
* Returns the first line of the first non-empty file in path
*/
private lazy val firstLine = {
if (comment != null) {
baseRDD().filter { line =>
@@ -275,30 +267,21 @@
}

private def univocityParseCSV(
file: RDD[String],
header: Seq[String]): RDD[Array[String]] = {
file: RDD[String],
header: Seq[String]): RDD[Array[String]] = {
// If header is set, make sure firstLine is materialized before sending to executors.
val filterLine = if (useHeader) firstLine else null
val dataLines = if (useHeader) file.filter(_ != filterLine) else file
val rows = dataLines.mapPartitionsWithIndex({
case (split, iter) => {
val escapeVal = if (escape == null) '\\' else escape.charValue()
val commentChar: Char = if (comment == null) '\0' else comment
val quoteChar: Char = if (quote == null) '\0' else quote

new BulkCsvReader(iter, split,
headers = header, fieldSep = delimiter,
quote = quoteChar, escape = escapeVal,
commentMarker = commentChar, maxCharsPerCol = maxCharsPerCol)
}
case (split, iter) => getBulkReader(header, iter, split)
}, true)

rows
}

private def parseCSV(
iter: Iterator[String],
csvFormat: CSVFormat): Iterator[Array[String]] = {
iter: Iterator[String],
csvFormat: CSVFormat): Iterator[Array[String]] = {
iter.flatMap { line =>
try {
val records = CSVParser.parse(line, csvFormat).getRecords
@@ -346,3 +329,89 @@ case class CsvRelation protected[spark] (
}
}
}

case class CsvRelation protected[spark] (
baseRDD: () => RDD[String],
location: Option[String],
useHeader: Boolean,
delimiter: Char,
quote: Character,
escape: Character,
comment: Character,
parseMode: String,
parserLib: String,
ignoreLeadingWhiteSpace: Boolean,
ignoreTrailingWhiteSpace: Boolean,
treatEmptyValuesAsNulls: Boolean,
userSchema: StructType = null,
inferCsvSchema: Boolean,
codec: String = null,
nullValue: String = "",
dateFormat: String = null,
maxCharsPerCol: Int = 100000)(@transient val sqlContext: SQLContext)
extends Relation {

override def getLineReader: LineReader = {
val escapeVal = if (escape == null) '\\' else escape.charValue()
val commentChar: Char = if (comment == null) '\0' else comment
val quoteChar: Char = if (quote == null) '\0' else quote

new LineCsvReader(
fieldSep = delimiter,
quote = quoteChar,
escape = escapeVal,
commentMarker = commentChar)
}

override def getBulkReader(header: Seq[String],
iter: Iterator[String], split: Int): BulkReader = {
val escapeVal = if (escape == null) '\\' else escape.charValue()
val commentChar: Char = if (comment == null) '\0' else comment
val quoteChar: Char = if (quote == null) '\0' else quote

new BulkCsvReader(iter, split,
headers = header, fieldSep = delimiter,
quote = quoteChar, escape = escapeVal,
commentMarker = commentChar)
}
}

case class FixedWidthRelation protected[spark] (
baseRDD: () => RDD[String],
fixedWidths: Array[Int],
location: Option[String],
useHeader: Boolean,
parseMode: String,
comment: Character,
ignoreLeadingWhiteSpace: Boolean,
ignoreTrailingWhiteSpace: Boolean,
treatEmptyValuesAsNulls: Boolean,
userSchema: StructType,
inferSchema: Boolean,
codec: String = null,
nullValue: String = "",
dateFormat: String = null,
maxCharsPerCol: Int = 100000,
escape: Character = null,
quote: Character = null,
delimiter: Char = '\0',
inferCsvSchema: Boolean = true,
parserLib: String = "UNIVOCITY")(@transient override val sqlContext: SQLContext)
extends Relation {

override def getLineReader: LineReader = {
val commentChar: Char = if (comment == null) '\0' else comment
new LineFixedWidthReader(fixedWidths, commentMarker = commentChar,
ignoreLeadingSpace = ignoreLeadingWhiteSpace,
ignoreTrailingSpace = ignoreTrailingWhiteSpace)
}

override def getBulkReader(header: Seq[String], iter: Iterator[String],
split: Int): BulkReader = {
val commentChar: Char = if (comment == null) '\0' else comment
new BulkFixedwidthReader(iter, split, fixedWidths,
headers = header, commentMarker = commentChar,
ignoreLeadingSpace = ignoreLeadingWhiteSpace,
ignoreTrailingSpace = ignoreTrailingWhiteSpace)
}
}
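
For orientation only: the `LineFixedWidthReader` and `BulkFixedwidthReader` factories above come from `com.databricks.spark.csv.readers`, which is not shown in this hunk. The snippet below is a hypothetical sketch of the column slicing a fixed-width parser performs; `sliceFixedWidths` and its example record are illustrative and not part of the PR.

```scala
// Hypothetical helper, for illustration only: consume fixedWidths(i) characters per
// column, pad missing trailing columns with empty strings, and optionally trim
// whitespace the way ignoreLeading/TrailingWhiteSpace would.
def sliceFixedWidths(
    line: String,
    fixedWidths: Array[Int],
    trim: Boolean = true): Array[String] = {
  var pos = 0
  fixedWidths.map { width =>
    val end = math.min(pos + width, line.length)
    val raw = if (pos < line.length) line.substring(pos, end) else ""
    pos = end
    if (trim) raw.trim else raw
  }
}

// sliceFixedWidths("Alice  30NYC", Array(7, 2, 3)) == Array("Alice", "30", "NYC")
```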
36 changes: 33 additions & 3 deletions src/main/scala/com/databricks/spark/csv/package.scala
@@ -16,12 +16,11 @@
package com.databricks.spark

import java.text.SimpleDateFormat
import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}

import org.apache.commons.csv.{CSVFormat, QuoteMode}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark.sql.types.{DateType, TimestampType}

import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import com.databricks.spark.csv.util.TextFile

@@ -211,4 +210,35 @@ package object csv {
}
}
}

/**
* Adds a method, `fixedWidthFile`, to SQLContext that allows reading Fixed-Width data.
*/
implicit class FixedWidthContext(sqlContext: SQLContext) extends Serializable {
def fixedWidthFile(
filePath: String,
fixedWidths: Array[Int],
schema: StructType = null,
useHeader: Boolean = true,
mode: String = "PERMISSIVE",
comment: Character = null,
ignoreLeadingWhiteSpace: Boolean = true,
ignoreTrailingWhiteSpace: Boolean = true,
charset: String = TextFile.DEFAULT_CHARSET.name(),
inferSchema: Boolean = false): DataFrame = {
val fixedWidthRelation = FixedWidthRelation(
() => TextFile.withCharset(sqlContext.sparkContext, filePath, charset),
location = Some(filePath),
useHeader = useHeader,
comment = comment,
parseMode = mode,
fixedWidths = fixedWidths,
ignoreLeadingWhiteSpace = ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace = ignoreTrailingWhiteSpace,
userSchema = schema,
inferSchema = inferSchema,
treatEmptyValuesAsNulls = false)(sqlContext)
sqlContext.baseRelationToDataFrame(fixedWidthRelation)
}
}
}
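
A minimal usage sketch of the `fixedWidthFile` method defined above. It assumes an existing `SparkContext` named `sc`; the file name `people.txt` and the widths `Array(7, 2, 3)` are illustrative only.

```scala
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._  // brings the FixedWidthContext implicit into scope

val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`

// Read a fixed-width file with three columns of widths 7, 2 and 3,
// letting the relation infer the schema from the data.
val people = sqlContext.fixedWidthFile(
  "people.txt",
  fixedWidths = Array(7, 2, 3),
  useHeader = false,
  inferSchema = true)

people.show()
```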