diff --git a/scripts/builtin/scaleRobust.dml b/scripts/builtin/scaleRobust.dml new file mode 100644 index 00000000000..b2733f0e0f1 --- /dev/null +++ b/scripts/builtin/scaleRobust.dml @@ -0,0 +1,65 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Robust scaling using median and IQR (Interquartile Range) +# Resistant to outliers by centering with the median and scaling with IQR. +# +# INPUT: +# ------------------------------------------------------------------------------------- +# X Input feature matrix of shape n-by-m +# ------------------------------------------------------------------------------------- +# +# OUTPUT: +# ------------------------------------------------------------------------------------- +# Y Scaled output matrix of shape n-by-m +# med Column medians (Q2) of shape 1-by-m +# q1 Column first quantiles (Q1) of shape 1-by-m +# q3 Column first quantiles (Q3) of shape 1-by-m +# ------------------------------------------------------------------------------------- + +m_scaleRobust = function(Matrix[Double] X) + return (Matrix[Double] Y, Matrix[Double] med, Matrix[Double] q1, Matrix[Double] q3) +{ + n = nrow(X) + m = ncol(X) + + med = matrix(0.0, rows=1, cols=m) + q1 = matrix(0.0, rows=1, cols=m) + q3 = matrix(0.0, rows=1, cols=m) + + # Define quantile probabilities once, outside the loop + q_probs = matrix(0.0, rows=3, cols=1) + q_probs[1,1] = 0.25 + q_probs[2,1] = 0.5 + q_probs[3,1] = 0.75 + + + # Loop over columns to compute quantiles + parfor (j in 1:m) { + col_j = X[,j] # get column vector + q = quantile(col_j, q_probs) + med[1,j] = q[2,1] + q1[1,j] = q[1,1] + q3[1,j] = q[3,1] + } + + Y = scaleRobustApply(X, med, q1, q3); +} diff --git a/scripts/builtin/scaleRobustApply.dml b/scripts/builtin/scaleRobustApply.dml new file mode 100644 index 00000000000..d9986425f13 --- /dev/null +++ b/scripts/builtin/scaleRobustApply.dml @@ -0,0 +1,50 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Apply robust scaling using precomputed medians and IQRs +# +# INPUT: +# ------------------------------------------------------------------------------------- +# X Input feature matrix of shape n-by-m +# med Column medians (Q2) of shape 1-by-m +# q1 Column first quantiles (Q1) of shape 1-by-m +# q3 Column first quantiles (Q3) of shape 1-by-m +# ------------------------------------------------------------------------------------- +# +# OUTPUT: +# ------------------------------------------------------------------------------------- +# Y Scaled output matrix of shape n-by-m +# ------------------------------------------------------------------------------------- + +m_scaleRobustApply = function(Matrix[Double] X, Matrix[Double] med, Matrix[Double] q1, Matrix[Double] q3) + return (Matrix[Double] Y) +{ + + iqr = q3 - q1 + + # Ensure robust scaling is safe by replacing invalid IQRs + iqr = replace(target=iqr, pattern=0, replacement=1) + iqr = replace(target=iqr, pattern=NaN, replacement=1) + + # Apply robust transformation + Y = (X - med) / iqr + +} diff --git a/src/main/java/org/apache/sysds/api/DMLScript.java b/src/main/java/org/apache/sysds/api/DMLScript.java index d6853891e24..76735a1abe2 100644 --- a/src/main/java/org/apache/sysds/api/DMLScript.java +++ b/src/main/java/org/apache/sysds/api/DMLScript.java @@ -78,6 +78,7 @@ import org.apache.sysds.runtime.util.CommonThreadPool; import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.runtime.util.LocalFileUtils; +import org.apache.sysds.runtime.util.MemoryMonitor; import org.apache.sysds.utils.Explain; import org.apache.sysds.utils.Explain.ExplainCounts; import org.apache.sysds.utils.Explain.ExplainType; @@ -320,6 +321,9 @@ public static boolean executeScript( String[] args ) Map argVals = dmlOptions.argVals; DML_FILE_PATH_ANTLR_PARSER = dmlOptions.filePath; + + // Start memory monitor in a background thread + // new Thread(new MemoryMonitor()).start(); //Step 3: invoke dml script printInvocationInfo(fileOrScript, fnameOptConfig, argVals); diff --git a/src/main/java/org/apache/sysds/common/Builtins.java b/src/main/java/org/apache/sysds/common/Builtins.java index 423679d038c..cfd4560a108 100644 --- a/src/main/java/org/apache/sysds/common/Builtins.java +++ b/src/main/java/org/apache/sysds/common/Builtins.java @@ -390,6 +390,8 @@ public enum Builtins { RMEMPTY("removeEmpty", false, true), SCALE("scale", true, false), SCALEAPPLY("scaleApply", true, false), + SCALEROBUST("scaleRobust", true, false), + SCALEROBUSTAPPLY("scaleRobustApply", true, false), SCALE_MINMAX("scaleMinMax", true, false), TIME("time", false), TOKENIZE("tokenize", false, true), diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinScaleRobustTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinScaleRobustTest.java new file mode 100644 index 00000000000..9e482caf24b --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinScaleRobustTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.builtin.part2; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.HashMap; + +import org.junit.Test; + +import org.apache.sysds.common.Types.ExecMode; +import org.apache.sysds.common.Types.ExecType; +import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.apache.sysds.test.TestUtils; + +public class BuiltinScaleRobustTest extends AutomatedTestBase { + private final static String TEST_NAME = "scaleRobust"; + private final static String TEST_DIR = "functions/builtin/"; + private final static String TEST_CLASS_DIR = TEST_DIR + BuiltinScaleRobustTest.class.getSimpleName() + "/"; + private final static double eps = 1e-10; + private final static int rows = 70; + private final static int cols = 50; + + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[]{"B"})); + } + + @Test + public void testScaleRobustDenseCP() { + runTest(false, ExecType.CP); + } + + private void runTest(boolean sparse, ExecType et) { + ExecMode old = setExecMode(et); + try { + loadTestConfiguration(getTestConfiguration(TEST_NAME)); + double sparsity = sparse ? 0.1 : 0.9; + String HOME = SCRIPT_DIR + TEST_DIR; + fullDMLScriptName = HOME + TEST_NAME + ".dml"; + fullRScriptName = HOME + TEST_NAME + ".R"; + String fullPyScriptName = HOME + TEST_NAME + ".py"; + programArgs = new String[]{"-args", input("A"), output("B")}; + programArgs = new String[]{"-exec", "singlenode", "-args", input("A"), output("B")}; + rCmd = "Rscript " + fullRScriptName + " " + inputDir() + " " + expectedDir(); + String pyCmd = "python " + fullPyScriptName + " " + inputDir() + " " + expectedDir(); + + double[][] A = getRandomMatrix(rows, cols, -10, 10, sparsity, 7); + writeInputMatrixWithMTD("A", A, true); + + // Run DML + runTest(true, false, null, -1); + + // Run R + runRScript(true); + + // Run Python + Process p = Runtime.getRuntime().exec(pyCmd); + // Capture stdout + BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); + BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream())); + + String s; + while ((s = stdInput.readLine()) != null) { + System.out.println("[PYTHON OUT] " + s); + } + while ((s = stdError.readLine()) != null) { + System.err.println("[PYTHON ERR] " + s); + } + + int exitCode = p.waitFor(); + if(exitCode != 0) { + throw new RuntimeException("Python script failed with exit code: " + exitCode); + } + + // Read matrices and compare + HashMap dmlfile = readDMLMatrixFromOutputDir("B"); + HashMap rfile = readRMatrixFromExpectedDir("B"); + HashMap pyfile = readRMatrixFromExpectedDir("B"); + + TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R"); + TestUtils.compareMatrices(dmlfile, pyfile, eps, "DML", "Python"); + + + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + rtplatform = old; + } + } + +} \ No newline at end of file diff --git a/src/test/scripts/functions/builtin/scaleRobust.R b/src/test/scripts/functions/builtin/scaleRobust.R new file mode 100644 index 00000000000..553555cb39c --- /dev/null +++ b/src/test/scripts/functions/builtin/scaleRobust.R @@ -0,0 +1,42 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +library("Matrix") + +args <- commandArgs(TRUE) +options(digits=22) + + +X = as.matrix(readMM(paste(args[1], "A.mtx", sep=""))) +colnames(X) = colnames(X, do.NULL=FALSE, prefix="C") +Y = X + +for (j in 1:ncol(X)) { + col = X[, j] + med = quantile(col, probs=0.5, type=1, names=FALSE, na.rm=FALSE) + q1 = quantile(col, probs=0.25, type=1, names=FALSE, na.rm=FALSE) + q3 = quantile(col, probs=0.75, type=1, names=FALSE, na.rm=FALSE) + iqr = q3 - q1 + if (iqr == 0 || is.nan(iqr)) iqr = 1 + Y[, j] = (col - med) / iqr +} + +writeMM(as(Y, "CsparseMatrix"), paste(args[2], "B", sep="")) diff --git a/src/test/scripts/functions/builtin/scaleRobust.dml b/src/test/scripts/functions/builtin/scaleRobust.dml new file mode 100644 index 00000000000..23dcd5f97a4 --- /dev/null +++ b/src/test/scripts/functions/builtin/scaleRobust.dml @@ -0,0 +1,24 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +X = read($1); +[Y, med, iqr] = scaleRobust(X); +write(Y, $2); diff --git a/src/test/scripts/functions/builtin/scaleRobust.py b/src/test/scripts/functions/builtin/scaleRobust.py new file mode 100644 index 00000000000..37d13f41e66 --- /dev/null +++ b/src/test/scripts/functions/builtin/scaleRobust.py @@ -0,0 +1,38 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +import sys +import numpy as np +from scipy.io import mmread, mmwrite +from scipy.sparse import csc_matrix +from sklearn.preprocessing import RobustScaler + +if __name__ == "__main__": + input_path = sys.argv[1] + "A.mtx" + output_path = sys.argv[2] + "B" + + X = mmread(input_path).toarray() + + # Apply RobustScaler + scaler = RobustScaler() + Y = scaler.fit_transform(X) + + mmwrite(output_path, csc_matrix(Y))