This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit d4f831b
Merge pull request #553 from ArgusMonitoring/develop
Release
Rajavardhan Sarkapally authored and GitHub Enterprise committed Aug 5, 2019
2 parents e1d4ebb + e7b3897 commit d4f831b
Showing 28 changed files with 1,731 additions and 315 deletions.
@@ -156,6 +156,30 @@ public interface MetricService extends Service {
*/
List<MetricQuery> getQueries(List<String> expression, long relativeTo);

/**
 * Returns a list of <tt>MetricQuery</tt> objects corresponding to the given expression, with the query time range resolved relative to the given timestamp.
 *
 * @param expression The metric expression to evaluate. Cannot be null and must be a valid metric expression.
 * @param relativeTo The timestamp to which the expression's start and end times are anchored. Only applied when the
 *                   expression uses relative timestamps. For example, if the expression is
 *                   -1h:argus.jvm:mem.heap.used:avg, one hour is subtracted from relativeTo to obtain the start time.
 * @return The corresponding list of metric query objects. Will never return null.
 */
List<MetricQuery> parseToMetricQuery(String expression, long relativeTo);

/**
 * Returns a list of <tt>MetricQuery</tt> objects corresponding to the given expressions, with the query time ranges resolved relative to the given timestamp.
 *
 * @param expression The list of metric expressions to evaluate. Cannot be null, but may be empty. All entries must be valid metric expressions.
 * @param relativeTo The timestamp to which each expression's start and end times are anchored. Only applied when an
 *                   expression uses relative timestamps. For example, if an expression is
 *                   -1h:argus.jvm:mem.heap.used:avg, one hour is subtracted from relativeTo to obtain its start time.
 * @return The corresponding list of metric query objects. Will never return null.
 */
List<MetricQuery> parseToMetricQuery(List<String> expression, long relativeTo);

/**
* Returns list of DC from the metric query list, if present.
* @param mQList The list of MetricQuery expressions to evaluate. Cannot be null, but may be empty. All entries must be a valid metric expression.
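A minimal usage sketch for the new parseToMetricQuery methods (not part of this commit): the wrapper class, helper name, and import paths are assumptions inferred from the package layout visible elsewhere in this diff.

import java.util.List;

import com.salesforce.dva.argus.service.MetricService;
import com.salesforce.dva.argus.service.tsdb.MetricQuery;

public class ParseToMetricQuerySketch {

    // Parses an expression into MetricQuery objects without evaluating it. Because the
    // expression uses a relative start time (-1h), its range resolves to
    // [relativeTo - 1 hour, relativeTo].
    static List<MetricQuery> parse(MetricService metricService) {
        long relativeTo = System.currentTimeMillis();
        return metricService.parseToMetricQuery("-1h:argus.jvm:mem.heap.used:avg", relativeTo);
    }
}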
@@ -277,6 +277,7 @@ public static enum Counter {
QUERY_STORE_BLOOM_CREATED_APPROXIMATE_ELEMENT_COUNT("argus.core", "querystore.bloomfilter.created.approximate.element.count", MetricType.COUNTER),

DATALAG_PER_DC_TIME_LAG("argus.core", "datalag.seconds"),
DATALAG_PER_DC_OFFSET_LAG("argus.core", "datalag.offset"),
QUERY_DATAPOINTS_LIMIT_EXCEEDED("argus.core", "query.datapoints.limit.exceeded"),

ELASTIC_SEARCH_GET_FAILURES("argus.core", "elastic.search.get.failures", MetricType.COUNTER),
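The new per-DC data-lag counters follow the same modifyCounter(...) reporting pattern used in DefaultAlertService later in this commit. A hedged sketch of how the offset lag might be published per data center; the helper class, the "dc" tag key, and the import paths are assumptions, not part of this change.

import java.util.HashMap;
import java.util.Map;

import com.salesforce.dva.argus.service.MonitorService;
import com.salesforce.dva.argus.service.MonitorService.Counter;

public class DataLagCounterSketch {

    // Publishes the per-DC offset lag through the monitor service. The counter maps to the
    // "datalag.offset" metric declared above; the "dc" tag key is illustrative only.
    static void reportOffsetLag(MonitorService monitorService, String dc, double offsetLag) {
        Map<String, String> tags = new HashMap<>();
        tags.put("dc", dc);
        monitorService.modifyCounter(Counter.DATALAG_PER_DC_OFFSET_LAG, offsetLag, tags);
    }
}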
@@ -63,6 +63,7 @@
import com.salesforce.dva.argus.service.mail.EmailContext;
import com.salesforce.dva.argus.service.metric.MetricQueryResult;
import com.salesforce.dva.argus.service.metric.transform.MissingDataException;
import com.salesforce.dva.argus.service.monitor.DataLagService;
import com.salesforce.dva.argus.service.tsdb.MetricQuery;
import com.salesforce.dva.argus.system.SystemConfiguration;
import com.salesforce.dva.argus.service.alert.testing.AlertTestResults;
@@ -408,7 +409,7 @@ private void loadWhiteListRegexPatterns()
{
if (_whiteListedScopeRegexPatterns == null)
{
String whiteListedScopesProperty = _configuration.getValue(SystemConfiguration.Property.DATA_LAG_WHITE_LISTED_SCOPES);
String whiteListedScopesProperty = _configuration.getValue(DataLagService.Property.DATA_LAG_WHITE_LISTED_SCOPES.getName(), DataLagService.Property.DATA_LAG_WHITE_LISTED_SCOPES.getDefaultValue());
if (!StringUtils.isEmpty(whiteListedScopesProperty))
{
_whiteListedScopeRegexPatterns = Stream.of(whiteListedScopesProperty.split(",")).map(elem -> Pattern.compile(elem.toLowerCase())).collect(Collectors.toList());
@@ -420,7 +421,7 @@ private void loadWhiteListRegexPatterns()

if (_whiteListedUserRegexPatterns == null)
{
String whiteListedUsersProperty = _configuration.getValue(SystemConfiguration.Property.DATA_LAG_WHITE_LISTED_USERS);
String whiteListedUsersProperty = _configuration.getValue(DataLagService.Property.DATA_LAG_WHITE_LISTED_USERS.getName(), DataLagService.Property.DATA_LAG_WHITE_LISTED_USERS.getDefaultValue());
if (!StringUtils.isEmpty(whiteListedUsersProperty))
{
_whiteListedUserRegexPatterns = Stream.of(whiteListedUsersProperty.split(",")).map(elem -> Pattern.compile(elem.toLowerCase())).collect(Collectors.toList());
@@ -538,7 +539,7 @@ public Integer executeScheduledAlerts(int alertCount, int timeout) {

NotificationProcessor np = new NotificationProcessor(this, _logger);
_monitorService.modifyCounter(Counter.ALERTS_EVALUATED_TOTAL, alerts.size(), new HashMap<>());
boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED));
boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getName(), DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getDefaultValue()));
AtomicInteger numberOfAlertsEvaluated = new AtomicInteger(alerts.size());
for (Alert alert : alerts) {

@@ -1321,7 +1322,7 @@ public void sendNotification(Trigger trigger, Metric metric, History history, No
}
}

Long timestamp = (triggerFiredTime != null) ? triggerFiredTime : System.currentTimeMillis();
Long timestamp = (alertEnqueueTime != null) ? alertEnqueueTime : System.currentTimeMillis();
String alertEvaluationTrackingID = getAlertEvaluationTrackingID(alert, timestamp);

NotificationContext context = new NotificationContext(alert, trigger, notification, triggerFiredTime,
@@ -1764,7 +1765,7 @@ public boolean testEvaluateAlert(Alert alert, Long alertEvaluationTime, AlertTes

// Evaluate Alert, Triggers, Notifications -----------------------------------------------------------------
// TODO - enable datalag monitor in alert testing?
boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(SystemConfiguration.Property.DATA_LAG_MONITOR_ENABLED)); // TODO - get default value
boolean datalagMonitorEnabled = Boolean.valueOf(_configuration.getValue(DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getName(), DataLagService.Property.DATA_LAG_MONITOR_ENABLED.getDefaultValue())); // TODO - get default value

long jobStartTime = System.currentTimeMillis();
long evaluateEndTime = 0;
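The configuration lookups above replace SystemConfiguration.Property constants with a DataLagService.Property enum that carries both a property key and a default value. That enum is not shown in this diff; the following is a plausible sketch of its shape, mirroring the idb-client Property enum later in this commit. The constant names come from the lookups above, while the key strings and defaults are illustrative placeholders.

package com.salesforce.dva.argus.service.monitor;

public interface DataLagService {

    // Assumed shape only; not part of this diff.
    enum Property {
        DATA_LAG_MONITOR_ENABLED("system.property.data.lag.monitor.enabled", "false"),
        DATA_LAG_WHITE_LISTED_SCOPES("system.property.data.lag.whitelisted.scopes", ""),
        DATA_LAG_WHITE_LISTED_USERS("system.property.data.lag.whitelisted.users", "");

        private final String _name;
        private final String _defaultValue;

        Property(String name, String defaultValue) {
            _name = name;
            _defaultValue = defaultValue;
        }

        public String getName() {
            return _name;
        }

        public String getDefaultValue() {
            return _defaultValue;
        }
    }
}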
@@ -45,6 +45,7 @@
import com.salesforce.dva.argus.system.SystemAssert;
import com.salesforce.dva.argus.system.SystemConfiguration;
import com.salesforce.dva.argus.system.SystemException;
import com.salesforce.dva.argus.util.QueryContext;
import com.salesforce.dva.argus.util.QueryContextHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -212,6 +213,34 @@ public List<MetricQuery> getQueries(List<String> expressions, long relativeTo) {
}
return queries;
}

@Override
public List<MetricQuery> parseToMetricQuery(String expressions, long relativeTo) {
requireNotDisposed();
SystemAssert.requireArgument(MetricReader.isValid(expressions), "Illegal metric expression found: " + expressions);
return parseToMetricQuery(Arrays.asList(expressions), relativeTo);
}


@Override
public List<MetricQuery> parseToMetricQuery(List<String> expressions, long relativeTo) {
requireNotDisposed();

MetricReader<MetricQuery> reader = _metricReaderProviderForQueries.get();
List<MetricQuery> queries = new ArrayList<>();

try {
for (String expression : expressions) {
_logger.debug("Parsing expression to metric query for {}", expression);
QueryContextHolder contextHolder = new QueryContextHolder();
reader.parse(expression, relativeTo, MetricQuery.class, contextHolder, false);
queries.add(_queryProcessor.convertTSDBQueryToMetricQuery(contextHolder.getCurrentQueryContext().getExpression()));
}
} catch (ParseException ex) {
throw new SystemException("Failed to parse the given expression", ex);
}
return queries;
}

@Override
public void dispose() {
@@ -352,7 +352,7 @@ public Map<MetricQuery, List<Metric>> getMetrics(List<MetricQuery> queries) {
try {
for (MetricQuery query : queries) {
List<ConsumerOffsetMetric> consumerOffsetMetrics = new ArrayList<>();
String queryJson = constructQuery(query, from, scrollSize);
String queryJson = constructQuery(new MetricQuery(query), from, scrollSize);
final long start = System.currentTimeMillis();
Request request = new Request(ElasticSearchUtils.HttpMethod.POST.getName(), requestUrl);
request.setEntity(new StringEntity(queryJson, ContentType.APPLICATION_JSON));
@@ -104,10 +104,7 @@ public void mergeQueryResults(MetricQueryResult parentResult, MetricQueryResult
}
}

private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) {
final long start = System.currentTimeMillis();

MetricQueryResult queryResult = new MetricQueryResult();
public MetricQuery convertTSDBQueryToMetricQuery(TSDBQueryExpression expression) {
Long startTimestamp = expression.getStartTimestamp();
Long endTimestamp = expression.getEndTimestamp();
String namespace = expression.getNamespace();
@@ -130,6 +127,16 @@ private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) {
query.setAggregator(aggregator);
}

return query;
}

private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) {
final long start = System.currentTimeMillis();

MetricQueryResult queryResult = new MetricQueryResult();


MetricQuery query = convertTSDBQueryToMetricQuery(expression);
List<MetricQuery> queries = _discoveryService.getMatchingQueries(query);

if (queries.size() == 0) { // No metrics inflow to argus in last DEFAULT_RETENTION_DISCOVERY_DAYS days. Save the raw query processed within inBoundMetricQuery.
@@ -157,7 +164,8 @@ private MetricQueryResult evaluateTSDBQuery(TSDBQueryExpression expression) {
}
Collections.sort(metrics);
queryResult.setMetricsList(metrics);
queryResult.setQueryTimeRangeInMillis(endTimestamp-startTimestamp);
Long startTimestamp = expression.getStartTimestamp();
queryResult.setQueryTimeRangeInMillis(expression.getEndTimestamp() - startTimestamp);
queryResult.setQueryStartTimeMillis(startTimestamp);
if(queries.size() !=1 || queries.get(0) != query) {
queryResult.setNumDiscoveryResults(queries.size());
@@ -69,8 +69,8 @@ CloseableHttpClient createClient(SystemConfiguration config, int connectionCount
keystore = new DynamicKeyStoreBuilder()
.withMonitoredDirectory(config.getValue(Property.IDB_KEYSTORE_MONITORED_DIRECTORY.getName(),
Property.IDB_KEYSTORE_MONITORED_DIRECTORY.getDefaultValue()))
.withCADirectory(config.getValue(Property.IDB_KEYSTORE_CA_DIRECTORY.getName(),
Property.IDB_KEYSTORE_CA_DIRECTORY.getDefaultValue()))
.withCADirectory(config.getValue(Property.IDB_CA_DIRECTORY.getName(),
Property.IDB_CA_DIRECTORY.getDefaultValue()))
.withStartThread(true).build();
SSLContext sslContext = keystore.getSSLContext();
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext);
@@ -196,7 +196,7 @@ public enum Property {
IDB_ENDPOINT("service.property.idbclient.endpoint", "https://cfg0-cidbapima1-0-prd.data.sfdc.net:443"),
IDB_CONN_COUNT("service.property.idbclient.conn.count", "30"),
IDB_KEYSTORE_MONITORED_DIRECTORY("service.property.idbclient.keystore.monitored.directory", "/etc/pki_service/sfdc/argus-ajnaconsumer"),
IDB_KEYSTORE_CA_DIRECTORY("service.property.idbclient.keystore.monitored.directory", "/etc/pki_service/ca"),
IDB_CA_DIRECTORY("service.property.idbclient.ca.directory", "/etc/pki_service/ca"),
IDB_CACHE_TTL_SECS("service.property.idbclient.cache.ttl.secs", "1800"),
IDB_CONN_TIMEOUT("service.property.idbclient.conn.timeout", "30000"),
IDB_SOCKET_TIMEOUT("service.property.idbclient.socket.timeout", "30000");
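Note the fix in the Property enum above: the old IDB_KEYSTORE_CA_DIRECTORY constant reused the keystore-directory key, so a single override of "service.property.idbclient.keystore.monitored.directory" would have changed both paths. With the renamed IDB_CA_DIRECTORY and its own key, the two directories resolve independently. A brief sketch of the lookups; the class and method names are illustrative, while the key strings and defaults are copied from the enum above.

import com.salesforce.dva.argus.system.SystemConfiguration;

public class IdbDirectoryConfigSketch {

    // Each directory now falls back to its own default and can be overridden independently.
    static String resolveKeystoreDirectory(SystemConfiguration config) {
        return config.getValue("service.property.idbclient.keystore.monitored.directory",
                "/etc/pki_service/sfdc/argus-ajnaconsumer");
    }

    static String resolveCaDirectory(SystemConfiguration config) {
        return config.getValue("service.property.idbclient.ca.directory",
                "/etc/pki_service/ca");
    }
}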
@@ -0,0 +1,115 @@
package com.salesforce.dva.argus.service.metric.transform;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.salesforce.dva.argus.entity.Metric;
import com.salesforce.dva.argus.system.SystemAssert;
import com.salesforce.dva.argus.util.QueryContext;
import com.salesforce.dva.argus.util.TransformUtil;
/**
 * Implements the SLICE transform, which removes datapoints that fall outside a given time interval.
 * The interval bounds may be absolute epoch-millisecond values or offsets from the query's own
 * start/end times (e.g. "start + 5m", "end - 1h").
 *
 * @author Raj Sarkapally
 */
public class SliceTransform implements Transform {

    private static final String START_TIME = "start";
    private static final String END_TIME = "end";
    private static final long SECOND_IN_MILLI = 1000L;

    @Override
    public List<Metric> transform(QueryContext context, List<Metric> metrics) {
        throw new UnsupportedOperationException("Slice Transform needs interval start time and end time.");
    }

    @Override
    public List<Metric> transform(QueryContext queryContext, List<Metric> metrics, List<String> constants) {
        SystemAssert.requireArgument(constants != null, "Slice Transform needs interval start time and end time.");
        SystemAssert.requireArgument(constants.size() == 2, "Slice Transform must provide exactly 2 constants which are interval start time and interval end time.");

        // Each bound is either an epoch-millisecond long or "start"/"end" with an optional offset such as "start + 5m".
        String startEndTimePattern = "(" + START_TIME + "|" + END_TIME + ")(\\s*[+-]\\s*\\d+[smhd])?";
        String sliceStartTime = constants.get(0).trim();
        String sliceEndTime = constants.get(1).trim();
        SystemAssert.requireArgument((isLong(sliceStartTime) || sliceStartTime.matches(startEndTimePattern)), "The start time of Slice transform is invalid.");
        SystemAssert.requireArgument((isLong(sliceEndTime) || sliceEndTime.matches(startEndTimePattern)), "The end time of Slice transform is invalid.");

        long sliceStartTimeInMilli = calculateTime(sliceStartTime, queryContext.getChildContexts().get(0).getExpression().getStartTimestamp(),
                queryContext.getChildContexts().get(0).getExpression().getEndTimestamp());
        long sliceEndTimeInMilli = calculateTime(sliceEndTime, queryContext.getChildContexts().get(0).getExpression().getStartTimestamp(),
                queryContext.getChildContexts().get(0).getExpression().getEndTimestamp());

        // Keep only the datapoints that fall inside the [sliceStart, sliceEnd] interval.
        metrics.forEach(metric -> {
            Map<Long, Double> slicedDatapoints = new HashMap<>();
            metric.getDatapoints().forEach((timestamp, value) -> {
                if (timestamp >= sliceStartTimeInMilli && timestamp <= sliceEndTimeInMilli) {
                    slicedDatapoints.put(timestamp, value);
                }
            });
            metric.setDatapoints(slicedDatapoints);
        });
        return metrics;
    }

    @Override
    public List<Metric> transform(QueryContext queryContext, List<Metric>... metrics) {
        throw new UnsupportedOperationException("Slice Transform doesn't need list of list.");
    }

    @Override
    public String getResultScopeName() {
        return TransformFactory.Function.SLICE.name();
    }

    /**
     * Resolves a slice bound to epoch milliseconds. Absolute long values are returned as-is;
     * "start"/"end" anchors are resolved against the query's start/end time, applying any offset.
     */
    private long calculateTime(String time, long queryStartTime, long queryEndTime) {
        if (isLong(time)) {
            return Long.valueOf(time);
        } else {
            long anchorTime;
            String offsetString;
            if (time.contains(START_TIME)) {
                anchorTime = queryStartTime;
                offsetString = time.substring(START_TIME.length()).trim();
                if (offsetString.isEmpty()) {
                    return queryStartTime;
                }
            } else {
                anchorTime = queryEndTime;
                offsetString = time.substring(END_TIME.length()).trim();
                if (offsetString.isEmpty()) {
                    return queryEndTime;
                }
            }
            return calculate(anchorTime, offsetString.charAt(0), SECOND_IN_MILLI * TransformUtil.getWindowInSeconds(offsetString.substring(1).trim()));
        }
    }

    private long calculate(long operand1, char operator, long operand2) {
        switch (operator) {
            case '+':
                return operand1 + operand2;
            case '-':
                return operand1 - operand2;
            case '*':
                return operand1 * operand2;
            case '/':
                return operand1 / operand2;
            default:
                // The bound pattern only allows '+' and '-'; default to subtraction for safety.
                return operand1 - operand2;
        }
    }

    private boolean isLong(String s) {
        try {
            Long.valueOf(s);
            return true;
        } catch (NumberFormatException e) {
            return false;
        } catch (Throwable t) {
            return false;
        }
    }
}
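A usage sketch for the new transform (not part of this commit): the factory and query context are assumed to be supplied by the caller, and the query context is expected to carry a child context whose expression holds the evaluated query's start and end timestamps, as the transform above requires. The wrapper class and method names are illustrative.

import java.util.Arrays;
import java.util.List;

import com.salesforce.dva.argus.entity.Metric;
import com.salesforce.dva.argus.service.metric.transform.Transform;
import com.salesforce.dva.argus.service.metric.transform.TransformFactory;
import com.salesforce.dva.argus.util.QueryContext;

public class SliceUsageSketch {

    // Trims every metric to the window [query start + 5m, query end - 5m].
    static List<Metric> sliceAroundEdges(TransformFactory factory, QueryContext queryContext, List<Metric> metrics) {
        Transform slice = factory.getTransform(TransformFactory.Function.SLICE.name());
        return slice.transform(queryContext, metrics, Arrays.asList("start + 5m", "end - 5m"));
    }
}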
@@ -200,6 +200,8 @@ public Transform getTransform(String functionName) {
return new InterpolateTransform();
case RATE:
return new RateTransform();
case SLICE:
return new SliceTransform();
default:
throw new UnsupportedOperationException(functionName);
} // end switch
@@ -277,7 +279,8 @@ public enum Function {
ANOMALY_KMEANS("ANOMALY_KMEANS", "Calculates an anomaly score (0-100) for each value of the metric based on a K-means clustering of the metric data."),
ANOMALY_RPCA("ANOMALY_RPCA", "Calculates an anomaly score (0-100) for each value of the metric based on the RPCA matrix decomposition algorithm."),
INTERPOLATE("INTERPOLATE", "Performs interpolation of multiple time series, that can then be used for aggregation"),
RATE("RATE", "Performs Rate for all given time series");
RATE("RATE", "Performs Rate for all given time series"),
SLICE("SLICE", "Removes data points before interval start time and after interval end time. ");

private final String _name;
private final String _description;