Skip to content

Commit

Permalink
[SPARK-17471][ML] Add compressed method to ML matrices
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This patch adds a `compressed` method to ML `Matrix` class, which returns the minimal storage representation of the matrix - either sparse or dense. Because the space occupied by a sparse matrix is dependent upon its layout (i.e. column major or row major), this method must consider both cases. It may also be useful to force the layout to be column or row major beforehand, so an overload is added which takes in a `columnMajor: Boolean` parameter.

The compressed implementation relies upon two new abstract methods `toDense(columnMajor: Boolean)` and `toSparse(columnMajor: Boolean)`, similar to the compressed method implemented in the `Vector` class. These methods also allow the layout of the resulting matrix to be specified via the `columnMajor` parameter. More detail on the new methods is given below.
## How was this patch tested?

Added many new unit tests
## New methods (summary, not exhaustive list)

**Matrix trait**
- `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` (abstract) - converts the matrix (either sparse or dense) to dense format
- `private[ml] def toSparseMatrix(columnMajor: Boolean): SparseMatrix` (abstract) -  converts the matrix (either sparse or dense) to sparse format
- `def toDense: DenseMatrix = toDense(true)`  - converts the matrix (either sparse or dense) to dense format in column major layout
- `def toSparse: SparseMatrix = toSparse(true)` -  converts the matrix (either sparse or dense) to sparse format in column major layout
- `def compressed: Matrix` - finds the minimum space representation of this matrix, considering both column and row major layouts, and converts it
- `def compressed(columnMajor: Boolean): Matrix` - finds the minimum space representation of this matrix considering only column OR row major, and converts it

**DenseMatrix class**
- `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` - converts the dense matrix to a dense matrix, optionally changing the layout (data is NOT duplicated if the layouts are the same)
- `private[ml] def toSparseMatrix(columnMajor: Boolean): SparseMatrix` - converts the dense matrix to sparse matrix, using the specified layout

**SparseMatrix class**
- `private[ml] def toDenseMatrix(columnMajor: Boolean): DenseMatrix` - converts the sparse matrix to a dense matrix, using the specified layout
- `private[ml] def toSparseMatrix(columnMajors: Boolean): SparseMatrix` - converts the sparse matrix to sparse matrix. If the sparse matrix contains any explicit zeros, they are removed. If the layout requested does not match the current layout, data is copied to a new representation. If the layouts match and no explicit zeros exist, the current matrix is returned.

Author: sethah <[email protected]>

Closes apache#15628 from sethah/matrix_compress.
  • Loading branch information
