1 change: 1 addition & 0 deletions src/main/java/org/apache/sysds/common/Builtins.java
@@ -154,6 +154,7 @@ public enum Builtins {
GARCH("garch", true),
GAUSSIAN_CLASSIFIER("gaussianClassifier", true),
GET_ACCURACY("getAccuracy", true),
GET_CATEGORICAL_MASK("getCategoricalMask", false),
GLM("glm", true),
GLM_PREDICT("glmPredict", true),
GLOVE("glove", true),
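
The entry above wires the DML-level name "getCategoricalMask" into the builtin registry; following the convention of the neighboring entries, the second constructor argument (false) marks it as an internal, compiler-backed builtin rather than a DML-script-backed one. A minimal sketch of how such a registration is meant to resolve, assuming the enum's usual name-based lookup (Builtins.get) and script flag (isScript) — both assumptions about the surrounding API, not shown in this PR:

import org.apache.sysds.common.Builtins;

public class BuiltinLookupSketch {
  public static void main(String[] args) {
    // Resolve the new builtin by its DML name (assumed lookup helper).
    Builtins b = Builtins.get("getCategoricalMask");
    System.out.println(b == Builtins.GET_CATEGORICAL_MASK); // expected: true
    // Assumed accessor for the second constructor argument:
    System.out.println(b.isScript()); // expected: false -> internal builtin
  }
}
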
2 changes: 2 additions & 0 deletions src/main/java/org/apache/sysds/common/Opcodes.java
@@ -197,6 +197,8 @@ public enum Opcodes {
TRANSFORMMETA("transformmeta", InstructionType.ParameterizedBuiltin),
TRANSFORMENCODE("transformencode", InstructionType.MultiReturnParameterizedBuiltin, InstructionType.MultiReturnBuiltin),

GET_CATEGORICAL_MASK("get_categorical_mask", InstructionType.Binary),

//Ternary instruction opcodes
PM("+*", InstructionType.Ternary),
MINUSMULT("-*", InstructionType.Ternary),
1 change: 1 addition & 0 deletions src/main/java/org/apache/sysds/common/Types.java
@@ -639,6 +639,7 @@ public enum OpOp2 {
MINUS_NZ(false), //sparse-safe minus: X-(mean*ppred(X,0,!=))
LOG_NZ(false), //sparse-safe log; ppred(X,0,"!=")*log(X,0.5)
MINUS1_MULT(false), //1-X*Y
GET_CATEGORICAL_MASK(false), // get transformation mask
QUANTIZE_COMPRESS(false), //quantization-fused compression
UNION_DISTINCT(false);

41 changes: 27 additions & 14 deletions src/main/java/org/apache/sysds/hops/BinaryOp.java
@@ -763,8 +763,8 @@ protected ExecType optFindExecType(boolean transitive) {

checkAndSetForcedPlatform();

DataType dt1 = getInput().get(0).getDataType();
DataType dt2 = getInput().get(1).getDataType();
final DataType dt1 = getInput(0).getDataType();
final DataType dt2 = getInput(1).getDataType();

if( _etypeForced != null ) {
setExecType(_etypeForced);
@@ -812,18 +812,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) {
checkAndSetInvalidCPDimsAndSize();
}

//spark-specific decision refinement (execute unary scalar w/ spark input and
// spark-specific decision refinement (execute unary scalar w/ spark input and
// single parent also in spark because it's likely cheap and reduces intermediates)
if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED &&
getDataType().isMatrix() // output should be a matrix
&& (dt1.isScalar() || dt2.isScalar()) // one side should be scalar
&& supportsMatrixScalarOperations() // scalar operations
&& !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint
&& getInput().get(dt1.isScalar() ? 1 : 0).getParent().size() == 1 // unary scalar is only parent
&& !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec
&& getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) {
// pull unary scalar operation into spark
_etype = ExecType.SPARK;
if(transitive // we allow transitive Spark operations. continue sequences of spark operations
&& _etype == ExecType.CP // The instruction is currently in CP
&& _etypeForced != ExecType.CP // not forced CP
&& _etypeForced != ExecType.FED // not federated
&& (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame
) {
final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize();
final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize();
final boolean left = v1 == true; // left side is the vector or scalar
final Hop sparkIn = getInput(left ? 1 : 0);
if((v1 ^ v2) // XOR: exactly one side may be a vector or a scalar.
&& (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation
&& sparkIn.getParent().size() == 1 // only one parent
&& !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec
&& sparkIn.optFindExecType() == ExecType.SPARK // input was spark op.
&& !(sparkIn instanceof DataOp) // input is not checkpoint
) {
// pull operation into spark
_etype = ExecType.SPARK;
}
}

if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE &&
@@ -853,7 +863,10 @@ else if( (op == OpOp2.CBIND && getDataType().isList())
|| (op == OpOp2.RBIND && getDataType().isList())) {
_etype = ExecType.CP;
}


if( op == OpOp2.GET_CATEGORICAL_MASK)
_etype = ExecType.CP;

//mark for recompile (forever)
setRequiresRecompileIfNecessary();

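The refined block replaces the old scalar-only test with a scalar-or-small-vector test on either side. A condensed sketch of the new decision, as a standalone predicate with simplified parameters (an illustration of the logic, not the actual Hop API):

public class BinarySparkPullSketch {
  // Mirrors the refined condition: exactly one side is a scalar/small vector,
  // the op is supported, and the large side is a multi-block Spark input
  // that is neither a checkpoint nor shared by other consumers.
  static boolean pullIntoSpark(boolean leftSmall, boolean rightSmall,
      boolean supportedOp, int sparkInputParents,
      boolean singleBlock, boolean sparkInput, boolean isCheckpoint) {
    return (leftSmall ^ rightSmall) // XOR: both small or both large disqualifies
        && supportedOp
        && sparkInputParents == 1
        && !singleBlock
        && sparkInput
        && !isCheckpoint;
  }

  public static void main(String[] args) {
    System.out.println(pullIntoSpark(true, false, true, 1, false, true, false)); // true: pulled into Spark
    System.out.println(pullIntoSpark(true, true, true, 1, false, true, false));  // false: both sides small
  }
}
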
11 changes: 11 additions & 0 deletions src/main/java/org/apache/sysds/hops/Hop.java
@@ -1045,6 +1045,12 @@ public final String toString() {
// ========================================================================================


protected boolean isScalarOrVectorBellowBlockSize(){
return getDataType().isScalar() || (dimsKnown() &&
(( _dc.getRows() == 1 && _dc.getCols() < ConfigurationManager.getBlocksize())
|| _dc.getCols() == 1 && _dc.getRows() < ConfigurationManager.getBlocksize()));
}

protected boolean isVector() {
return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) );
}
@@ -1629,6 +1635,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) {
lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this));
}

protected boolean hasSparkOutput(){
return (this.optFindExecType() == ExecType.SPARK
|| (this instanceof DataOp && ((DataOp)this).hasOnlyRDD()));
}

/**
* Set parse information.
*
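
The new Hop helper treats scalars and vectors strictly below the block size alike. A standalone sketch of its truth conditions, assuming the default SystemDS block size of 1000 (the real method reads it from ConfigurationManager):

public class BelowBlockSizeSketch {
  static boolean scalarOrVectorBelowBlockSize(boolean isScalar, long rows, long cols, int blocksize) {
    return isScalar
        || (rows == 1 && cols < blocksize)   // row vector below block size
        || (cols == 1 && rows < blocksize);  // column vector below block size
  }

  public static void main(String[] args) {
    int blk = 1000; // assumed default blocksize
    System.out.println(scalarOrVectorBelowBlockSize(true, 0, 0, blk));     // true: scalar
    System.out.println(scalarOrVectorBelowBlockSize(false, 1, 999, blk));  // true: small row vector
    System.out.println(scalarOrVectorBelowBlockSize(false, 1, 1000, blk)); // false: not strictly below
  }
}
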
34 changes: 24 additions & 10 deletions src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -366,7 +366,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
} else {
sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
}
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity, getDataType());

if(getDataType() == DataType.FRAME)
return OptimizerUtils.estimateSizeExactFrame(dim1, dim2);
else
return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
}

@Override
@@ -463,6 +467,13 @@ public boolean isMetadataOperation() {
|| _op == OpOp1.CAST_AS_LIST;
}

private boolean isDisallowedSparkOps(){
return isCumulativeUnaryOperation()
|| isCastUnaryOperation()
|| _op==OpOp1.MEDIAN
|| _op==OpOp1.IQM;
}

@Override
protected ExecType optFindExecType(boolean transitive)
{
@@ -493,19 +504,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVector())
checkAndSetInvalidCPDimsAndSize();
}


//spark-specific decision refinement (execute unary w/ spark input and
//single parent also in spark because it's likely cheap and reduces intermediates)
if( _etype == ExecType.CP && _etypeForced != ExecType.CP
&& getInput().get(0).optFindExecType() == ExecType.SPARK
&& getDataType().isMatrix()
&& !isCumulativeUnaryOperation() && !isCastUnaryOperation()
&& _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM
&& !(getInput().get(0) instanceof DataOp) //input is not checkpoint
&& getInput().get(0).getParent().size()==1 ) //unary is only parent
{
if(_etype == ExecType.CP // currently CP instruction
&& _etype != ExecType.SPARK // currently not SP
&& _etypeForced != ExecType.CP // not forced as CP instruction
&& getInput(0).hasSparkOutput() // input is a spark instruction
&& (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame
&& !isDisallowedSparkOps() // op is not one of the disallowed spark ops
// && !(getInput().get(0) instanceof DataOp) // input is not checkpoint
// && getInput(0).getParent().size() <= 1// unary is only parent
) {
//pull unary operation into spark
_etype = ExecType.SPARK;
}


//mark for recompile (forever)
setRequiresRecompileIfNecessary();
@@ -520,7 +534,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent
} else {
setRequiresRecompileIfNecessary();
}

return _etype;
}

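Taken together with hasSparkOutput() from Hop.java above, the relaxed unary rule reads roughly as follows (condensed sketch with simplified parameters; the parent-count and checkpoint checks commented out in the diff are omitted here deliberately):

public class UnarySparkPullSketch {
  static boolean pullUnaryIntoSpark(boolean currentlyCP, boolean forcedCP,
      boolean inputHasSparkOutput, boolean outputMatrixOrFrame, boolean disallowedOp) {
    return currentlyCP
        && !forcedCP
        && inputHasSparkOutput    // input is a Spark op or an RDD-backed DataOp
        && outputMatrixOrFrame
        && !disallowedOp;         // cumulative/cast/MEDIAN/IQM stay in CP
  }

  public static void main(String[] args) {
    System.out.println(pullUnaryIntoSpark(true, false, true, true, false)); // true: pulled into Spark
    System.out.println(pullUnaryIntoSpark(true, false, true, true, true));  // false: disallowed op
  }
}
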
src/main/java/org/apache/sysds/parser/BuiltinFunctionExpression.java
@@ -2018,6 +2018,15 @@ else if(this.getOpCode() == Builtins.MAX_POOL || this.getOpCode() == Builtins.AV
else
raiseValidateError("The compress or decompress instruction is not allowed in dml scripts");
break;
case GET_CATEGORICAL_MASK:
checkNumParameters(2);
checkFrameParam(getFirstExpr());
checkScalarParam(getSecondExpr());
output.setDataType(DataType.MATRIX);
output.setDimensions(1, -1);
output.setBlocksize( id.getBlocksize());
output.setValueType(ValueType.FP64);
break;
case QUANTIZE_COMPRESS:
if(OptimizerUtils.ALLOW_SCRIPT_LEVEL_QUANTIZE_COMPRESS_COMMAND) {
checkNumParameters(2);
@@ -2383,6 +2392,13 @@ protected void checkMatrixFrameParam(Expression e) { //always unconditional
raiseValidateError("Expecting matrix or frame parameter for function "+ getOpCode(), false, LanguageErrorCodes.UNSUPPORTED_PARAMETERS);
}
}

protected void checkFrameParam(Expression e) {
if(e.getOutput().getDataType() != DataType.FRAME) {
raiseValidateError("Expecting frame parameter for function " + getOpCode(), false,
LanguageErrorCodes.UNSUPPORTED_PARAMETERS);
}
}

protected void checkMatrixScalarParam(Expression e) { //always unconditional
if (e.getOutput().getDataType() != DataType.MATRIX && e.getOutput().getDataType() != DataType.SCALAR) {
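
The validation case fixes the output contract of getCategoricalMask: a one-row FP64 matrix with as many columns as the input frame (unknown, hence -1, at parse time). A sketch of the hypothetical runtime semantics implied by that shape — 1.0 where a column is treated as categorical by the transform spec, 0.0 elsewhere (an illustration; the actual mask derivation is not part of this hunk):

import java.util.Arrays;

public class CategoricalMaskSketch {
  // Hypothetical semantics: a 1 x ncol mask over the frame's columns.
  static double[] categoricalMask(boolean[] isCategorical) {
    double[] mask = new double[isCategorical.length];
    for (int i = 0; i < mask.length; i++)
      mask[i] = isCategorical[i] ? 1.0 : 0.0;
    return mask;
  }

  public static void main(String[] args) {
    // e.g. columns 0 and 2 recoded/dummycoded, column 1 numeric:
    System.out.println(Arrays.toString(categoricalMask(new boolean[]{true, false, true})));
    // -> [1.0, 0.0, 1.0]
  }
}
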
3 changes: 3 additions & 0 deletions src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -2821,6 +2821,9 @@ else if ( in.length == 2 )
DataType.MATRIX, target.getValueType(), AggOp.COUNT_DISTINCT, Direction.Col, expr);
break;

case GET_CATEGORICAL_MASK:
currBuiltinOp = new BinaryOp(target.getName(), DataType.MATRIX, ValueType.FP64, OpOp2.GET_CATEGORICAL_MASK, expr, expr2);
break;
default:
throw new ParseException("Unsupported builtin function type: "+source.getOpCode());
}
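
In the translator, the builtin lowers to an ordinary two-input BinaryOp over the frame and the spec, using exactly the constructor call shown above; schematically (with hypothetical input hops):

import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.OpOp2;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.hops.BinaryOp;
import org.apache.sysds.hops.Hop;

public class TranslateSketch {
  // Mirrors the new translator case: the mask is a FP64 matrix output
  // produced by a plain binary Hop over (frame, spec).
  static Hop lower(String targetName, Hop frameIn, Hop specIn) {
    return new BinaryOp(targetName, DataType.MATRIX, ValueType.FP64,
        OpOp2.GET_CATEGORICAL_MASK, frameIn, specIn);
  }
}
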
src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -58,12 +58,14 @@
import org.apache.sysds.runtime.compress.lib.CLALibMMChain;
import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult;
import org.apache.sysds.runtime.compress.lib.CLALibMerge;
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
import org.apache.sysds.runtime.compress.lib.CLALibRemoveEmpty;
import org.apache.sysds.runtime.compress.lib.CLALibReorg;
import org.apache.sysds.runtime.compress.lib.CLALibReplace;
import org.apache.sysds.runtime.compress.lib.CLALibReshape;
import org.apache.sysds.runtime.compress.lib.CLALibRexpand;
import org.apache.sysds.runtime.compress.lib.CLALibScalar;
import org.apache.sysds.runtime.compress.lib.CLALibSlice;
import org.apache.sysds.runtime.compress.lib.CLALibSort;
import org.apache.sysds.runtime.compress.lib.CLALibSquash;
import org.apache.sysds.runtime.compress.lib.CLALibTSMM;
import org.apache.sysds.runtime.compress.lib.CLALibTernaryOp;
@@ -101,6 +103,7 @@
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.utils.DMLCompressionStatistics;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
import org.apache.sysds.utils.stats.Timing;

public class CompressedMatrixBlock extends MatrixBlock {
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
@@ -475,16 +478,20 @@ public void readFields(DataInput in) throws IOException {
}

public static CompressedMatrixBlock read(DataInput in) throws IOException {
Timing t = new Timing();
int rlen = in.readInt();
int clen = in.readInt();
long nonZeros = in.readLong();
boolean overlappingColGroups = in.readBoolean();
List<AColGroup> groups = ColGroupIO.readGroups(in, rlen);
return new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
CompressedMatrixBlock ret = new CompressedMatrixBlock(rlen, clen, nonZeros, overlappingColGroups, groups);
LOG.debug("Compressed read serialization time: " + t.stop());
return ret;
}

@Override
public void write(DataOutput out) throws IOException {
Timing t = new Timing();
final long estimateUncompressed = nonZeros > 0 ? MatrixBlock.estimateSizeOnDisk(rlen, clen,
nonZeros) : Long.MAX_VALUE;
final long estDisk = nonZeros > 0 ? getExactSizeOnDisk() : Long.MAX_VALUE;
@@ -512,6 +519,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(nonZeros);
out.writeBoolean(overlappingColGroups);
ColGroupIO.writeGroups(out, _colGroups);
LOG.debug("Compressed write serialization time: " + t.stop());
}

/**
Expand Down Expand Up @@ -611,14 +619,6 @@ public MatrixBlock aggregateUnaryOperations(AggregateUnaryOperator op, MatrixVal
public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) {
// check for transpose type
if(tstype == MMTSJType.LEFT) {
if(isEmpty())
return new MatrixBlock(clen, clen, true);
// create output matrix block
if(out == null)
out = new MatrixBlock(clen, clen, false);
else
out.reset(clen, clen, false);
out.allocateDenseBlock();
CLALibTSMM.leftMultByTransposeSelf(this, out, k);
return out;
}
@@ -846,9 +846,8 @@ public CM_COV_Object covOperations(COVOperator op, MatrixBlock that, MatrixBlock
}

@Override
public MatrixBlock sortOperations(MatrixValue weights, MatrixBlock result) {
MatrixBlock right = getUncompressed(weights);
return getUncompressed("sortOperations").sortOperations(right, result);
public MatrixBlock sortOperations(MatrixValue weights, MatrixBlock result, int k) {
return CLALibSort.sort(this, weights, result, k);
}

@Override
@@ -871,9 +870,7 @@ public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, Matr

@Override
public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows, boolean emptyReturn, MatrixBlock select) {
printDecompressWarning("removeEmptyOperations");
MatrixBlock tmp = getUncompressed();
return tmp.removeEmptyOperations(ret, rows, emptyReturn, select);
return CLALibRemoveEmpty.rmempty(this, ret, rows, emptyReturn, select);
}

@Override
@@ -1202,8 +1199,8 @@ public void examSparsity(boolean allowCSR, int k) {
}

@Override
public void sparseToDense(int k) {
// do nothing
public MatrixBlock sparseToDense(int k) {
return this; // do nothing
}

@Override
Expand Down Expand Up @@ -1236,16 +1233,6 @@ public double interQuartileMean() {
return getUncompressed("interQuartileMean").interQuartileMean();
}

@Override
public MatrixBlock pickValues(MatrixValue quantiles, MatrixValue ret) {
return getUncompressed("pickValues").pickValues(quantiles, ret);
}

@Override
public double pickValue(double quantile, boolean average) {
return getUncompressed("pickValue").pickValue(quantile, average);
}

@Override
public double sumWeightForQuantile() {
return getUncompressed("sumWeightForQuantile").sumWeightForQuantile();
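
The read/write changes add lightweight serialization timing at debug level. The pattern, as a sketch — assuming Timing starts its clock on construction and stop() returns elapsed milliseconds, as the diff's usage implies; the isDebugEnabled guard is an optional refinement, not part of the diff:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.utils.stats.Timing;

public class TimingSketch {
  private static final Log LOG = LogFactory.getLog(TimingSketch.class.getName());

  static void timedWork(Runnable work) {
    Timing t = new Timing(); // assumed to start the clock on construction
    work.run();
    if (LOG.isDebugEnabled()) // avoids building the message when debug is off
      LOG.debug("Serialization time: " + t.stop());
  }
}
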
src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -64,6 +64,8 @@ public class CompressedMatrixBlockFactory {

private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());

private static final Object asyncCompressLock = new Object();

/** Timing object to measure the time of each phase in the compression */
private final Timing time = new Timing(true);
/** Compression statistics gathered throughout the compression */
@@ -181,21 +183,23 @@ public static Future<Void> compressAsync(ExecutionContext ec, String varName) {
}

public static Future<Void> compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) {
LOG.debug("Compressing Async");
final ExecutorService pool = CommonThreadPool.get(); // We have to guarantee that a thread pool is allocated.
return CompletableFuture.runAsync(() -> {
// method call or code to be async
try {
CacheableData<?> data = ec.getCacheableData(varName);
if(data instanceof MatrixObject) {
MatrixObject mo = (MatrixObject) data;
MatrixBlock mb = mo.acquireReadAndRelease();
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mo.acquireReadAndRelease(), ins).getLeft();
if(mbc instanceof CompressedMatrixBlock) {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
mbc.sum(); // calculate sum to forcefully materialize counts
synchronized(asyncCompressLock){ // static lock: serialize async compressions so the same matrix is never compressed twice concurrently
if(data instanceof MatrixObject) {
LOG.debug("Compressing Async");
MatrixObject mo = (MatrixObject) data;
MatrixBlock mb = mo.acquireReadAndRelease();
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft();
if(mbc instanceof CompressedMatrixBlock) {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
mbc.sum(); // calculate sum to forcefully materialize counts
}
}
}
}
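
Caller-side view of the guarded async compression, using the two-argument overload shown in the hunk header (a usage sketch; blocking on the future is optional — callers may fire-and-forget):

import java.util.concurrent.Future;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;

public class CompressAsyncSketch {
  static void compressVariable(ExecutionContext ec, String varName) throws Exception {
    // Schedule background compression of the variable bound to varName.
    Future<Void> f = CompressedMatrixBlockFactory.compressAsync(ec, varName);
    f.get(); // wait until the variable has (possibly) been replaced by its compressed form
  }
}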