[SPARK-29339][R] Support Arrow 0.14 in vectorized dapply and gapply (test it in AppVeyor build)

This PR proposes to:

1. Use `is.data.frame` to check whether an object is a `data.frame`.
2. Install Arrow and test the Arrow optimization in the AppVeyor build, since this is currently not tested in CI.

These changes are needed:

1. To support SparkR with Arrow 0.14.
2. To check for regressions and verify that the optimization works correctly.
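The switch to `is.data.frame` matters because `class()` can return a vector of length greater than one for subclassed data frames (tibbles, grouped data frames, and the like), which makes `class(x) == "data.frame"` a fragile test. A minimal base-R illustration (the class name below is made up for the example):

```r
df <- data.frame(x = 1:3)

# A subclassed data frame, similar to what tibble or grouped data produce
sub_df <- structure(df, class = c("my_frame", "data.frame"))

class(sub_df) == "data.frame"  # c(FALSE, TRUE): a length-2 result, unreliable in stopifnot()
is.data.frame(sub_df)          # TRUE: checks inheritance correctly
```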

```r
df <- createDataFrame(mtcars)
collect(dapply(df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
```

**Before:**

```
Error in readBin(con, raw(), as.integer(dataLen), endian = "big") :
  invalid 'n' argument
```

**After:**

```
   gear
1     5
2     5
3     5
4     4
5     4
6     4
7     4
8     5
9     5
...
```

Tested via the AppVeyor build.

Closes apache#25993 from HyukjinKwon/arrow-r-appveyor.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon authored and rshkv committed Jun 29, 2020
1 parent 768ae42 commit 5154b7b
Showing 9 changed files with 31 additions and 39 deletions.
4 changes: 4 additions & 0 deletions R/check-cran.sh
```diff
@@ -61,6 +61,10 @@ fi
 
 echo "Running CRAN check with $CRAN_CHECK_OPTIONS options"
 
+# Remove this environment variable to allow to check suggested packages once
+# Jenkins installs arrow. See SPARK-29339.
+export _R_CHECK_FORCE_SUGGESTS_=FALSE
+
 if [ -n "$NO_TESTS" ] && [ -n "$NO_MANUAL" ]
 then
   "$R_SCRIPT_PATH/R" CMD check $CRAN_CHECK_OPTIONS "SparkR_$VERSION.tar.gz"
```
3 changes: 2 additions & 1 deletion R/pkg/DESCRIPTION
```diff
@@ -22,7 +22,8 @@ Suggests:
     rmarkdown,
     testthat,
     e1071,
-    survival
+    survival,
+    arrow
 Collate:
     'schema.R'
     'generics.R'
```
20 changes: 4 additions & 16 deletions R/pkg/R/SQLContext.R
```diff
@@ -148,19 +148,7 @@ getDefaultSqlSource <- function() {
 }
 
 writeToFileInArrow <- function(fileName, rdf, numPartitions) {
-  requireNamespace1 <- requireNamespace
-
-  # R API in Arrow is not yet released in CRAN. CRAN requires to add the
-  # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available
-  # or not. Therefore, it works around by avoiding direct requireNamespace.
-  # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
-    RecordBatchStreamWriter <- get(
-      "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
-    FileOutputStream <- get(
-      "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)
-
+  if (requireNamespace("arrow", quietly = TRUE)) {
     numPartitions <- if (!is.null(numPartitions)) {
       numToInt(numPartitions)
     } else {
@@ -176,11 +164,11 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
     stream_writer <- NULL
     tryCatch({
       for (rdf_slice in rdf_slices) {
-        batch <- record_batch(rdf_slice)
+        batch <- arrow::record_batch(rdf_slice)
         if (is.null(stream_writer)) {
-          stream <- FileOutputStream(fileName)
+          stream <- arrow::FileOutputStream(fileName)
           schema <- batch$schema
-          stream_writer <- RecordBatchStreamWriter(stream, schema)
+          stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
         }
 
         stream_writer$write_batch(batch)
```
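With arrow declared under `Suggests` in DESCRIPTION, the `get(..., envir = asNamespace(...))` workaround is no longer needed and the diff converges on the standard optional-dependency guard. A generic sketch of that pattern (the function name is made up, and `arrow::write_feather` is just one example of a namespace-qualified call):

```r
write_with_optional_pkg <- function(path, df) {
  if (requireNamespace("arrow", quietly = TRUE)) {
    # Namespace-qualified calls avoid attaching the package and pass
    # R CMD check once the package is listed under Suggests.
    arrow::write_feather(df, path)
  } else {
    stop("'arrow' package should be installed.")
  }
}
```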
8 changes: 2 additions & 6 deletions R/pkg/R/deserialize.R
```diff
@@ -232,11 +232,7 @@ readMultipleObjectsWithKeys <- function(inputCon) {
 }
 
 readDeserializeInArrow <- function(inputCon) {
-  # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
-  requireNamespace1 <- requireNamespace
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    RecordBatchStreamReader <- get(
-      "RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE)
+  if (requireNamespace("arrow", quietly = TRUE)) {
     # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
     useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))
 
@@ -246,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
     # for now.
     dataLen <- readInt(inputCon)
     arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
-    batches <- RecordBatchStreamReader(arrowData)$batches()
+    batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
 
     if (useAsTibble) {
       as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
```
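The `as_tibble` handling above illustrates feature detection against a moving dependency: probe the namespace for the symbol rather than pinning a version. A minimal base-R sketch of that idea (the helper name is made up; `stats` stands in for any optional package):

```r
# Return the named function if the optional package exports it,
# otherwise fall back to a base-R equivalent.
pick_converter <- function(pkg, sym) {
  if (requireNamespace(pkg, quietly = TRUE) &&
      exists(sym, envir = asNamespace(pkg))) {
    get(sym, envir = asNamespace(pkg))
  } else {
    as.data.frame  # fallback available everywhere
  }
}
```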
8 changes: 2 additions & 6 deletions R/pkg/R/serialize.R
```diff
@@ -222,15 +222,11 @@ writeArgs <- function(con, args) {
 }
 
 writeSerializeInArrow <- function(conn, df) {
-  # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
-  requireNamespace1 <- requireNamespace
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    write_arrow <- get("write_arrow", envir = asNamespace("arrow"), inherits = FALSE)
-
+  if (requireNamespace("arrow", quietly = TRUE)) {
     # There looks no way to send each batch in streaming format via socket
     # connection. See ARROW-4512.
     # So, it writes the whole Arrow streaming-formatted binary at once for now.
-    writeRaw(conn, write_arrow(df, raw()))
+    writeRaw(conn, arrow::write_arrow(df, raw()))
   } else {
     stop("'arrow' package should be installed.")
   }
```
2 changes: 1 addition & 1 deletion R/pkg/inst/worker/worker.R
```diff
@@ -50,7 +50,7 @@ compute <- function(mode, partition, serializer, deserializer, key,
   } else {
     # Check to see if inputData is a valid data.frame
     stopifnot(deserializer == "byte" || deserializer == "arrow")
-    stopifnot(class(inputData) == "data.frame")
+    stopifnot(is.data.frame(inputData))
   }
 
   if (mode == 2) {
```
8 changes: 4 additions & 4 deletions R/pkg/tests/fulltests/test_sparkSQL_arrow.R
```diff
@@ -101,7 +101,7 @@ test_that("dapply() Arrow optimization", {
   tryCatch({
     ret <- dapply(df,
                   function(rdf) {
-                    stopifnot(class(rdf) == "data.frame")
+                    stopifnot(is.data.frame(rdf))
                     rdf
                   },
                   schema(df))
@@ -115,7 +115,7 @@ test_that("dapply() Arrow optimization", {
   tryCatch({
     ret <- dapply(df,
                   function(rdf) {
-                    stopifnot(class(rdf) == "data.frame")
+                    stopifnot(is.data.frame(rdf))
                     # mtcars' hp is more than 50.
                     stopifnot(all(rdf$hp > 50))
                     rdf
@@ -199,7 +199,7 @@ test_that("gapply() Arrow optimization", {
                    if (length(key) > 0) {
                      stopifnot(is.numeric(key[[1]]))
                    }
-                   stopifnot(class(grouped) == "data.frame")
+                   stopifnot(is.data.frame(grouped))
                    grouped
                  },
                  schema(df))
@@ -217,7 +217,7 @@ test_that("gapply() Arrow optimization", {
                    if (length(key) > 0) {
                      stopifnot(is.numeric(key[[1]]))
                    }
-                   stopifnot(class(grouped) == "data.frame")
+                   stopifnot(is.data.frame(grouped))
                    stopifnot(length(colnames(grouped)) == 11)
                    # mtcars' hp is more than 50.
                    stopifnot(all(grouped$hp > 50))
```
4 changes: 2 additions & 2 deletions appveyor.yml
```diff
@@ -42,10 +42,10 @@ install:
   # Install maven and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
   # Required package for R unit tests
-  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
   # Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
  - cmd: R -e "devtools::install_version('testthat', version = '1.0.2', repos='http://cran.us.r-project.org')"
-  - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"
+  - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival'); packageVersion('arrow')"
 
 build_script:
   - cmd: mvn -DskipTests -Psparkr -Phive package
```
13 changes: 10 additions & 3 deletions docs/sparkr.md
````diff
@@ -648,13 +648,20 @@ Apache Arrow is an in-memory columnar data format that is used in Spark to effic
 
 ## Ensure Arrow Installed
 
-Currently, Arrow R library is not on CRAN yet [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). Therefore, it should be installed directly from Github. You can use `remotes::install_github` as below.
+The Arrow R library is available on CRAN as of [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). It can be installed as below.
 
 ```bash
-Rscript -e 'remotes::install_github("apache/arrow@TAG", subdir = "r")'
+Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
 ```
 
-`TAG` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes. The current supported version is 0.12.1.
+If you need to install an older version, it should be installed directly from GitHub. You can use `remotes::install_github` as below.
+
+```bash
+Rscript -e 'remotes::install_github("apache/[email protected]", subdir = "r")'
+```
+
+`apache-arrow-0.12.1` is a version tag that can be checked in [Arrow at GitHub](https://github.com/apache/arrow/releases). You must ensure that the Arrow R package is installed and available on all cluster nodes.
+The currently supported minimum version is 0.12.1; however, this might change between minor releases since the Arrow optimization in SparkR is experimental.
 
 ## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`
 
````
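After installation, the optimization still has to be switched on per session. A hedged sketch follows; the exact configuration key has varied across Spark releases, so treat the name below as an assumption to verify against your version's documentation:

```r
# Assumed config key; older releases used a different name
sparkR.session(master = "local[*]",
               sparkConfig = list("spark.sql.execution.arrow.sparkr.enabled" = "true"))
```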
