Skip to content

Commit

Permalink
Mean shift movers query (stanford-futuredata#264)
Browse files Browse the repository at this point in the history
Introduce a new query type (classifier + summarizer) that returns itemsets which meet a minimum support threshold and for which the mean of some metric has shifted between inliers and outliers.
  • Loading branch information
kraftp authored May 9, 2018
1 parent f8706b1 commit 9af7c8f
Show file tree
Hide file tree
Showing 10 changed files with 588 additions and 14 deletions.
19 changes: 19 additions & 0 deletions conf/CountMeanShift.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"pipeline": "CubePipeline",
"inputURI": "csv://lib/src/test/resources/sample_cubedshift.csv",
"classifier": "countmeanshift",
"metric": "time",
"predicate": "==",
"cutoff": "1",
"meanColumn": "meanLatency",
"countColumn": "count",
"summarizer": "countmeanshift",
"attributes": [
"location",
"version",
"language"
],
"meanShiftRatio": 1.1,
"minSupport": 0.05,
"numThreads": 1
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package edu.stanford.futuredata.macrobase.pipeline;

import edu.stanford.futuredata.macrobase.analysis.classify.Classifier;
import edu.stanford.futuredata.macrobase.analysis.classify.PercentileClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.PredicateClassifier;
import com.fasterxml.jackson.databind.jsonschema.SchemaAware;
import edu.stanford.futuredata.macrobase.analysis.classify.*;
import edu.stanford.futuredata.macrobase.analysis.summary.Explanation;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLCountMeanShiftSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLOutlierSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.BatchSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.fpg.FPGrowthSummarizer;
Expand All @@ -27,6 +27,7 @@ public class BasicBatchPipeline implements Pipeline {
private String classifierType;
private String metric;
private double cutoff;
private Optional<String> meanColumn;
private String strCutoff;
private boolean isStrPredicate;
private boolean pctileHigh;
Expand All @@ -39,6 +40,7 @@ public class BasicBatchPipeline implements Pipeline {
private String ratioMetric;
private double minSupport;
private double minRiskRatio;
private double meanShiftRatio;


public BasicBatchPipeline (PipelineConfig conf) {
Expand All @@ -47,7 +49,7 @@ public BasicBatchPipeline (PipelineConfig conf) {
classifierType = conf.get("classifier", "percentile");
metric = conf.get("metric");

if (classifierType.equals("predicate")) {
if (classifierType.equals("predicate") || classifierType.equals("countmeanshift")){
Object rawCutoff = conf.get("cutoff");
isStrPredicate = rawCutoff instanceof String;
if (isStrPredicate) {
Expand All @@ -70,6 +72,8 @@ public BasicBatchPipeline (PipelineConfig conf) {
minRiskRatio = conf.get("minRatioMetric", 3.0);
minSupport = conf.get("minSupport", 0.01);
numThreads = conf.get("numThreads", Runtime.getRuntime().availableProcessors());
meanColumn = Optional.ofNullable(conf.get("meanColumn"));
meanShiftRatio = conf.get("meanShiftRatio", 1.0);
}

public Classifier getClassifier() throws MacroBaseException {
Expand All @@ -81,6 +85,21 @@ public Classifier getClassifier() throws MacroBaseException {
classifier.setIncludeLow(pctileLow);
return classifier;
}
case "countmeanshift": {
if (isStrPredicate) {
return new CountMeanShiftClassifier(
metric,
meanColumn.orElseThrow(
() -> new MacroBaseException("mean column not present in config")), predicateStr,
strCutoff);
} else {
return new CountMeanShiftClassifier(
metric,
meanColumn.orElseThrow(
() -> new MacroBaseException("mean column not present in config")), predicateStr,
cutoff);
}
}
case "predicate": {
if (isStrPredicate){
PredicateClassifier classifier = new PredicateClassifier(metric, predicateStr, strCutoff);
Expand Down Expand Up @@ -116,6 +135,14 @@ public BatchSummarizer getSummarizer(String outlierColumnName) throws MacroBaseE
summarizer.setNumThreads(numThreads);
return summarizer;
}
case "countmeanshift": {
APLCountMeanShiftSummarizer summarizer = new APLCountMeanShiftSummarizer();
summarizer.setAttributes(attributes);
summarizer.setMinSupport(minSupport);
summarizer.setMinMeanShift(meanShiftRatio);
summarizer.setNumThreads(numThreads);
return summarizer;
}
default: {
throw new MacroBaseException("Bad Summarizer Type");
}
Expand All @@ -131,6 +158,11 @@ public DataFrame loadData() throws Exception {
colTypes.put(metric, Schema.ColType.DOUBLE);
}
List<String> requiredColumns = new ArrayList<>(attributes);
if (meanColumn.isPresent()) {
colTypes.put(meanColumn.get(), Schema.ColType.DOUBLE);
requiredColumns.add(meanColumn.get());

}
requiredColumns.add(metric);
return PipelineUtils.loadDataFrame(inputURI, colTypes, requiredColumns);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
package edu.stanford.futuredata.macrobase.pipeline;

import edu.stanford.futuredata.macrobase.analysis.classify.ArithmeticClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.CubeClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.PredicateCubeClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.QuantileClassifier;
import edu.stanford.futuredata.macrobase.analysis.classify.RawClassifier;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLExplanation;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLMeanSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLOutlierSummarizer;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.APLSummarizer;
import edu.stanford.futuredata.macrobase.analysis.classify.*;
import edu.stanford.futuredata.macrobase.analysis.summary.aplinear.*;
import edu.stanford.futuredata.macrobase.datamodel.DataFrame;
import edu.stanford.futuredata.macrobase.datamodel.Schema;
import edu.stanford.futuredata.macrobase.ingest.CSVDataFrameWriter;
Expand Down Expand Up @@ -57,6 +50,7 @@ public class CubePipeline implements Pipeline {
private List<String> attributes;
private double minSupport;
private double minRatioMetric;
private double meanShiftRatio;

private boolean debugDump;

Expand All @@ -69,7 +63,7 @@ public CubePipeline(PipelineConfig conf) {
classifierType = conf.get("classifier", "arithmetic");
countColumn = conf.get("countColumn", "count");

if (classifierType.equals("predicate")) {
if (classifierType.equals("predicate") || classifierType.equals("countmeanshift")){
Object rawCutoff = conf.get("cutoff");
isStrPredicate = rawCutoff instanceof String;
if (isStrPredicate) {
Expand All @@ -94,6 +88,7 @@ public CubePipeline(PipelineConfig conf) {
attributes = conf.get("attributes");
minSupport = conf.get("minSupport", 3.0);
minRatioMetric = conf.get("minRatioMetric", 0.01);
meanShiftRatio = conf.get("meanShiftRatio", 1.0);
numThreads = conf.get("numThreads", Runtime.getRuntime().availableProcessors());

debugDump = conf.get("debugDump", false);
Expand Down Expand Up @@ -147,6 +142,20 @@ private Map<String, Schema.ColType> getColTypes() throws MacroBaseException {
Map<String, Schema.ColType> colTypes = new HashMap<>();
colTypes.put(countColumn, Schema.ColType.DOUBLE);
switch (classifierType) {
case "countmeanshift":
if (isStrPredicate) {
colTypes.put(metric.orElseThrow(
() -> new MacroBaseException("metric column not present in config")),
Schema.ColType.STRING);
} else {
colTypes.put(metric.orElseThrow(
() -> new MacroBaseException("metric column not present in config")),
Schema.ColType.DOUBLE);
}
colTypes.put(meanColumn
.orElseThrow(() -> new MacroBaseException("mean column not present in config")),
Schema.ColType.DOUBLE);
return colTypes;
case "meanshift":
case "arithmetic": {
colTypes.put(meanColumn
Expand Down Expand Up @@ -187,6 +196,23 @@ private Map<String, Schema.ColType> getColTypes() throws MacroBaseException {

private CubeClassifier getClassifier() throws MacroBaseException {
switch (classifierType) {
case "countmeanshift": {
if (isStrPredicate) {
return new CountMeanShiftCubedClassifier(countColumn,
metric.orElseThrow(
() -> new MacroBaseException("metric column not present in config")),
meanColumn.orElseThrow(
() -> new MacroBaseException("mean column not present in config")), predicateStr,
strCutoff);
} else {
return new CountMeanShiftCubedClassifier(countColumn,
metric.orElseThrow(
() -> new MacroBaseException("metric column not present in config")),
meanColumn.orElseThrow(
() -> new MacroBaseException("mean column not present in config")), predicateStr,
cutoff);
}
}
case "arithmetic": {
ArithmeticClassifier classifier =
new ArithmeticClassifier(countColumn, meanColumn.orElseThrow(
Expand Down Expand Up @@ -234,6 +260,14 @@ private CubeClassifier getClassifier() throws MacroBaseException {

private APLSummarizer getSummarizer(CubeClassifier classifier) throws Exception {
switch (classifierType) {
case "countmeanshift": {
APLCountMeanShiftSummarizer summarizer = new APLCountMeanShiftSummarizer();
summarizer.setAttributes(attributes);
summarizer.setMinSupport(minSupport);
summarizer.setMinMeanShift(meanShiftRatio);
summarizer.setNumThreads(numThreads);
return summarizer;
}
case "meanshift": {
APLMeanSummarizer summarizer = new APLMeanSummarizer();
summarizer.setCountColumn(countColumn);
Expand All @@ -244,6 +278,7 @@ private APLSummarizer getSummarizer(CubeClassifier classifier) throws Exception
summarizer.setAttributes(attributes);
summarizer.setMinSupport(minSupport);
summarizer.setMinStdDev(minRatioMetric);
summarizer.setNumThreads(numThreads);
return summarizer;
}
default: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package edu.stanford.futuredata.macrobase.analysis.classify;

import edu.stanford.futuredata.macrobase.analysis.classify.stats.MBPredicate;
import edu.stanford.futuredata.macrobase.datamodel.DataFrame;
import edu.stanford.futuredata.macrobase.util.MacroBaseException;

import java.util.function.DoublePredicate;
import java.util.function.Predicate;

/**
 * Classifier that labels each row as an outlier or an inlier by evaluating a predicate on a
 * metric column, and records per-row counts and mean values split by that label.
 *
 * <p>After {@link #process(DataFrame)} runs, the result is a copy of the input with four extra
 * double columns (see the public column-name constants below). For every row exactly one of the
 * two count columns is {@code 1.0} and the matching mean-sum column carries the row's value from
 * the mean column; the other two cells of that row are {@code 0.0}.
 */
public class CountMeanShiftClassifier extends Classifier {
    // Names of the columns appended by process(). They are declared final so that no caller can
    // reassign them; code reading the result columns relies on these exact names.
    /** Column holding {@code 1.0} for rows matching the predicate (outliers), {@code 0.0} otherwise. */
    public static final String outlierCountColumnName = "_OUTLIERCOUNT";
    /** Column holding {@code 1.0} for rows not matching the predicate (inliers), {@code 0.0} otherwise. */
    public static final String inlierCountColumnName = "_INLIERCOUNT";
    /** Column holding the mean-column value for outlier rows, {@code 0.0} for inlier rows. */
    public static final String outlierMeanSumColumnName = "_OUTLIERMEANSUM";
    /** Column holding the mean-column value for inlier rows, {@code 0.0} for outlier rows. */
    public static final String inlierMeanSumColumnName = "_INLIERMEANSUM";

    // Exactly one of the two predicates is non-null; isStrPredicate says which one.
    private final Predicate<String> strPredicate;
    private final DoublePredicate doublePredicate;
    private final String metricColumnName;
    private final String meanColumnName;
    private final boolean isStrPredicate;

    // Result of the most recent successful process() call; null until then.
    private DataFrame output;

    /**
     * Creates a classifier whose metric column is read as strings.
     *
     * @param metricColumnName Column on which to classify outliers
     * @param meanColumnName Column containing means whose shifts will be explained
     * @param predicateStr Predicate used for classification: "==" or "!="
     * @param sentinel String sentinel value used when evaluating the predicate to determine outlier
     * @throws MacroBaseException propagated from {@code MBPredicate.getStrPredicate}
     */
    public CountMeanShiftClassifier(
            final String metricColumnName,
            final String meanColumnName,
            final String predicateStr,
            final String sentinel
    ) throws MacroBaseException {
        // The base class is given the mean column name (not the metric column name).
        super(meanColumnName);
        this.metricColumnName = metricColumnName;
        this.meanColumnName = meanColumnName;
        this.strPredicate = MBPredicate.getStrPredicate(predicateStr, sentinel);
        this.doublePredicate = null;
        this.isStrPredicate = true;
    }

    /**
     * Creates a classifier whose metric column is read as doubles.
     *
     * @param metricColumnName Column on which to classify outliers
     * @param meanColumnName Column containing means whose shifts will be explained
     * @param predicateStr Predicate used for classification: "==", "!=", "<", ">", "<=", or ">="
     * @param sentinel Double sentinel value used when evaluating the predicate to determine outlier
     * @throws MacroBaseException propagated from {@code MBPredicate.getDoublePredicate}
     */
    public CountMeanShiftClassifier(
            final String metricColumnName,
            final String meanColumnName,
            final String predicateStr,
            final double sentinel
    ) throws MacroBaseException {
        // The base class is given the mean column name (not the metric column name).
        super(meanColumnName);
        this.metricColumnName = metricColumnName;
        this.meanColumnName = meanColumnName;
        this.strPredicate = null;
        this.doublePredicate = MBPredicate.getDoublePredicate(predicateStr, sentinel);
        this.isStrPredicate = false;
    }

    /**
     * Scans the metric column and evaluates the predicate on every value. The {@code input}
     * DataFrame is not modified by this method; a copy is created and the four count / mean-sum
     * columns are added to the copy. The copy becomes visible through {@link #getResults()} only
     * after all four columns have been added, so a failure part-way through leaves the previous
     * result (if any) in place.
     *
     * @param input DataFrame containing both the metric column and the mean column
     * @throws Exception declared by the overridden method; anything thrown by the DataFrame
     *     accessors is propagated unchanged
     */
    @Override
    public void process(DataFrame input) throws Exception {
        // Only the column matching the configured predicate type is read.
        String[] stringMetrics = null;
        double[] doubleMetrics = null;
        if (isStrPredicate) {
            stringMetrics = input.getStringColumnByName(metricColumnName);
        } else {
            doubleMetrics = input.getDoubleColumnByName(metricColumnName);
        }

        final DataFrame result = input.copy();
        final double[] totalMeanColumn = input.getDoubleColumnByName(meanColumnName);
        final int len = totalMeanColumn.length;

        // Freshly allocated double arrays are zero-filled, so only the matching side is written.
        final double[] outlierCountColumn = new double[len];
        final double[] inlierCountColumn = new double[len];
        final double[] outlierMeanColumn = new double[len];
        final double[] inlierMeanColumn = new double[len];

        for (int i = 0; i < len; i++) {
            final boolean isOutlier = isStrPredicate
                    ? strPredicate.test(stringMetrics[i])
                    : doublePredicate.test(doubleMetrics[i]);
            if (isOutlier) {
                outlierCountColumn[i] = 1.0;
                outlierMeanColumn[i] = totalMeanColumn[i];
            } else {
                inlierCountColumn[i] = 1.0;
                inlierMeanColumn[i] = totalMeanColumn[i];
            }
        }

        result.addColumn(outlierCountColumnName, outlierCountColumn);
        result.addColumn(inlierCountColumnName, inlierCountColumn);
        result.addColumn(outlierMeanSumColumnName, outlierMeanColumn);
        result.addColumn(inlierMeanSumColumnName, inlierMeanColumn);

        // Publish only the fully populated copy.
        output = result;
    }

    /**
     * @return the DataFrame produced by the most recent successful {@link #process(DataFrame)}
     *     call, or {@code null} if {@code process} has not completed yet
     */
    @Override
    public DataFrame getResults() {
        return output;
    }
}
Loading

0 comments on commit 9af7c8f

Please sign in to comment.