From 5154b7b409d1c39240f25ab32ce35b75ed54a2fa Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 4 Oct 2019 08:56:45 +0900 Subject: [PATCH] [SPARK-29339][R] Support Arrow 0.14 in vectoried dapply and gapply (test it in AppVeyor build) This PR proposes: 1. Use `is.data.frame` to check if it is a DataFrame. 2. to install Arrow and test Arrow optimization in AppVeyor build. We're currently not testing this in CI. 1. To support SparkR with Arrow 0.14 2. To check if there's any regression and if it works correctly. ```r df <- createDataFrame(mtcars) collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double"))) ``` **Before:** ``` Error in readBin(con, raw(), as.integer(dataLen), endian = "big") : invalid 'n' argument ``` **After:** ``` gear 1 5 2 5 3 5 4 4 5 4 6 4 7 4 8 5 9 5 ... ``` AppVeyor Closes #25993 from HyukjinKwon/arrow-r-appveyor. Authored-by: HyukjinKwon Signed-off-by: HyukjinKwon --- R/check-cran.sh | 4 ++++ R/pkg/DESCRIPTION | 3 ++- R/pkg/R/SQLContext.R | 20 ++++---------------- R/pkg/R/deserialize.R | 8 ++------ R/pkg/R/serialize.R | 8 ++------ R/pkg/inst/worker/worker.R | 2 +- R/pkg/tests/fulltests/test_sparkSQL_arrow.R | 8 ++++---- appveyor.yml | 4 ++-- docs/sparkr.md | 13 ++++++++++--- 9 files changed, 31 insertions(+), 39 deletions(-) diff --git a/R/check-cran.sh b/R/check-cran.sh index 56ba1cdbdd333..113bc292cec26 100755 --- a/R/check-cran.sh +++ b/R/check-cran.sh @@ -61,6 +61,10 @@ fi echo "Running CRAN check with $CRAN_CHECK_OPTIONS options" +# Remove this environment variable to allow to check suggested packages once +# Jenkins installs arrow. See SPARK-29339. +export _R_CHECK_FORCE_SUGGESTS_=FALSE + if [ -n "$NO_TESTS" ] && [ -n "$NO_MANUAL" ] then "$R_SCRIPT_PATH/R" CMD check $CRAN_CHECK_OPTIONS "SparkR_$VERSION.tar.gz" diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 74cdbd185e570..4a62ed880768d 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -22,7 +22,8 @@ Suggests: rmarkdown, testthat, e1071, - survival + survival, + arrow Collate: 'schema.R' 'generics.R' diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index a6ab156ec1a5b..b2d0d15d6a372 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -148,19 +148,7 @@ getDefaultSqlSource <- function() { } writeToFileInArrow <- function(fileName, rdf, numPartitions) { - requireNamespace1 <- requireNamespace - - # R API in Arrow is not yet released in CRAN. CRAN requires to add the - # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available - # or not. Therefore, it works around by avoiding direct requireNamespace. - # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204. - if (requireNamespace1("arrow", quietly = TRUE)) { - record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) - RecordBatchStreamWriter <- get( - "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE) - FileOutputStream <- get( - "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE) - + if (requireNamespace("arrow", quietly = TRUE)) { numPartitions <- if (!is.null(numPartitions)) { numToInt(numPartitions) } else { @@ -176,11 +164,11 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) { stream_writer <- NULL tryCatch({ for (rdf_slice in rdf_slices) { - batch <- record_batch(rdf_slice) + batch <- arrow::record_batch(rdf_slice) if (is.null(stream_writer)) { - stream <- FileOutputStream(fileName) + stream <- arrow::FileOutputStream(fileName) schema <- batch$schema - stream_writer <- RecordBatchStreamWriter(stream, schema) + stream_writer <- arrow::RecordBatchStreamWriter(stream, schema) } stream_writer$write_batch(batch) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index b38d245a0cca7..a6febb1cbd132 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -232,11 +232,7 @@ readMultipleObjectsWithKeys <- function(inputCon) { } readDeserializeInArrow <- function(inputCon) { - # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204. - requireNamespace1 <- requireNamespace - if (requireNamespace1("arrow", quietly = TRUE)) { - RecordBatchStreamReader <- get( - "RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE) + if (requireNamespace("arrow", quietly = TRUE)) { # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190. useAsTibble <- exists("as_tibble", envir = asNamespace("arrow")) @@ -246,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) { # for now. dataLen <- readInt(inputCon) arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big") - batches <- RecordBatchStreamReader(arrowData)$batches() + batches <- arrow::RecordBatchStreamReader(arrowData)$batches() if (useAsTibble) { as_tibble <- get("as_tibble", envir = asNamespace("arrow")) diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R index 0d6f32c8f7e1f..cb3c1c59d12ed 100644 --- a/R/pkg/R/serialize.R +++ b/R/pkg/R/serialize.R @@ -222,15 +222,11 @@ writeArgs <- function(con, args) { } writeSerializeInArrow <- function(conn, df) { - # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204. - requireNamespace1 <- requireNamespace - if (requireNamespace1("arrow", quietly = TRUE)) { - write_arrow <- get("write_arrow", envir = asNamespace("arrow"), inherits = FALSE) - + if (requireNamespace("arrow", quietly = TRUE)) { # There looks no way to send each batch in streaming format via socket # connection. See ARROW-4512. # So, it writes the whole Arrow streaming-formatted binary at once for now. - writeRaw(conn, write_arrow(df, raw())) + writeRaw(conn, arrow::write_arrow(df, raw())) } else { stop("'arrow' package should be installed.") } diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 80dc4ee634512..dfe69b7f4f1fb 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -50,7 +50,7 @@ compute <- function(mode, partition, serializer, deserializer, key, } else { # Check to see if inputData is a valid data.frame stopifnot(deserializer == "byte" || deserializer == "arrow") - stopifnot(class(inputData) == "data.frame") + stopifnot(is.data.frame(inputData)) } if (mode == 2) { diff --git a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R index 25a6d3c6ce36e..4188dbaa4ff0e 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL_arrow.R +++ b/R/pkg/tests/fulltests/test_sparkSQL_arrow.R @@ -101,7 +101,7 @@ test_that("dapply() Arrow optimization", { tryCatch({ ret <- dapply(df, function(rdf) { - stopifnot(class(rdf) == "data.frame") + stopifnot(is.data.frame(rdf)) rdf }, schema(df)) @@ -115,7 +115,7 @@ test_that("dapply() Arrow optimization", { tryCatch({ ret <- dapply(df, function(rdf) { - stopifnot(class(rdf) == "data.frame") + stopifnot(is.data.frame(rdf)) # mtcars' hp is more then 50. stopifnot(all(rdf$hp > 50)) rdf @@ -199,7 +199,7 @@ test_that("gapply() Arrow optimization", { if (length(key) > 0) { stopifnot(is.numeric(key[[1]])) } - stopifnot(class(grouped) == "data.frame") + stopifnot(is.data.frame(grouped)) grouped }, schema(df)) @@ -217,7 +217,7 @@ test_that("gapply() Arrow optimization", { if (length(key) > 0) { stopifnot(is.numeric(key[[1]])) } - stopifnot(class(grouped) == "data.frame") + stopifnot(is.data.frame(grouped)) stopifnot(length(colnames(grouped)) == 11) # mtcars' hp is more then 50. stopifnot(all(grouped$hp > 50)) diff --git a/appveyor.yml b/appveyor.yml index 7fb45745a036f..be03763f2c50c 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -42,10 +42,10 @@ install: # Install maven and dependencies - ps: .\dev\appveyor-install-dependencies.ps1 # Required package for R unit tests - - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival'), repos='http://cran.us.r-project.org')" + - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')" # Here, we use the fixed version of testthat. For more details, please see SPARK-22817. - cmd: R -e "devtools::install_version('testthat', version = '1.0.2', repos='http://cran.us.r-project.org')" - - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')" + - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival'), packageVersion('arrow')" build_script: - cmd: mvn -DskipTests -Psparkr -Phive package diff --git a/docs/sparkr.md b/docs/sparkr.md index 6cb4e42247c30..f4ae25f235210 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -648,13 +648,20 @@ Apache Arrow is an in-memory columnar data format that is used in Spark to effic ## Ensure Arrow Installed -Currently, Arrow R library is not on CRAN yet [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). Therefore, it should be installed directly from Github. You can use `remotes::install_github` as below. +Arrow R library is available on CRAN as of [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). It can be installed as below. ```bash -Rscript -e 'remotes::install_github("apache/arrow@TAG", subdir = "r")' +Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")' ``` -`TAG` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes. The current supported version is 0.12.1. +If you need to install old versions, it should be installed directly from Github. You can use `remotes::install_github` as below. + +```bash +Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.1", subdir = "r")' +``` + +`apache-arrow-0.12.1` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes. +The current supported minimum version is 0.12.1; however, this might change between the minor releases since Arrow optimization in SparkR is experimental. ## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`