sethah authored and dbtsai committed Mar 24, 2017
1 parent 707e501 commit e8810b7
Show file tree
Hide file tree
Showing 4 changed files with 673 additions and 46 deletions.
274 changes: 242 additions & 32 deletions mllib-local/src/main/scala/org/apache/spark/ml/linalg/Matrices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ sealed trait Matrix extends Serializable {
@Since("2.0.0")
val isTransposed: Boolean = false

/** Indicates whether the values backing this matrix are arranged in column major order. */
private[ml] def isColMajor: Boolean = !isTransposed

/** Indicates whether the values backing this matrix are arranged in row major order. */
private[ml] def isRowMajor: Boolean = isTransposed

/** Converts to a dense array in column major. */
@Since("2.0.0")
def toArray: Array[Double] = {
Expand Down Expand Up @@ -148,7 +154,8 @@ sealed trait Matrix extends Serializable {
* and column indices respectively with the type `Int`, and the final parameter is the
* corresponding value in the matrix with type `Double`.
*/
private[spark] def foreachActive(f: (Int, Int, Double) => Unit)
@Since("2.2.0")
def foreachActive(f: (Int, Int, Double) => Unit): Unit

/**
* Find the number of non-zero active values.
Expand All @@ -161,6 +168,116 @@ sealed trait Matrix extends Serializable {
*/
@Since("2.0.0")
def numActives: Int

/**
* Converts this matrix to a sparse matrix.
*
* @param colMajor Whether the values of the resulting sparse matrix should be in column major
* or row major order. If `false`, resulting matrix will be row major.
*/
private[ml] def toSparseMatrix(colMajor: Boolean): SparseMatrix

/**
* Converts this matrix to a sparse matrix in column major order.
*/
@Since("2.2.0")
def toSparseColMajor: SparseMatrix = toSparseMatrix(colMajor = true)

/**
* Converts this matrix to a sparse matrix in row major order.
*/
@Since("2.2.0")
def toSparseRowMajor: SparseMatrix = toSparseMatrix(colMajor = false)

/**
* Converts this matrix to a sparse matrix while maintaining the layout of the current matrix.
*/
@Since("2.2.0")
def toSparse: SparseMatrix = toSparseMatrix(colMajor = isColMajor)

/**
* Converts this matrix to a dense matrix.
*
* @param colMajor Whether the values of the resulting dense matrix should be in column major
* or row major order. If `false`, resulting matrix will be row major.
*/
private[ml] def toDenseMatrix(colMajor: Boolean): DenseMatrix

/**
* Converts this matrix to a dense matrix while maintaining the layout of the current matrix.
*/
@Since("2.2.0")
def toDense: DenseMatrix = toDenseMatrix(colMajor = isColMajor)

/**
* Converts this matrix to a dense matrix in row major order.
*/
@Since("2.2.0")
def toDenseRowMajor: DenseMatrix = toDenseMatrix(colMajor = false)

/**
* Converts this matrix to a dense matrix in column major order.
*/
@Since("2.2.0")
def toDenseColMajor: DenseMatrix = toDenseMatrix(colMajor = true)

/**
* Returns a matrix in dense or sparse column major format, whichever uses less storage.
*/
@Since("2.2.0")
def compressedColMajor: Matrix = {
if (getDenseSizeInBytes <= getSparseSizeInBytes(colMajor = true)) {
this.toDenseColMajor
} else {
this.toSparseColMajor
}
}

/**
* Returns a matrix in dense or sparse row major format, whichever uses less storage.
*/
@Since("2.2.0")
def compressedRowMajor: Matrix = {
if (getDenseSizeInBytes <= getSparseSizeInBytes(colMajor = false)) {
this.toDenseRowMajor
} else {
this.toSparseRowMajor
}
}

/**
* Returns a matrix in dense column major, dense row major, sparse row major, or sparse column
* major format, whichever uses less storage. When dense representation is optimal, it maintains
* the current layout order.
*/
@Since("2.2.0")
def compressed: Matrix = {
val cscSize = getSparseSizeInBytes(colMajor = true)
val csrSize = getSparseSizeInBytes(colMajor = false)
if (getDenseSizeInBytes <= math.min(cscSize, csrSize)) {
// dense matrix size is the same for column major and row major, so maintain current layout
this.toDense
} else if (cscSize <= csrSize) {
this.toSparseColMajor
} else {
this.toSparseRowMajor
}
}

/** Gets the size of the dense representation of this `Matrix`. */
private[ml] def getDenseSizeInBytes: Long = {
Matrices.getDenseSize(numCols, numRows)
}

/** Gets the size of the minimal sparse representation of this `Matrix`. */
private[ml] def getSparseSizeInBytes(colMajor: Boolean): Long = {
val nnz = numNonzeros
val numPtrs = if (colMajor) numCols + 1L else numRows + 1L
Matrices.getSparseSize(nnz, numPtrs)
}

/** Gets the current size in bytes of this `Matrix`. Useful for testing */
private[ml] def getSizeInBytes: Long
}

/**
Expand Down Expand Up @@ -258,7 +375,7 @@ class DenseMatrix @Since("2.0.0") (

override def transpose: DenseMatrix = new DenseMatrix(numCols, numRows, values, !isTransposed)

private[spark] override def foreachActive(f: (Int, Int, Double) => Unit): Unit = {
override def foreachActive(f: (Int, Int, Double) => Unit): Unit = {
if (!isTransposed) {
// outer loop over columns
var j = 0
Expand Down Expand Up @@ -291,31 +408,49 @@ class DenseMatrix @Since("2.0.0") (
override def numActives: Int = values.length

/**
* Generate a `SparseMatrix` from the given `DenseMatrix`. The new matrix will have isTransposed
* set to false.
* Generate a `SparseMatrix` from the given `DenseMatrix`.
*
* @param colMajor Whether the resulting `SparseMatrix` values will be in column major order.
*/
@Since("2.0.0")
def toSparse: SparseMatrix = {
val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble
val colPtrs: Array[Int] = new Array[Int](numCols + 1)
val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt
var nnz = 0
var j = 0
while (j < numCols) {
var i = 0
while (i < numRows) {
val v = values(index(i, j))
if (v != 0.0) {
rowIndices += i
spVals += v
nnz += 1
private[ml] override def toSparseMatrix(colMajor: Boolean): SparseMatrix = {
if (!colMajor) this.transpose.toSparseColMajor.transpose
else {
val spVals: MArrayBuilder[Double] = new MArrayBuilder.ofDouble
val colPtrs: Array[Int] = new Array[Int](numCols + 1)
val rowIndices: MArrayBuilder[Int] = new MArrayBuilder.ofInt
var nnz = 0
var j = 0
while (j < numCols) {
var i = 0
while (i < numRows) {
val v = values(index(i, j))
if (v != 0.0) {
rowIndices += i
spVals += v
nnz += 1
}
i += 1
}
i += 1
j += 1
colPtrs(j) = nnz
}
j += 1
colPtrs(j) = nnz
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result())
}
}

/**
* Generate a `DenseMatrix` from this `DenseMatrix`.
*
* @param colMajor Whether the resulting `DenseMatrix` values will be in column major order.
*/
private[ml] override def toDenseMatrix(colMajor: Boolean): DenseMatrix = {
if (isRowMajor && colMajor) {
new DenseMatrix(numRows, numCols, this.toArray, isTransposed = false)
} else if (isColMajor && !colMajor) {
new DenseMatrix(numRows, numCols, this.transpose.toArray, isTransposed = true)
} else {
this
}
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result())
}

override def colIter: Iterator[Vector] = {
Expand All @@ -331,6 +466,8 @@ class DenseMatrix @Since("2.0.0") (
}
}
}

private[ml] def getSizeInBytes: Long = Matrices.getDenseSize(numCols, numRows)
}

/**
Expand Down Expand Up @@ -560,7 +697,7 @@ class SparseMatrix @Since("2.0.0") (
override def transpose: SparseMatrix =
new SparseMatrix(numCols, numRows, colPtrs, rowIndices, values, !isTransposed)

private[spark] override def foreachActive(f: (Int, Int, Double) => Unit): Unit = {
override def foreachActive(f: (Int, Int, Double) => Unit): Unit = {
if (!isTransposed) {
var j = 0
while (j < numCols) {
Expand All @@ -587,18 +724,67 @@ class SparseMatrix @Since("2.0.0") (
}
}

override def numNonzeros: Int = values.count(_ != 0)

override def numActives: Int = values.length

/**
* Generate a `DenseMatrix` from the given `SparseMatrix`. The new matrix will have isTransposed
* set to false.
* Generate a `SparseMatrix` from this `SparseMatrix`, removing explicit zero values if they
* exist.
*
* @param colMajor Whether or not the resulting `SparseMatrix` values are in column major
* order.
*/
@Since("2.0.0")
def toDense: DenseMatrix = {
new DenseMatrix(numRows, numCols, toArray)
private[ml] override def toSparseMatrix(colMajor: Boolean): SparseMatrix = {
if (isColMajor && !colMajor) {
// it is col major and we want row major, use breeze to remove explicit zeros
val breezeTransposed = asBreeze.asInstanceOf[BSM[Double]].t
Matrices.fromBreeze(breezeTransposed).transpose.asInstanceOf[SparseMatrix]
} else if (isRowMajor && colMajor) {
// it is row major and we want col major, use breeze to remove explicit zeros
val breezeTransposed = asBreeze.asInstanceOf[BSM[Double]]
Matrices.fromBreeze(breezeTransposed).asInstanceOf[SparseMatrix]
} else {
val nnz = numNonzeros
if (nnz != numActives) {
// remove explicit zeros
val rr = new Array[Int](nnz)
val vv = new Array[Double](nnz)
val numPtrs = if (isRowMajor) numRows else numCols
val cc = new Array[Int](numPtrs + 1)
var nzIdx = 0
var j = 0
while (j < numPtrs) {
var idx = colPtrs(j)
val idxEnd = colPtrs(j + 1)
cc(j) = nzIdx
while (idx < idxEnd) {
if (values(idx) != 0.0) {
vv(nzIdx) = values(idx)
rr(nzIdx) = rowIndices(idx)
nzIdx += 1
}
idx += 1
}
j += 1
}
cc(j) = nnz
new SparseMatrix(numRows, numCols, cc, rr, vv, isTransposed = isTransposed)
} else {
this
}
}
}

override def numNonzeros: Int = values.count(_ != 0)

override def numActives: Int = values.length
/**
* Generate a `DenseMatrix` from the given `SparseMatrix`.
*
* @param colMajor Whether the resulting `DenseMatrix` values are in column major order.
*/
private[ml] override def toDenseMatrix(colMajor: Boolean): DenseMatrix = {
if (colMajor) new DenseMatrix(numRows, numCols, this.toArray)
else new DenseMatrix(numRows, numCols, this.transpose.toArray, isTransposed = true)
}

override def colIter: Iterator[Vector] = {
if (isTransposed) {
Expand Down Expand Up @@ -631,6 +817,8 @@ class SparseMatrix @Since("2.0.0") (
}
}
}

private[ml] def getSizeInBytes: Long = Matrices.getSparseSize(numActives, colPtrs.length)
}

/**
Expand Down Expand Up @@ -1079,4 +1267,26 @@ object Matrices {
SparseMatrix.fromCOO(numRows, numCols, entries)
}
}

private[ml] def getSparseSize(numActives: Long, numPtrs: Long): Long = {
/*
Sparse matrices store two int arrays, one double array, two ints, and one boolean:
8 * values.length + 4 * rowIndices.length + 4 * colPtrs.length + arrayHeader * 3 + 2 * 4 + 1
*/
val doubleBytes = java.lang.Double.BYTES
val intBytes = java.lang.Integer.BYTES
val arrayHeader = 12L
doubleBytes * numActives + intBytes * numActives + intBytes * numPtrs + arrayHeader * 3L + 9L
}

private[ml] def getDenseSize(numCols: Long, numRows: Long): Long = {
/*
Dense matrices store one double array, two ints, and one boolean:
8 * values.length + arrayHeader + 2 * 4 + 1
*/
val doubleBytes = java.lang.Double.BYTES
val arrayHeader = 12L
doubleBytes * numCols * numRows + arrayHeader + 9L
}

}
Loading

0 comments on commit e8810b7

Please sign in to comment.