
Merge branch 'master' into palantir-master
Robert Kruszewski committed Oct 13, 2016
2 parents 9f5b0e2 + 0a8e51a commit 4a1c1a4
Showing 308 changed files with 5,762 additions and 2,451 deletions.
4 changes: 1 addition & 3 deletions .github/PULL_REQUEST_TEMPLATE
@@ -2,11 +2,9 @@

(Please fill in changes proposed in this fix)


## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)


(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.
43 changes: 31 additions & 12 deletions R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
.Object
})

#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
jmode <- convertToJSaveMode(mode)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write
}
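
For orientation, here is a minimal sketch of the two ways this helper is used later in this commit: the format-specific writers (write.json, write.orc, write.parquet, write.text below) set only the mode and extra options, while write.df also routes the output path through the options environment before calling "save". The data frame df, the paths, and the compression option are illustrative, not part of the diff.

# Sketch only: format-specific writer pattern -- the path goes to the terminal call
write <- callJMethod(df@sdf, "write")
write <- setWriteOptions(write, mode = "overwrite", compression = "snappy")
invisible(callJMethod(write, "parquet", "/tmp/out-parquet"))

# Sketch only: write.df pattern -- the path travels inside the options environment
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", "parquet")
write <- setWriteOptions(write, path = "/tmp/out-df", mode = "append")
write <- handledCallJMethod(write, "save")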

#' @export
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode the save mode: one of 'append', 'overwrite', 'error', or 'ignore' (defaults to 'error').
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.json
@@ -743,8 +758,9 @@ setMethod("toJSON",
#' @note write.json since 1.6.0
setMethod("write.json",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "json", path))
})
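
From the user's side, the format-specific writers now accept a save mode plus data-source options through "...". A hedged example follows (the data, output path, and the compression option are illustrative; which options are honoured depends on the underlying JSON source); write.orc, write.parquet, and write.text below gain the same signature:

df <- createDataFrame(mtcars)
# Replace any existing output and forward a writer option via "..."
write.json(df, "/tmp/mtcars-json", mode = "overwrite", compression = "gzip")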

@@ -755,6 +771,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode the save mode: one of 'append', 'overwrite', 'error', or 'ignore' (defaults to 'error').
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.orc,SparkDataFrame,character-method
@@ -771,8 +789,9 @@ setMethod("write.json",
#' @note write.orc since 2.0.0
setMethod("write.orc",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "orc", path))
})

@@ -783,6 +802,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode the save mode: one of 'append', 'overwrite', 'error', or 'ignore' (defaults to 'error').
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @rdname write.parquet
@@ -800,8 +821,9 @@ setMethod("write.orc",
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "parquet", path))
})

@@ -825,6 +847,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode the save mode: one of 'append', 'overwrite', 'error', or 'ignore' (defaults to 'error').
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.text,SparkDataFrame,character-method
@@ -841,8 +865,9 @@ setMethod("saveAsParquetFile",
#' @note write.text since 2.0.0
setMethod("write.text",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
function(x, path, mode = "error", ...) {
write <- callJMethod(x@sdf, "write")
write <- setWriteOptions(write, mode = mode, ...)
invisible(callJMethod(write, "text", path))
})

@@ -2637,15 +2662,9 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})
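
write.df itself now delegates the mode/options plumbing to setWriteOptions, so its user-facing behaviour is unchanged; one consequence of the switch to varargsToStrEnv inside the helper is that logical or numeric options are coerced to the strings the JVM side expects. A hedged example of the call this path serves (df and the path are illustrative):

write.df(df, path = "/tmp/people-parquet", source = "parquet", mode = "append")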

@@ -2701,7 +2720,7 @@ setMethod("saveAsTable",
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
23 changes: 17 additions & 6 deletions R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
#' It goes through the entire dataset once to determine the schema.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.json
#' @export
@@ -341,11 +342,13 @@ setMethod("toDF", signature(x = "RDD"),
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
read.json.default <- function(path) {
read.json.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "json", paths)
dataFrame(sdf)
}
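
The read side mirrors this: read.json (and read.orc, read.parquet, and read.text below) now forwards named arguments to the reader as data-source options. A hedged example; the path is illustrative and the option name is assumed to be one the JSON source understands:

# Read JSON while treating all primitive values as strings (option name illustrative)
people <- read.json("/tmp/people.json", primitivesAsString = "true")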
@@ -405,16 +408,19 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
#' Loads an ORC file, returning the result as a SparkDataFrame.
#'
#' @param path Path of file to read.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.orc
#' @export
#' @name read.orc
#' @note read.orc since 2.0.0
read.orc <- function(path) {
read.orc <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the ORC file path
path <- suppressWarnings(normalizePath(path))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "orc", path)
dataFrame(sdf)
}
@@ -430,11 +436,13 @@ read.orc <- function(path) {
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path) {
read.parquet.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the Parquet file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "parquet", paths)
dataFrame(sdf)
}
@@ -467,6 +475,7 @@ parquetFile <- function(x, ...) {
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.text
#' @export
@@ -479,11 +488,13 @@ parquetFile <- function(x, ...) {
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path) {
read.text.default <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
paths <- as.list(suppressWarnings(normalizePath(path)))
read <- callJMethod(sparkSession, "read")
read <- callJMethod(read, "options", options)
sdf <- callJMethod(read, "text", paths)
dataFrame(sdf)
}
@@ -779,7 +790,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
"in 'spark.sql.sources.default' configuration by default.")
}
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
@@ -842,7 +853,7 @@ loadDF <- function(x = NULL, ...) {
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
45 changes: 43 additions & 2 deletions R/pkg/R/context.R
@@ -87,6 +87,10 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If the size of the serialized slices is larger than spark.r.maxAllocationLimit (200MB by
#' default), the function writes the data to disk and sends the file name to the JVM. The number
#' of slices may also be increased so that each slice stays below that limit.
#'
#' @param sc SparkContext to use
#' @param coll collection to parallelize
#' @param numSlices number of partitions to create in the RDD
@@ -120,6 +124,11 @@ parallelize <- function(sc, coll, numSlices = 1) {
coll <- as.list(coll)
}

sizeLimit <- getMaxAllocationLimit(sc)
objectSize <- object.size(coll)

# For large objects we make sure the size of each slice is also smaller than sizeLimit
numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
if (numSlices > length(coll))
numSlices <- length(coll)

@@ -130,12 +139,44 @@ parallelize <- function(sc, coll, numSlices = 1) {
# 2-tuples of raws
serializedSlices <- lapply(slices, serialize, connection = NULL)

jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
"createRDDFromArray", sc, serializedSlices)
# The RPC backend cannot handle arguments larger than 2GB (INT_MAX).
# If the serialized data is safely below that threshold, we send it over the RPC channel.
# Otherwise, we write it to a file and send the file name.
if (objectSize < sizeLimit) {
jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
} else {
fileName <- writeToTempFile(serializedSlices)
jrdd <- tryCatch(callJStatic(
"org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
finally = {
file.remove(fileName)
})
}

RDD(jrdd, "byte")
}
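
A small worked example of the slice arithmetic above, with assumed sizes: at the default limit of .Machine$integer.max / 10 bytes (about 200MB), a collection that serializes to roughly 1GB is split into at least five slices, and because its total size exceeds the limit it is shipped via the temp-file path rather than the RPC channel.

# Illustration only: reproduce the slice-count computation with assumed sizes
objectSize <- 1e9                         # ~1GB of serialized R data (assumed)
sizeLimit <- .Machine$integer.max / 10    # default spark.r.maxAllocationLimit, ~200MB
numSlices <- max(1, ceiling(objectSize / sizeLimit))
numSlices                                 # 5; objectSize >= sizeLimit, so the temp-file path is used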

getMaxAllocationLimit <- function(sc) {
conf <- callJMethod(sc, "getConf")
as.numeric(
callJMethod(conf,
"get",
"spark.r.maxAllocationLimit",
toString(.Machine$integer.max / 10) # Default to a safe value: 200MB
))
}

writeToTempFile <- function(serializedSlices) {
fileName <- tempfile()
conn <- file(fileName, "wb")
for (slice in serializedSlices) {
writeBin(as.integer(length(slice)), conn, endian = "big")
writeBin(slice, conn, endian = "big")
}
close(conn)
fileName
}
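
The temp file uses a simple framing: each serialized slice is preceded by its length as a 4-byte big-endian integer, which is presumably what createRDDFromFile parses on the JVM side. A minimal R sketch (illustration only, not part of SparkR) that reads the records back:

# Read back the length-prefixed records produced by writeToTempFile (illustrative helper)
readFromTempFile <- function(fileName) {
  conn <- file(fileName, "rb")
  on.exit(close(conn))
  slices <- list()
  repeat {
    len <- readBin(conn, what = "integer", n = 1L, endian = "big")
    if (length(len) == 0) break           # end of file reached
    slices[[length(slices) + 1L]] <- readBin(conn, what = "raw", n = len)
  }
  slices
}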

#' Include this specified package on all workers
#'
#' This function can be used to include a package on all workers before the
10 changes: 6 additions & 4 deletions R/pkg/R/generics.R
@@ -651,23 +651,25 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })

#' @rdname write.orc
#' @export
setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
setGeneric("write.parquet", function(x, path, ...) {
standardGeneric("write.parquet")
})

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })

#' @rdname schema
#' @export
22 changes: 22 additions & 0 deletions R/pkg/R/utils.R
@@ -334,6 +334,28 @@ varargsToEnv <- function(...) {
env
}

# Utility function to capture the varargs into an environment object, converting all values
# to strings.
varargsToStrEnv <- function(...) {
pairs <- list(...)
env <- new.env()
for (name in names(pairs)) {
value <- pairs[[name]]
if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
stop(paste0("Unsupported type for ", name, " : ", class(value),
". Supported types are logical, numeric, character and NULL."))
}
if (is.logical(value)) {
env[[name]] <- tolower(as.character(value))
} else if (is.null(value)) {
env[[name]] <- value
} else {
env[[name]] <- as.character(value)
}
}
env
}
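
A quick illustration of the coercion rules (the option names are made up): logicals become the lowercase strings the JVM options API expects, numerics become character, NULL passes through, and anything else stops with an error.

# Illustration only
opts <- varargsToStrEnv(header = TRUE, inferSchema = FALSE, samplingRatio = 0.5, comment = NULL)
opts[["header"]]         # "true"
opts[["inferSchema"]]    # "false"
opts[["samplingRatio"]]  # "0.5"
opts[["comment"]]        # NULL
# varargsToStrEnv(callback = function(x) x) would stop() with an "Unsupported type" error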

getStorageLevel <- function(newLevel = c("DISK_ONLY",
"DISK_ONLY_2",
"MEMORY_AND_DISK",
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
@@ -481,6 +481,16 @@ test_that("spark.naiveBayes", {
expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA)
expect_equal(as.character(predict(m, t1[1, ])), "Yes")
}

# Test numeric response variable
t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1)
t2 <- t1[-4]
df <- suppressWarnings(createDataFrame(t2))
m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0)
s <- summary(m)
expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6)
expect_equal(sum(s$apriori), 1)
expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6)
})

test_that("spark.survreg", {