Skip to content

Commit

Permalink
Merge branch 'master' into palantir-master
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert Kruszewski committed Oct 7, 2016
2 parents 26b6b08 + 49d11d4 commit 9f5b0e2
Show file tree
Hide file tree
Showing 367 changed files with 11,786 additions and 3,582 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
R-unit-tests.log
R/unit-tests.out
R/cran-check.out
R/pkg/vignettes/sparkr-vignettes.html
build/*.jar
build/apache-maven*
build/scala*
Expand Down
20 changes: 15 additions & 5 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2608,7 +2608,7 @@ setMethod("except",
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.df,SparkDataFrame,character-method
#' @aliases write.df,SparkDataFrame-method
#' @rdname write.df
#' @name write.df
#' @export
Expand All @@ -2622,21 +2622,31 @@ setMethod("except",
#' }
#' @note write.df since 1.4.0
setMethod("write.df",
signature(df = "SparkDataFrame", path = "character"),
function(df, path, source = NULL, mode = "error", ...) {
signature(df = "SparkDataFrame"),
function(df, path = NULL, source = NULL, mode = "error", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be charactor, NULL or omitted.")
}
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the datasource specified ",
"in 'spark.sql.sources.default' configuration by default.")
}
if (!is.character(mode)) {
stop("mode should be charactor or omitted. It is 'error' by default.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
options[["path"]] <- path
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- callJMethod(write, "save", path)
write <- handledCallJMethod(write, "save")
})

#' @rdname write.df
Expand Down
19 changes: 13 additions & 6 deletions R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,13 @@ dropTempView <- function(viewName) {
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be charactor, NULL or omitted.")
}
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the datasource specified ",
"in 'spark.sql.sources.default' configuration by default.")
}
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
if (!is.null(path)) {
Expand All @@ -784,16 +791,16 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
}
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
schema$jobj, options)
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
source, schema$jobj, options)
} else {
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"loadDF", sparkSession, source, options)
sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
source, options)
}
dataFrame(sdf)
}

read.df <- function(x, ...) {
read.df <- function(x = NULL, ...) {
dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

Expand All @@ -805,7 +812,7 @@ loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df(path, source, schema, ...)
}

loadDF <- function(x, ...) {
loadDF <- function(x = NULL, ...) {
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

Expand Down
9 changes: 7 additions & 2 deletions R/pkg/R/context.R
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,22 @@ setCheckpointDir <- function(sc, dirName) {
#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
#' use spark.getSparkFiles(fileName) to find its download location.
#'
#' A directory can be given if the recursive option is set to true.
#' Currently directories are only supported for Hadoop-supported filesystems.
#' Refer Hadoop-supported filesystems at \url{https://wiki.apache.org/hadoop/HCFS}.
#'
#' @rdname spark.addFile
#' @param path The path of the file to be added
#' @param recursive Whether to add files recursively from the path. Default is FALSE.
#' @export
#' @examples
#'\dontrun{
#' spark.addFile("~/myfile")
#'}
#' @note spark.addFile since 2.1.0
spark.addFile <- function(path) {
spark.addFile <- function(path, recursive = FALSE) {
sc <- getSparkContext()
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive))
}

#' Get the root directory that contains files added through spark.addFile.
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, source = NULL, mode = "error", ...) {
setGeneric("write.df", function(df, path = NULL, source = NULL, mode = "error", ...) {
standardGeneric("write.df")
})

Expand Down Expand Up @@ -732,7 +732,7 @@ setGeneric("withColumnRenamed",

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })

#' @rdname randomSplit
#' @export
Expand Down
3 changes: 3 additions & 0 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,9 @@ setMethod("predict", signature(object = "KMeansModel"),
setMethod("spark.mlp", signature(data = "SparkDataFrame"),
function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
tol = 1E-6, stepSize = 0.03, seed = NULL) {
if (is.null(layers)) {
stop ("layers must be a integer vector with length > 1.")
}
layers <- as.integer(na.omit(layers))
if (length(layers) <= 1) {
stop ("layers must be a integer vector with length > 1.")
Expand Down
52 changes: 52 additions & 0 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,58 @@ isSparkRShell <- function() {
grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

# Works identically with `callJStatic(...)` but throws a pretty formatted exception.
handledCallJStatic <- function(cls, method, ...) {
result <- tryCatch(callJStatic(cls, method, ...),
error = function(e) {
captureJVMException(e, method)
})
result
}

# Works identically with `callJMethod(...)` but throws a pretty formatted exception.
handledCallJMethod <- function(obj, method, ...) {
result <- tryCatch(callJMethod(obj, method, ...),
error = function(e) {
captureJVMException(e, method)
})
result
}

captureJVMException <- function(e, method) {
rawmsg <- as.character(e)
if (any(grep("^Error in .*?: ", rawmsg))) {
# If the exception message starts with "Error in ...", this is possibly
# "Error in invokeJava(...)". Here, it replaces the characters to
# `paste("Error in", method, ":")` in order to identify which function
# was called in JVM side.
stacktrace <- strsplit(rawmsg, "Error in .*?: ")[[1]]
rmsg <- paste("Error in", method, ":")
stacktrace <- paste(rmsg[1], stacktrace[2])
} else {
# Otherwise, do not convert the error message just in case.
stacktrace <- rawmsg
}

if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]]
# Extract "Error in ..." message.
rmsg <- msg[1]
# Extract the first message of JVM exception.
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
stop(paste0(rmsg, "illegal argument - ", first), call. = FALSE)
} else if (any(grep("org.apache.spark.sql.AnalysisException: ", stacktrace))) {
msg <- strsplit(stacktrace, "org.apache.spark.sql.AnalysisException: ", fixed = TRUE)[[1]]
# Extract "Error in ..." message.
rmsg <- msg[1]
# Extract the first message of JVM exception.
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
stop(paste0(rmsg, "analysis error - ", first), call. = FALSE)
} else {
stop(stacktrace, call. = FALSE)
}
}

# rbind a list of rows with raw (binary) columns
#
# @param inputData a list of rows, with each row a list
Expand Down
22 changes: 22 additions & 0 deletions R/pkg/inst/tests/testthat/test_context.R
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ test_that("spark.lapply should perform simple transforms", {

test_that("add and get file to be downloaded with Spark job on every node", {
sparkR.sparkContext()
# Test add file.
path <- tempfile(pattern = "hello", fileext = ".txt")
filename <- basename(path)
words <- "Hello World!"
Expand All @@ -177,5 +178,26 @@ test_that("add and get file to be downloaded with Spark job on every node", {
download_path <- spark.getSparkFiles(filename)
expect_equal(readLines(download_path), words)
unlink(path)

# Test add directory recursively.
path <- paste0(tempdir(), "/", "recursive_dir")
dir.create(path)
dir_name <- basename(path)
path1 <- paste0(path, "/", "hello.txt")
file.create(path1)
sub_path <- paste0(path, "/", "sub_hello")
dir.create(sub_path)
path2 <- paste0(sub_path, "/", "sub_hello.txt")
file.create(path2)
words <- "Hello World!"
sub_words <- "Sub Hello World!"
writeLines(words, path1)
writeLines(sub_words, path2)
spark.addFile(path, recursive = TRUE)
download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt"))
expect_equal(readLines(download_path1), words)
download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt"))
expect_equal(readLines(download_path2), sub_words)
unlink(path, recursive = TRUE)
sparkR.session.stop()
})
35 changes: 35 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,41 @@ test_that("Spark version from SparkSession", {
expect_equal(ver, version)
})

test_that("Call DataFrameWriter.save() API in Java without path and check argument types", {
df <- read.df(jsonPath, "json")
# This tests if the exception is thrown from JVM not from SparkR side.
# It makes sure that we can omit path argument in write.df API and then it calls
# DataFrameWriter.save() without path.
expect_error(write.df(df, source = "csv"),
"Error in save : illegal argument - 'path' is not specified")

# Arguments checking in R side.
expect_error(write.df(df, "data.tmp", source = c(1, 2)),
paste("source should be character, NULL or omitted. It is the datasource specified",
"in 'spark.sql.sources.default' configuration by default."))
expect_error(write.df(df, path = c(3)),
"path should be charactor, NULL or omitted.")
expect_error(write.df(df, mode = TRUE),
"mode should be charactor or omitted. It is 'error' by default.")
})

test_that("Call DataFrameWriter.load() API in Java without path and check argument types", {
# This tests if the exception is thrown from JVM not from SparkR side.
# It makes sure that we can omit path argument in read.df API and then it calls
# DataFrameWriter.load() without path.
expect_error(read.df(source = "json"),
paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
"It must be specified manually"))
expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")

# Arguments checking in R side.
expect_error(read.df(path = c(3)),
"path should be charactor, NULL or omitted.")
expect_error(read.df(jsonPath, source = c(1, 2)),
paste("source should be character, NULL or omitted. It is the datasource specified",
"in 'spark.sql.sources.default' configuration by default."))
})

unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
Expand Down
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/testthat/test_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ test_that("convertToJSaveMode", {
'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
})

test_that("captureJVMException", {
method <- "getSQLDataType"
expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,
"unknown"),
error = function(e) {
captureJVMException(e, method)
}),
"Error in getSQLDataType : illegal argument - Invalid type unknown")
})

test_that("hashCode", {
expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA)
})
Expand Down
31 changes: 20 additions & 11 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ library(SparkR)

We use default settings in which it runs in local mode. It auto downloads Spark package in the background if no previous installation is found. For more details about setup, see [Spark Session](#SetupSparkSession).

```{r, message=FALSE}
```{r, message=FALSE, results="hide"}
sparkR.session()
```

Expand Down Expand Up @@ -114,10 +114,12 @@ In particular, the following Spark driver properties can be set in `sparkConfig`

Property Name | Property group | spark-submit equivalent
---------------- | ------------------ | ----------------------
spark.driver.memory | Application Properties | --driver-memory
spark.driver.extraClassPath | Runtime Environment | --driver-class-path
spark.driver.extraJavaOptions | Runtime Environment | --driver-java-options
spark.driver.extraLibraryPath | Runtime Environment | --driver-library-path
`spark.driver.memory` | Application Properties | `--driver-memory`
`spark.driver.extraClassPath` | Runtime Environment | `--driver-class-path`
`spark.driver.extraJavaOptions` | Runtime Environment | `--driver-java-options`
`spark.driver.extraLibraryPath` | Runtime Environment | `--driver-library-path`
`spark.yarn.keytab` | Application Properties | `--keytab`
`spark.yarn.principal` | Application Properties | `--principal`

**For Windows users**: Due to different file prefixes across operating systems, to avoid the issue of potential wrong prefix, a current workaround is to specify `spark.sql.warehouse.dir` when starting the `SparkSession`.

Expand Down Expand Up @@ -161,7 +163,7 @@ head(df)
### Data Sources
SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. You can check the Spark SQL programming guide for more [specific options](https://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.

The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session'.`
The general method for creating `SparkDataFrame` from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`.

```{r, eval=FALSE}
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
Expand Down Expand Up @@ -406,10 +408,17 @@ class(model.summaries)
```


To avoid lengthy display, we only present the result of the second fitted model. You are free to inspect other models as well.
To avoid lengthy display, we only present the partial result of the second fitted model. You are free to inspect other models as well.
```{r, include=FALSE}
ops <- options()
options(max.print=40)
```
```{r}
print(model.summaries[[2]])
```
```{r, include=FALSE}
options(ops)
```


### SQL Queries
Expand Down Expand Up @@ -544,7 +553,7 @@ head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20
Survival analysis studies the expected duration of time until an event happens, and often the relationship with risk factors or treatment taken on the subject. In contrast to standard regression analysis, survival modeling has to deal with special characteristics in the data including non-negative survival time and censoring.

Accelerated Failure Time (AFT) model is a parametric survival model for censored data that assumes the effect of a covariate is to accelerate or decelerate the life course of an event by some constant. For more information, refer to the Wikipedia page [AFT Model](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) and the references there. Different from a [Proportional Hazards Model](https://en.wikipedia.org/wiki/Proportional_hazards_model) designed for the same purpose, the AFT model is easier to parallelize because each instance contributes to the objective function independently.
```{r}
```{r, warning=FALSE}
library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
Expand Down Expand Up @@ -678,7 +687,7 @@ MLPC employs backpropagation for learning the model. We use the logistic loss fu

* `tol`: convergence tolerance of iterations.

* `stepSize`: step size for `"gd"`.
* `stepSize`: step size for `"gd"`.

* `seed`: seed parameter for weights initialization.

Expand Down Expand Up @@ -763,8 +772,8 @@ We also expect Decision Tree, Random Forest, Kolmogorov-Smirnov Test coming in t

### Model Persistence
The following example shows how to save/load an ML model by SparkR.
```{r}
irisDF <- suppressWarnings(createDataFrame(iris))
```{r, warning=FALSE}
irisDF <- createDataFrame(iris)
gaussianGLM <- spark.glm(irisDF, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
# Save and then load a fitted MLlib model
Expand Down
Loading

0 comments on commit 9f5b0e2

Please sign in to comment.