Use the same library when Reading and Writing CSV #30

Open · wants to merge 7 commits into master
1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@ sbt/sbt-launch*.jar
target/
.idea/
.idea_modules/
.DS_Store
98 changes: 73 additions & 25 deletions README.md
@@ -13,36 +13,37 @@ You can link against this library in your program at the following coordinates:

```
groupId: com.databricks
artifactId: spark-csv_2.10
version: 1.0.0
artifactId: spark-csv_2.11
version: 1.0.3
```
The spark-csv assembly jar file can also be added to Spark using the `--jars` command line option. For example, to include it when starting the spark shell:

## Using with Apache Spark
This package can be added to Spark using the `--jars` command line option. For example, to include it when starting the spark shell:

```
$ bin/spark-shell --jars spark-csv-assembly-1.0.0.jar
$ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
```

## Features
This package allows reading CSV files in a local or distributed filesystem as [Spark DataFrames](https://spark.apache.org/docs/1.3.0/sql-programming-guide.html).
When reading files, the API accepts several options (a short example follows this list):
* path: location of files. As with Spark, this can accept standard Hadoop globbing expressions.
* header: when set to true, the first line of the files is used to name the columns and is not included in the data. All types are assumed to be string. Default value is false.
* delimiter: by default columns are delimited using ',', but the delimiter can be set to any character.
* quote: by default the quote character is '"', but it can be set to any character. Delimiters inside quotes are ignored.
* mode: determines the parsing mode. By default it is PERMISSIVE. Possible values are:
  * PERMISSIVE: tries to parse all lines: nulls are inserted for missing tokens and extra tokens are ignored.
  * DROPMALFORMED: drops lines that have fewer or more tokens than expected.
  * FAILFAST: aborts with a RuntimeException if it encounters any malformed line.
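
A minimal sketch of passing these options through `SQLContext.load` (the file name, delimiter, and mode values below are illustrative):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// Read a pipe-delimited file that has a header row, dropping malformed lines
val df = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "cars.psv", "header" -> "true", "delimiter" -> "|", "mode" -> "DROPMALFORMED"))
```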

The package also supports saving a simple (non-nested) DataFrame. When saving, you can specify the delimiter and whether a header row should be generated for the table. See the following examples for more details.

These examples use a CSV file available for download [here](https://github.com/databricks/spark-csv/raw/master/src/test/resources/cars.csv):

```
$ wget https://github.com/databricks/spark-csv/raw/master/src/test/resources/cars.csv
```

### Scala API

You can use the library by loading the implicits from `com.databricks.spark.csv._`.

```
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import com.databricks.spark.csv._

val cars = sqlContext.csvFile("cars.csv")
```

### SQL API
CSV data can be queried in pure SQL by registering the data as a (temporary) table.

@@ -59,18 +60,65 @@ USING com.databricks.spark.csv
OPTIONS (path "cars.csv", header "true")
```
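
For reference, a sketch of issuing the full statement from Scala and querying the registered table (the table name `cars` is illustrative):

```scala
// Register the CSV file as a temporary table, then query it with SQL
// (table name and query are illustrative)
sqlContext.sql(
  """CREATE TEMPORARY TABLE cars
    |USING com.databricks.spark.csv
    |OPTIONS (path "cars.csv", header "true")""".stripMargin)

val yearsAndModels = sqlContext.sql("SELECT year, model FROM cars")
```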

### Scala API
The recommended way to load CSV data is to use the load/save functions in SQLContext.

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv")
```

You can also use the implicits from `com.databricks.spark.csv._`.

```scala
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv._

val sqlContext = new SQLContext(sc)

val cars = sqlContext.csvFile("cars.csv")
cars.select("year", "model").saveAsCsvFile("newcars.tsv")
```

### Java API
CSV files can be read using functions in JavaCsvParser.
Similar to Scala, we recommend load/save functions in SQLContext.

```java
import com.databricks.spark.csv.JavaCsvParser;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import java.util.HashMap;

DataFrame cars = (new JavaCsvParser()).withUseHeader(true).csvFile(sqlContext, "cars.csv");
SQLContext sqlContext = new SQLContext(sc);

HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
options.put("path", "cars.csv");

DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv");
```
See the documentation of <a href="https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#load(java.lang.String)">load</a> and <a href="https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html#save(java.lang.String)">save</a> for more details.

In Java (as well as Scala), CSV files can be read using functions in CsvParser.

```java
import com.databricks.spark.csv.CsvParser;
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame cars = (new CsvParser()).withUseHeader(true).csvFile(sqlContext, "cars.csv");
```

### Saving as CSV
You can save your DataFrame using the `saveAsCsvFile` function. It lets you specify the delimiter and whether a header row should be generated for the table (each generated header has the name `C$i`, where `$i` is the column index). For example:

```scala
myDataFrame.saveAsCsvFile("/mydir", Map("delimiter" -> "|", "header" -> "true"))
```

### Python API
In Python you can read and save CSV files using load/save functions.

```python
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.load(source="com.databricks.spark.csv", header="true", path="cars.csv")
df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv")
```

## Building From Source
This library is built with [SBT](http://www.scala-sbt.org/0.13/docs/Command-Line-Reference.html), which is automatically downloaded by the included shell script. To build a JAR file simply run `sbt/sbt assembly` from the project root.
This library is built with [SBT](http://www.scala-sbt.org/0.13/docs/Command-Line-Reference.html), which is automatically downloaded by the included shell script. To build a JAR file simply run `sbt/sbt package` from the project root. The build configuration includes support for both Scala 2.10 and 2.11.
11 changes: 6 additions & 5 deletions build.sbt
@@ -1,10 +1,12 @@
name := "spark-csv"

version := "1.0.0"
version := "1.0.3"

organization := "com.databricks"

scalaVersion := "2.10.4"
scalaVersion := "2.11.6"

crossScalaVersions := Seq("2.10.4", "2.11.6")

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0" % "provided"

@@ -55,7 +57,6 @@ sparkVersion := "1.3.0"

sparkComponents += "sql"

// Enable Junit testing.
// libraryDependencies += "com.novocode" % "junit-interface" % "0.9" % "test"

libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"

libraryDependencies += "com.novocode" % "junit-interface" % "0.9" % "test"
58 changes: 0 additions & 58 deletions src/main/java/com/databricks/spark/csv/JavaCsvParser.java

This file was deleted.

26 changes: 24 additions & 2 deletions src/main/scala/com/databricks/spark/csv/CsvParser.scala
@@ -18,15 +18,19 @@ package com.databricks.spark.csv
import org.apache.spark.sql.{SQLContext, DataFrame}
import org.apache.spark.sql.types.StructType

import com.databricks.spark.csv.util.ParseModes

/**
* A collection of static functions for working with CSV files in Spark SQL
*/
class CsvParser {

private var useHeader: Boolean = true
private var useHeader: Boolean = false
private var delimiter: Character = ','
private var quote: Character = '"'
private var escape: Character = '\\'
private var schema: StructType = null
private var parseMode: String = ParseModes.DEFAULT

def withUseHeader(flag: Boolean): CsvParser = {
this.useHeader = flag
@@ -48,9 +52,27 @@ class CsvParser {
this
}

def withParseMode(mode: String): CsvParser = {
this.parseMode = mode
this
}

def withEscape(escapeChar: Character): CsvParser = {
this.escape = escapeChar
this
}

/** Returns a Schema RDD for the given CSV path. */
@throws[RuntimeException]
def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
val relation: CsvRelation = CsvRelation(path, useHeader, delimiter, quote, schema)(sqlContext)
val relation: CsvRelation = CsvRelation(
path,
useHeader,
delimiter,
quote,
escape,
parseMode,
schema)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}
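
For context, a minimal usage sketch of the builder methods added in this diff (the parse mode value and file name are illustrative, not part of the change):

```scala
import org.apache.spark.sql.SQLContext
import com.databricks.spark.csv.CsvParser

val sqlContext = new SQLContext(sc)

// Chain the new withEscape/withParseMode options, then load the file as a DataFrame
val cars = new CsvParser()
  .withUseHeader(true)
  .withEscape('\\')
  .withParseMode("DROPMALFORMED")
  .csvFile(sqlContext, "cars.csv")
```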

36 changes: 30 additions & 6 deletions src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -28,17 +28,28 @@ import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.slf4j.LoggerFactory

import com.databricks.spark.csv.util.ParseModes

case class CsvRelation protected[spark] (
location: String,
useHeader: Boolean,
delimiter: Char,
quote: Char,
escape: Char,
parseMode: String,
userSchema: StructType = null)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan with InsertableRelation {

private val logger = LoggerFactory.getLogger(CsvRelation.getClass)

// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
}
private val failFast = ParseModes.isFailFastMode(parseMode)
private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
private val permissive = ParseModes.isPermissiveMode(parseMode)

val schema = inferSchema()

// By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
@@ -53,6 +64,7 @@ case class CsvRelation protected[spark] (
val csvFormat = CSVFormat.DEFAULT
.withDelimiter(delimiter)
.withQuote(quote)
.withEscape(escape)
.withSkipHeaderRecord(false)
.withHeader(fieldNames: _*)

@@ -78,6 +90,7 @@ case class CsvRelation protected[spark] (
val csvFormat = CSVFormat.DEFAULT
.withDelimiter(delimiter)
.withQuote(quote)
.withEscape(escape)
.withSkipHeaderRecord(false)
val firstRow = CSVParser.parse(firstLine, csvFormat).getRecords.head.toList
val header = if (useHeader) {
@@ -115,22 +128,33 @@ case class CsvRelation protected[spark] (
projection: MutableProjection,
row: GenericMutableRow): Iterator[Row] = {
iter.flatMap { line =>
var index: Int = 0
try {
val records = CSVParser.parse(line, csvFormat).getRecords
if (records.isEmpty) {
logger.warn(s"Ignoring empty line: $line")
None
} else {
val tokens = records.head
var index = 0
while (index < schemaFields.length) {
row(index) = tokens.get(index)
index = index + 1
index = 0
if (dropMalformed && schemaFields.length != tokens.size) {
logger.warn(s"Dropping malformed line: $line")
None
} else if (failFast && schemaFields.length != tokens.size) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: $line")
} else {
while (index < schemaFields.length) {
row(index) = tokens.get(index)
index = index + 1
}
Some(projection(row))
}
Some(projection(row))
}
} catch {
case NonFatal(e) =>
case aiob: ArrayIndexOutOfBoundsException if permissive =>
(index until schemaFields.length).foreach(ind => row(ind) = null)
Some(projection(row))
case NonFatal(e) if !failFast =>
logger.error(s"Exception while parsing line: $line. ", e)
None
}
19 changes: 17 additions & 2 deletions src/main/scala/com/databricks/spark/csv/DefaultSource.scala
@@ -62,7 +62,16 @@ class DefaultSource
throw new Exception("Quotation cannot be more than one character.")
}

val useHeader = parameters.getOrElse("header", "true")
val escape = parameters.getOrElse("escape", "\\")
val escapeChar = if (escape.length == 1) {
escape.charAt(0)
} else {
throw new Exception("Escape character cannot be more than one character.")
}

val parseMode = parameters.getOrElse("mode", "PERMISSIVE")

val useHeader = parameters.getOrElse("header", "false")
val headerFlag = if (useHeader == "true") {
true
} else if (useHeader == "false") {
@@ -71,7 +80,13 @@
throw new Exception("Header flag can be true or false")
}

CsvRelation(path, headerFlag, delimiterChar, quoteChar, schema)(sqlContext)
CsvRelation(path,
headerFlag,
delimiterChar,
quoteChar,
escapeChar,
parseMode,
schema)(sqlContext)
}

override def createRelation(