diff --git a/ArgusCore/pom.xml b/ArgusCore/pom.xml
index 3bba585d6..9c9d0dc6d 100644
--- a/ArgusCore/pom.xml
+++ b/ArgusCore/pom.xml
@@ -309,7 +309,7 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>19.0</version>
+ <version>23.0</version>
<groupId>org.apache.commons</groupId>
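Worth noting: the jump from Guava 19.0 to 23.0 is not incidental — BloomFilter.approximateElementCount(), used by the new monitor thread below, first appeared in Guava 23.0. A minimal sketch (sizing values are illustrative, not the service defaults) of the two statistics the monitor logs:

    import java.nio.charset.Charset;

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class BloomStatsSketch {
        public static void main(String[] args) {
            BloomFilter<CharSequence> filter = BloomFilter.create(
                    Funnels.stringFunnel(Charset.defaultCharset()), 40, 0.00001);
            filter.put("scope\0metric\0salt");

            // approximateElementCount() is the Guava 23.0+ call; expectedFpp()
            // reports the current (not the configured) false-positive probability.
            System.out.println(filter.approximateElementCount());
            System.out.println(filter.expectedFpp());
        }
    }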
diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
index c6d29387a..ffb87f857 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/AbstractSchemaService.java
@@ -1,21 +1,23 @@
package com.salesforce.dva.argus.service.schema;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.management.MemoryType;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.googlecode.concurrenttrees.radix.ConcurrentRadixTree;
-import com.googlecode.concurrenttrees.radix.RadixTree;
-import com.googlecode.concurrenttrees.radix.node.concrete.SmartArrayBasedNodeFactory;
-import com.googlecode.concurrenttrees.radix.node.concrete.voidvalue.VoidValue;
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.Funnels;
+
import com.salesforce.dva.argus.entity.KeywordQuery;
import com.salesforce.dva.argus.entity.Metric;
import com.salesforce.dva.argus.entity.MetricSchemaRecord;
@@ -26,245 +28,257 @@
import com.salesforce.dva.argus.system.SystemConfiguration;
public abstract class AbstractSchemaService extends DefaultService implements SchemaService {
-
- private static final long MAX_MEMORY = Runtime.getRuntime().maxMemory();
- private static final long POLL_INTERVAL_MS = 60 * 1000L;
- protected static final RadixTree<VoidValue> TRIE = new ConcurrentRadixTree<>(new SmartArrayBasedNodeFactory());
-
- private static boolean _writesToTrieEnabled = true;
-
- private final Logger _logger = LoggerFactory.getLogger(getClass());
- private final Thread _oldGenMonitorThread;
- private final boolean _cacheEnabled;
- protected final boolean _syncPut;
+ private static final long POLL_INTERVAL_MS = 10 * 60 * 1000L;
+ private static final int DAY_IN_SECONDS = 24 * 60 * 60;
+ private static final int HOUR_IN_SECONDS = 60 * 60;
+ protected static BloomFilter<CharSequence> bloomFilter;
+ private Random rand = new Random();
+ private int randomNumber = rand.nextInt();
+ private int bloomFilterExpectedNumberInsertions;
+ private double bloomFilterErrorRate;
+ private final Logger _logger = LoggerFactory.getLogger(getClass());
+ private final Thread _bloomFilterMonitorThread;
+ protected final boolean _syncPut;
+ private int bloomFilterFlushHourToStartAt;
+ private ScheduledExecutorService scheduledExecutorService;
protected AbstractSchemaService(SystemConfiguration config) {
super(config);
-
- _cacheEnabled = Boolean.parseBoolean(
- config.getValue(Property.CACHE_SCHEMARECORDS.getName(), Property.CACHE_SCHEMARECORDS.getDefaultValue()));
- _syncPut = Boolean.parseBoolean(
- config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue()));
-
- _oldGenMonitorThread = new Thread(new OldGenMonitorThread(), "old-gen-monitor");
- if(_cacheEnabled) {
- _oldGenMonitorThread.start();
- }
+
+ bloomFilterExpectedNumberInsertions = Integer.parseInt(config.getValue(Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getName(),
+ Property.BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS.getDefaultValue()));
+ bloomFilterErrorRate = Double.parseDouble(config.getValue(Property.BLOOMFILTER_ERROR_RATE.getName(),
+ Property.BLOOMFILTER_ERROR_RATE.getDefaultValue()));
+ bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions, bloomFilterErrorRate);
+
+ _syncPut = Boolean.parseBoolean(
+ config.getValue(Property.SYNC_PUT.getName(), Property.SYNC_PUT.getDefaultValue()));
+
+ _bloomFilterMonitorThread = new Thread(new BloomFilterMonitorThread(), "bloom-filter-monitor");
+ _bloomFilterMonitorThread.start();
+
+ bloomFilterFlushHourToStartAt = Integer.parseInt(config.getValue(Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getName(),
+ Property.BLOOM_FILTER_FLUSH_HOUR_TO_START_AT.getDefaultValue()));
+ createScheduledExecutorService(bloomFilterFlushHourToStartAt);
}
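For readers new to Guava's filter, here is a minimal, self-contained sketch of the contract the constructor above relies on (sizing values are illustrative). Guava sizes the underlying bit array from the expected insertion count and the target false-positive probability; lookups never yield false negatives, only bounded false positives:

    import java.nio.charset.Charset;

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class BloomContractSketch {
        public static void main(String[] args) {
            BloomFilter<CharSequence> filter = BloomFilter.create(
                    Funnels.stringFunnel(Charset.defaultCharset()), 1000, 0.00001);

            filter.put("myScope\0myMetric");

            // No false negatives: a key that was put is always reported as present.
            System.out.println(filter.mightContain("myScope\0myMetric")); // true

            // A miss is definitive; a hit only means "probably present".
            System.out.println(filter.mightContain("otherScope\0otherMetric")); // almost certainly false
        }
    }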
@Override
public void put(Metric metric) {
requireNotDisposed();
SystemAssert.requireArgument(metric != null, "Metric cannot be null.");
-
+
put(Arrays.asList(metric));
}
@Override
public void put(List<Metric> metrics) {
requireNotDisposed();
- SystemAssert.requireArgument(metrics != null, "Metric list cannot be null.");
-
- //If cache is not enabled, call implementation specific put with the list of metrics.
- if(!_cacheEnabled) {
- implementationSpecificPut(metrics);
- return;
- }
-
- //If cache is enabled, create a list of metricsToPut that do not exist on the TRIE and then call implementation
- // specific put with only those subset of metricsToPut.
- List<Metric> metricsToPut = new ArrayList<>(metrics.size());
-
+ SystemAssert.requireArgument(metrics != null, "Metric list cannot be null.");
+
+ // Create a list of metricsToPut that do not exist on the BLOOMFILTER and then call implementation
+ // specific put with only those subset of metricsToPut.
+ List<Metric> metricsToPut = new ArrayList<>(metrics.size());
+
for(Metric metric : metrics) {
if(metric.getTags().isEmpty()) {
- String key = constructTrieKey(metric, null);
- boolean found = TRIE.getValueForExactKey(key) != null;
- if(!found) {
- metricsToPut.add(metric);
- if(_writesToTrieEnabled) {
- TRIE.putIfAbsent(key, VoidValue.SINGLETON);
- }
- }
+ String key = constructKey(metric, null);
+ boolean found = bloomFilter.mightContain(key);
+ if(!found) {
+ metricsToPut.add(metric);
+ }
} else {
boolean newTags = false;
for(Entry<String, String> tagEntry : metric.getTags().entrySet()) {
- String key = constructTrieKey(metric, tagEntry);
- boolean found = TRIE.getValueForExactKey(key) != null;
- if(!found) {
- newTags = true;
- if(_writesToTrieEnabled) {
- TRIE.putIfAbsent(key, VoidValue.SINGLETON);
- }
- }
+ String key = constructKey(metric, tagEntry);
+ boolean found = bloomFilter.mightContain(key);
+ if(!found) {
+ newTags = true;
+ }
}
-
+
if(newTags) {
metricsToPut.add(metric);
}
}
}
-
+
implementationSpecificPut(metricsToPut);
}
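The loop above is the heart of the change: a record is forwarded to the backing store only when the filter has (probably) not seen its key. The trade-off versus the old trie is that a false positive silently skips a genuinely new record; the per-instance salt and the daily flush below exist to bound that risk. A hedged sketch of the same pattern in isolation:

    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class DedupSketch {
        public static void main(String[] args) {
            BloomFilter<CharSequence> seen = BloomFilter.create(
                    Funnels.stringFunnel(Charset.defaultCharset()), 1000, 0.00001);
            seen.put("cpu.usage");                       // already indexed earlier

            List<String> toWrite = new ArrayList<>();
            for (String key : Arrays.asList("cpu.usage", "mem.free")) {
                // A false positive here would skip a brand-new key entirely,
                // which is why keys are salted per instance and flushed daily.
                if (!seen.mightContain(key)) {
                    toWrite.add(key);
                }
            }
            System.out.println(toWrite);                 // [mem.free]
        }
    }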
-
-
+
protected abstract void implementationSpecificPut(List<Metric> metrics);
- protected String constructTrieKey(Metric metric, Entry tagEntry) {
+ @Override
+ public void dispose() {
+ requireNotDisposed();
+ if (_bloomFilterMonitorThread != null && _bloomFilterMonitorThread.isAlive()) {
+ _logger.info("Stopping bloom filter monitor thread.");
+ _bloomFilterMonitorThread.interrupt();
+ _logger.info("Bloom filter monitor thread interrupted.");
+ try {
+ _logger.info("Waiting for bloom filter monitor thread to terminate.");
+ _bloomFilterMonitorThread.join();
+ } catch (InterruptedException ex) {
+ _logger.warn("Bloom filter monitor thread was interrupted while shutting down.");
+ }
+ _logger.info("System monitoring stopped.");
+ } else {
+ _logger.info("Requested shutdown of bloom filter monitor thread aborted, as it is not yet running.");
+ }
+ shutdownScheduledExecutorService();
+ }
+
+ @Override
+ public abstract Properties getServiceProperties();
+
+ @Override
+ public abstract List<MetricSchemaRecord> get(MetricSchemaRecordQuery query);
+
+ @Override
+ public abstract List<MetricSchemaRecord> getUnique(MetricSchemaRecordQuery query, RecordType type);
+
+ @Override
+ public abstract List<MetricSchemaRecord> keywordSearch(KeywordQuery query);
+
+ protected String constructKey(Metric metric, Entry tagEntry) {
StringBuilder sb = new StringBuilder(metric.getScope());
sb.append('\0').append(metric.getMetric());
-
+
if(metric.getNamespace() != null) {
sb.append('\0').append(metric.getNamespace());
}
-
+
if(tagEntry != null) {
sb.append('\0').append(tagEntry.getKey()).append('\0').append(tagEntry.getValue());
}
-
+
+ // Salt the key with this instance's random number so that different schema clients
+ // hash the same record differently, reducing the chance that a shared false positive
+ // leaves a metric schema unwritten to ES on every client.
+ sb.append('\0').append(randomNumber);
+
return sb.toString();
}
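Concretely, the key is the scope, metric, optional namespace, and optional tag pair joined by NUL separators, with the per-instance salt appended last. A small illustration with made-up salts (the real values come from rand.nextInt()):

    public class KeySketch {
        public static void main(String[] args) {
            String recordPart = "myScope\0myMetric\0host\0web-1";
            int saltA = 12345, saltB = -67890;   // hypothetical stand-ins for rand.nextInt()

            // Two service instances track the same record under different filter
            // keys, so a false positive on one instance is independent of the other.
            String keyOnInstanceA = recordPart + '\0' + saltA;
            String keyOnInstanceB = recordPart + '\0' + saltB;
            System.out.println(keyOnInstanceA.equals(keyOnInstanceB)); // false
        }
    }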
-
- protected String constructTrieKey(String scope, String metric, String tagk, String tagv, String namespace) {
+
+ protected String constructKey(String scope, String metric, String tagk, String tagv, String namespace) {
StringBuilder sb = new StringBuilder(scope);
sb.append('\0').append(metric);
-
+
if(namespace != null) {
sb.append('\0').append(namespace);
}
-
+
if(tagk != null) {
sb.append('\0').append(tagk);
}
-
+
if(tagv != null) {
sb.append('\0').append(tagv);
}
-
+
+ // Salt the key with this instance's random number so that different schema clients
+ // hash the same record differently, reducing the chance that a shared false positive
+ // leaves a metric schema unwritten to ES on every client.
+ sb.append('\0').append(randomNumber);
+
return sb.toString();
}
-
- @Override
- public void dispose() {
- requireNotDisposed();
- if (_oldGenMonitorThread != null && _oldGenMonitorThread.isAlive()) {
- _logger.info("Stopping old gen monitor thread.");
- _oldGenMonitorThread.interrupt();
- _logger.info("Old gen monitor thread interrupted.");
- try {
- _logger.info("Waiting for old gen monitor thread to terminate.");
- _oldGenMonitorThread.join();
- } catch (InterruptedException ex) {
- _logger.warn("Old gen monitor thread was interrupted while shutting down.");
- }
- _logger.info("System monitoring stopped.");
- } else {
- _logger.info("Requested shutdown of old gen monitor thread aborted, as it is not yet running.");
- }
+ private void createScheduledExecutorService(int targetHourToStartAt){
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ int initialDelayInSeconds = getNumHoursUntilTargetHour(targetHourToStartAt) * HOUR_IN_SECONDS;
+ BloomFilterFlushThread bloomFilterFlushThread = new BloomFilterFlushThread();
+ scheduledExecutorService.scheduleAtFixedRate(bloomFilterFlushThread, initialDelayInSeconds, DAY_IN_SECONDS, TimeUnit.SECONDS);
}
- @Override
- public abstract Properties getServiceProperties();
-
- @Override
- public abstract List<MetricSchemaRecord> get(MetricSchemaRecordQuery query);
+ private void shutdownScheduledExecutorService(){
+ _logger.info("Shutting down scheduled bloom filter flush executor service");
+ scheduledExecutorService.shutdown();
+ try {
+ scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ _logger.warn("Shutdown of executor service was interrupted.");
+ Thread.currentThread().interrupt();
+ }
+ }
- @Override
- public abstract List<MetricSchemaRecord> getUnique(MetricSchemaRecordQuery query, RecordType type);
+ protected int getNumHoursUntilTargetHour(int targetHour){
+ _logger.info("Initialized bloom filter flushing out, at {} hour of day", targetHour);
+ Calendar calendar = Calendar.getInstance();
+ int hour = calendar.get(Calendar.HOUR_OF_DAY);
+ return hour < targetHour ? (targetHour - hour) : (targetHour + 24 - hour);
+ }
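A worked example of the delay arithmetic, under the default flush hour of 2: at 20:00 the first flush is 6 hours away; at 01:00 it is 1 hour away; exactly at 02:00 the method waits a full 24 hours for the next occurrence. After the initial delay, scheduleAtFixedRate repeats every DAY_IN_SECONDS. A sketch of the same rule:

    public class FlushDelaySketch {
        // Same wrap-around rule as getNumHoursUntilTargetHour above.
        static int hoursUntil(int targetHour, int currentHour) {
            return currentHour < targetHour ? (targetHour - currentHour)
                                            : (targetHour + 24 - currentHour);
        }

        public static void main(String[] args) {
            System.out.println(hoursUntil(2, 20)); // 6  -> 02:00 tomorrow
            System.out.println(hoursUntil(2, 1));  // 1  -> 02:00 today
            System.out.println(hoursUntil(2, 2));  // 24 -> at the target hour, waits a day
        }
    }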
- @Override
- public abstract List<MetricSchemaRecord> keywordSearch(KeywordQuery query);
-
-
/**
- * The set of implementation specific configuration properties.
- *
- * @author Bhinav Sura (bhinav.sura@salesforce.com)
- */
- public enum Property {
-
- /* If set to true, schema records will be cached on writes. This helps to check if a schema records already exists,
- * and if it does then do not rewrite. Provide more heap space when using this option. */
- CACHE_SCHEMARECORDS("service.property.schema.cache.schemarecords", "false"),
- SYNC_PUT("service.property.schema.sync.put", "false");
-
- private final String _name;
- private final String _defaultValue;
-
- private Property(String name, String defaultValue) {
- _name = name;
- _defaultValue = defaultValue;
- }
-
- /**
- * Returns the property name.
- *
- * @return The property name.
- */
- public String getName() {
- return _name;
- }
-
- /**
- * Returns the default value for the property.
- *
- * @return The default value.
- */
- public String getDefaultValue() {
- return _defaultValue;
- }
- }
-
-
- //~ Inner Classes ********************************************************************************************************************************
-
- /**
- * Old Generation monitoring thread.
- *
- * @author Bhinav Sura (bhinav.sura@salesforce.com)
- */
- private class OldGenMonitorThread implements Runnable {
+ * The set of implementation specific configuration properties.
+ *
+ * @author Bhinav Sura (bhinav.sura@salesforce.com)
+ */
+ public enum Property {
+ SYNC_PUT("service.property.schema.sync.put", "false"),
+ BLOOMFILTER_EXPECTED_NUMBER_INSERTIONS("service.property.schema.bloomfilter.expected.number.insertions", "40"),
+ BLOOMFILTER_ERROR_RATE("service.property.schema.bloomfilter.error.rate", "0.00001"),
+ /*
+ * Have a different configured flush start hour for different machines to prevent thundering herd problem.
+ */
+ BLOOM_FILTER_FLUSH_HOUR_TO_START_AT("service.property.schema.bloomfilter.flush.hour.to.start.at","2");
+
+ private final String _name;
+ private final String _defaultValue;
+ private Property(String name, String defaultValue) {
+ _name = name;
+ _defaultValue = defaultValue;
+ }
+
+ /**
+ * Returns the property name.
+ *
+ * @return The property name.
+ */
+ public String getName() {
+ return _name;
+ }
+
+ /**
+ * Returns the default value for the property.
+ *
+ * @return The default value.
+ */
+ public String getDefaultValue() {
+ return _defaultValue;
+ }
+ }
+
+
+ //~ Inner Classes ********************************************************************************************************************************
+
+ /**
+ * Bloom Filter monitoring thread.
+ *
+ * @author Dilip Devaraj (ddevaraj@salesforce.com)
+ */
+ private class BloomFilterMonitorThread implements Runnable {
@Override
public void run() {
+ _logger.info("Initialized random number for bloom filter key = {}", randomNumber);
while (!Thread.currentThread().isInterrupted()) {
_sleepForPollPeriod();
if (!Thread.currentThread().isInterrupted()) {
try {
- _checkOldGenUsage();
+ _checkBloomFilterUsage();
} catch (Exception ex) {
- _logger.warn("Exception occurred while checking old generation usage.", ex);
+ _logger.warn("Exception occurred while checking bloom filter usage.", ex);
}
}
}
}
- private void _checkOldGenUsage() {
- List<MemoryPoolMXBean> memoryPoolBeans = ManagementFactory.getMemoryPoolMXBeans();
- for (MemoryPoolMXBean bean : memoryPoolBeans) {
- if (bean.getType() == MemoryType.HEAP) {
- String name = bean.getName().toLowerCase();
- if (name.contains("old") || name.contains("tenured")) {
- long oldGenUsed = bean.getUsage().getUsed();
- _logger.info("Old Gen Memory = {} bytes", oldGenUsed);
- _logger.info("Max JVM Memory = {} bytes", MAX_MEMORY);
- if (oldGenUsed > 0.90 * MAX_MEMORY) {
- _logger.info("JVM heap memory usage has exceeded 90% of the allocated heap memory. Disabling writes to TRIE.");
- _writesToTrieEnabled = false;
- } else if(oldGenUsed < 0.50 * MAX_MEMORY && !_writesToTrieEnabled) {
- _logger.info("JVM heap memory usage is below 50% of the allocated heap memory and writes to TRIE is disabled. "
- + "Enabling writes to TRIE now.");
- _writesToTrieEnabled = true;
- }
- }
- }
- }
+ private void _checkBloomFilterUsage() {
+ _logger.info("Bloom approx no. elements = {}", bloomFilter.approximateElementCount());
+ _logger.info("Bloom expected error rate = {}", bloomFilter.expectedFpp());
}
private void _sleepForPollPeriod() {
try {
- _logger.info("Sleeping for {}s before checking old gen usage.", POLL_INTERVAL_MS / 1000);
+ _logger.info("Sleeping for {}s before checking bloom filter statistics.", POLL_INTERVAL_MS / 1000);
Thread.sleep(POLL_INTERVAL_MS);
} catch (InterruptedException ex) {
_logger.warn("AbstractSchemaService memory monitor thread was interrupted.");
@@ -273,4 +287,21 @@ private void _sleepForPollPeriod() {
}
}
+ private class BloomFilterFlushThread implements Runnable {
+ @Override
+ public void run() {
+ try{
+ _flushBloomFilter();
+ } catch (Exception ex) {
+ _logger.warn("Exception occurred while flushing bloom filter.", ex);
+ }
+ }
+
+ private void _flushBloomFilter() {
+ _logger.info("Flushing out bloom filter entries");
+ bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), bloomFilterExpectedNumberInsertions, bloomFilterErrorRate);
+ /* No explicit synchronization: a brief race on these fields is tolerable and avoids slowing the hot write path. */
+ randomNumber = rand.nextInt();
+ }
+ }
}
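One design note on the flush thread above: bloomFilter and randomNumber are reassigned without synchronization, and the in-code comment accepts the brief window in which writer threads may still see the old filter or a mismatched salt. Not what this PR does, but for comparison, a hedged sketch of a stricter variant that marks both fields volatile so writers promptly observe the swap, at negligible cost:

    import java.nio.charset.Charset;
    import java.util.Random;

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    public class VolatileFlushSketch {
        private final Random rand = new Random();

        // volatile guarantees visibility of the swapped-in filter and salt;
        // the two writes are still not atomic together, same as in the PR.
        private volatile BloomFilter<CharSequence> filter = newFilter();
        private volatile int salt = rand.nextInt();

        private static BloomFilter<CharSequence> newFilter() {
            return BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 40, 0.00001);
        }

        void put(String recordKey) {
            filter.put(recordKey + '\0' + salt); // reads both volatile fields
        }

        void flush() {
            filter = newFilter();
            salt = rand.nextInt();
        }
    }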
diff --git a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java
index 3f0a1f650..f104dd37c 100644
--- a/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java
+++ b/ArgusCore/src/main/java/com/salesforce/dva/argus/service/schema/ElasticSearchSchemaService.java
@@ -65,7 +65,7 @@
*/
@Singleton
public class ElasticSearchSchemaService extends AbstractSchemaService {
-
+
private static final String INDEX_NAME = "metadata_index";
private static final String TYPE_NAME = "metadata_type";
private static final String KEEP_SCROLL_CONTEXT_OPEN_FOR = "1m";
@@ -73,23 +73,23 @@ public class ElasticSearchSchemaService extends AbstractSchemaService {
private static final int MAX_RETRY_TIMEOUT = 300 * 1000;
private static final String FIELD_TYPE_TEXT = "text";
private static final String FIELD_TYPE_DATE ="date";
-
+
private final ObjectMapper _mapper;
- private Logger _logger = LoggerFactory.getLogger(getClass());
- private final MonitorService _monitorService;
- private final RestClient _esRestClient;
- private final int _replicationFactor;
+ private Logger _logger = LoggerFactory.getLogger(getClass());
+ private final MonitorService _monitorService;
+ private final RestClient _esRestClient;
+ private final int _replicationFactor;
private final int _numShards;
private final int _bulkIndexingSize;
private HashAlgorithm _idgenHashAlgo;
-
- @Inject
+
+ @Inject
public ElasticSearchSchemaService(SystemConfiguration config, MonitorService monitorService) {
super(config);
-
+
_monitorService = monitorService;
_mapper = _createObjectMapper();
-
+
String algorithm = config.getValue(Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getName(), Property.ELASTICSEARCH_IDGEN_HASH_ALGO.getDefaultValue());
try {
_idgenHashAlgo = HashAlgorithm.fromString(algorithm);
@@ -97,21 +97,21 @@ public ElasticSearchSchemaService(SystemConfiguration config, MonitorService mon
_logger.warn("{} is not supported by this service. Valid values are: {}.", algorithm, Arrays.asList(HashAlgorithm.values()));
_idgenHashAlgo = HashAlgorithm.MD5;
}
-
+
_logger.info("Using {} for Elasticsearch document id generation.", _idgenHashAlgo);
-
+
_replicationFactor = Integer.parseInt(
config.getValue(Property.ELASTICSEARCH_NUM_REPLICAS.getName(), Property.ELASTICSEARCH_NUM_REPLICAS.getDefaultValue()));
-
+
_numShards = Integer.parseInt(
config.getValue(Property.ELASTICSEARCH_SHARDS_COUNT.getName(), Property.ELASTICSEARCH_SHARDS_COUNT.getDefaultValue()));
-
+
_bulkIndexingSize = Integer.parseInt(
config.getValue(Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getName(), Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getDefaultValue()));
-
+
String[] nodes = config.getValue(Property.ELASTICSEARCH_ENDPOINT.getName(), Property.ELASTICSEARCH_ENDPOINT.getDefaultValue()).split(",");
HttpHost[] httpHosts = new HttpHost[nodes.length];
-
+
for(int i=0; i<nodes.length; i++) {
...
protected void implementationSpecificPut(List<Metric> metrics) {
SystemAssert.requireArgument(metrics != null, "Metrics list cannot be null.");
-
+
_logger.info("{} new metrics need to be indexed on ES.", metrics.size());
-
+
long start = System.currentTimeMillis();
List<List<MetricSchemaRecord>> fracturedList = _fracture(metrics);
-
+
for(List<MetricSchemaRecord> records : fracturedList) {
if(!records.isEmpty()) {
if(_syncPut) {
@@ -205,22 +205,22 @@ protected void implementationSpecificPut(List<Metric> metrics) {
}
}
}
-
+
int count = 0;
for(List<MetricSchemaRecord> records : fracturedList) {
count += records.size();
}
-
+
_monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITTEN, count, null);
_monitorService.modifyCounter(MonitorService.Counter.SCHEMARECORDS_WRITE_LATENCY, (System.currentTimeMillis() - start), null);
}
-
+
/* Convert the given list of metrics to a list of metric schema records. At the same time, fracture the records list
* if its size is greater than INDEXING_BATCH_SIZE.
*/
- private List<List<MetricSchemaRecord>> _fracture(List<Metric> metrics) {
+ protected List<List<MetricSchemaRecord>> _fracture(List<Metric> metrics) {
List<List<MetricSchemaRecord>> fracturedList = new ArrayList<>();
-
+
List<MetricSchemaRecord> records = new ArrayList<>(_bulkIndexingSize);
for(Metric metric : metrics) {
if(metric.getTags().isEmpty()) {
@@ -233,41 +233,41 @@ private List<List<MetricSchemaRecord>> _fracture(List<Metric> metrics) {
}
continue;
}
-
+
for(Map.Entry<String, String> entry : metric.getTags().entrySet()) {
records.add(new MetricSchemaRecord(metric.getNamespace(), metric.getScope(), metric.getMetric(),
- entry.getKey(), entry.getValue()));
+ entry.getKey(), entry.getValue()));
if(records.size() == _bulkIndexingSize) {
fracturedList.add(records);
records = new ArrayList<>(_bulkIndexingSize);
}
}
}
-
+
fracturedList.add(records);
return fracturedList;
}
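_fracture (now protected, presumably so tests can reach it) applies a simple batching rule: accumulate one schema record per tag pair and cut a new batch whenever the current one reaches _bulkIndexingSize. The final, possibly empty, tail batch is always appended, which is why the caller guards each batch with records.isEmpty(). A generic sketch of the same rule:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class FractureSketch {
        static <T> List<List<T>> fracture(List<T> items, int batchSize) {
            List<List<T>> batches = new ArrayList<>();
            List<T> batch = new ArrayList<>(batchSize);
            for (T item : items) {
                batch.add(item);
                if (batch.size() == batchSize) {
                    batches.add(batch);
                    batch = new ArrayList<>(batchSize);
                }
            }
            batches.add(batch); // tail batch may be empty, mirroring _fracture
            return batches;
        }

        public static void main(String[] args) {
            System.out.println(fracture(Arrays.asList(1, 2, 3, 4, 5), 2));
            // [[1, 2], [3, 4], [5]]
        }
    }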
-
+
@Override
public List<MetricSchemaRecord> get(MetricSchemaRecordQuery query) {
requireNotDisposed();
- SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null.");
- long size = (long) query.getLimit() * query.getPage();
- SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
- "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE");
-
-
- Map<String, String> tags = new HashMap<>();
- tags.put("type", "REGEXP_WITHOUT_AGGREGATION");
- long start = System.currentTimeMillis();
- boolean scroll = false;
+ SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null.");
+ long size = (long) query.getLimit() * query.getPage();
+ SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
+ "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE");
+
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("type", "REGEXP_WITHOUT_AGGREGATION");
+ long start = System.currentTimeMillis();
+ boolean scroll = false;
StringBuilder sb = new StringBuilder().append("/")
- .append(INDEX_NAME)
- .append("/")
- .append(TYPE_NAME)
- .append("/")
- .append("_search");
-
+ .append(INDEX_NAME)
+ .append("/")
+ .append(TYPE_NAME)
+ .append("/")
+ .append("_search");
+
int from = 0, scrollSize;
if(query.getLimit() * query.getPage() > 10000) {
sb.append("?scroll=").append(KEEP_SCROLL_CONTEXT_OPEN_FOR);
@@ -278,94 +278,94 @@ public List<MetricSchemaRecord> get(MetricSchemaRecordQuery query) {
from = query.getLimit() * (query.getPage() - 1);
scrollSize = query.getLimit();
}
-
+
String requestUrl = sb.toString();
String queryJson = _constructTermQuery(query, from, scrollSize);
-
+
try {
Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson));
-
+
MetricSchemaRecordList list = toEntity(extractResponse(response), new TypeReference<MetricSchemaRecordList>() {});
-
+
if(scroll) {
requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString();
List<MetricSchemaRecord> records = new LinkedList<>(list.getRecords());
-
+
while(true) {
String scrollID = list.getScrollID();
-
+
Map<String, String> requestBody = new HashMap<>();
requestBody.put("scroll_id", scrollID);
requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR);
-
+
response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(new ObjectMapper().writeValueAsString(requestBody)));
-
+
list = toEntity(extractResponse(response), new TypeReference<MetricSchemaRecordList>() {});
records.addAll(list.getRecords());
-
+
if(records.size() >= query.getLimit() * query.getPage() || list.getRecords().size() < scrollSize) {
break;
}
}
-
+
int fromIndex = query.getLimit() * (query.getPage() - 1);
if(records.size() <= fromIndex) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return Collections.emptyList();
}
-
+
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return records.subList(fromIndex, records.size());
-
+
} else {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return list.getRecords();
}
-
+
} catch (UnsupportedEncodingException | JsonProcessingException e) {
throw new SystemException("Search failed.", e);
} catch (IOException e) {
throw new SystemException("IOException when trying to perform ES request.", e);
}
}
-
+
@Override
public List<MetricSchemaRecord> getUnique(MetricSchemaRecordQuery query, RecordType type) {
requireNotDisposed();
SystemAssert.requireArgument(query != null, "MetricSchemaRecordQuery cannot be null.");
long size = (long) query.getLimit() * query.getPage();
- SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
- "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE");
-
-
- Map<String, String> tags = new HashMap<>();
- tags.put("type", "REGEXP_WITH_AGGREGATION");
- long start = System.currentTimeMillis();
+ SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
+ "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE");
+
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("type", "REGEXP_WITH_AGGREGATION");
+ long start = System.currentTimeMillis();
String requestUrl = new StringBuilder().append("/")
- .append(INDEX_NAME)
- .append("/")
- .append(TYPE_NAME)
- .append("/")
- .append("_search")
- .toString();
-
+ .append(INDEX_NAME)
+ .append("/")
+ .append(TYPE_NAME)
+ .append("/")
+ .append("_search")
+ .toString();
+
String queryJson = _constructTermAggregationQuery(query, type);
try {
Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson));
String str = extractResponse(response);
List<MetricSchemaRecord> records = SchemaService.constructMetricSchemaRecordsForType(
- toEntity(str, new TypeReference<List<String>>() {}), type);
-
+ toEntity(str, new TypeReference<List<String>>() {}), type);
+
int fromIndex = query.getLimit() * (query.getPage() - 1);
if(records.size() <= fromIndex) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return Collections.emptyList();
}
-
+
if(records.size() < query.getLimit() * query.getPage()) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
@@ -379,31 +379,31 @@ public List<MetricSchemaRecord> getUnique(MetricSchemaRecordQuery query, RecordT
throw new SystemException(e);
}
}
-
+
@Override
public List<MetricSchemaRecord> keywordSearch(KeywordQuery kq) {
requireNotDisposed();
- SystemAssert.requireArgument(kq != null, "Query cannot be null.");
- SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null.");
-
- long size = (long) kq.getLimit() * kq.getPage();
- SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
- "(limit * page) must be greater than 0 and atmost Integer.MAX_VALUE");
-
-
- Map<String, String> tags = new HashMap<>();
- tags.put("type", "FTS_WITH_AGGREGATION");
- long start = System.currentTimeMillis();
+ SystemAssert.requireArgument(kq != null, "Query cannot be null.");
+ SystemAssert.requireArgument(kq.getQuery() != null || kq.getType() != null, "Either the query string or the type must not be null.");
+
+ long size = (long) kq.getLimit() * kq.getPage();
+ SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
+ "(limit * page) must be greater than 0 and at most Integer.MAX_VALUE");
+
+
+ Map<String, String> tags = new HashMap<>();
+ tags.put("type", "FTS_WITH_AGGREGATION");
+ long start = System.currentTimeMillis();
StringBuilder sb = new StringBuilder().append("/")
- .append(INDEX_NAME)
- .append("/")
- .append(TYPE_NAME)
- .append("/")
- .append("_search");
+ .append(INDEX_NAME)
+ .append("/")
+ .append(TYPE_NAME)
+ .append("/")
+ .append("_search");
try {
-
+
if(kq.getQuery() != null) {
-
+
int from = 0, scrollSize = 0;
boolean scroll = false;
if(kq.getLimit() * kq.getPage() > 10000) {
@@ -415,82 +415,82 @@ public List<MetricSchemaRecord> keywordSearch(KeywordQuery kq) {
from = kq.getLimit() * (kq.getPage() - 1);
scrollSize = kq.getLimit();
}
-
+
List<String> tokens = _analyzedTokens(kq.getQuery());
String queryJson = _constructQueryStringQuery(tokens, from, scrollSize);
String requestUrl = sb.toString();
-
+
Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson));
String strResponse = extractResponse(response);
MetricSchemaRecordList list = toEntity(strResponse, new TypeReference<MetricSchemaRecordList>() {});
-
+
if(scroll) {
requestUrl = new StringBuilder().append("/").append("_search").append("/").append("scroll").toString();
List<MetricSchemaRecord> records = new LinkedList<>(list.getRecords());
-
+
while(true) {
Map<String, String> requestBody = new HashMap<>();
requestBody.put("scroll_id", list.getScrollID());
requestBody.put("scroll", KEEP_SCROLL_CONTEXT_OPEN_FOR);
-
+
response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(),
new StringEntity(new ObjectMapper().writeValueAsString(requestBody)));
-
+
list = toEntity(extractResponse(response), new TypeReference<MetricSchemaRecordList>() {});
-
+
records.addAll(list.getRecords());
-
+
if(records.size() >= kq.getLimit() * kq.getPage() || list.getRecords().size() < scrollSize) {
break;
}
}
-
+
int fromIndex = kq.getLimit() * (kq.getPage() - 1);
if(records.size() <= fromIndex) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return Collections.emptyList();
}
-
+
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return records.subList(fromIndex, records.size());
-
+
} else {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return list.getRecords();
}
-
-
+
+
} else {
Map<RecordType, List<String>> tokensMap = new HashMap<>();
-
+
List<String> tokens = _analyzedTokens(kq.getScope());
if(!tokens.isEmpty()) {
tokensMap.put(RecordType.SCOPE, tokens);
}
-
+
tokens = _analyzedTokens(kq.getMetric());
if(!tokens.isEmpty()) {
tokensMap.put(RecordType.METRIC, tokens);
}
-
+
tokens = _analyzedTokens(kq.getTagKey());
if(!tokens.isEmpty()) {
tokensMap.put(RecordType.TAGK, tokens);
}
-
+
tokens = _analyzedTokens(kq.getTagValue());
if(!tokens.isEmpty()) {
tokensMap.put(RecordType.TAGV, tokens);
}
-
+
tokens = _analyzedTokens(kq.getNamespace());
if(!tokens.isEmpty()) {
tokensMap.put(RecordType.NAMESPACE, tokens);
}
-
+
String queryJson = _constructQueryStringQuery(kq, tokensMap);
String requestUrl = sb.toString();
Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(queryJson));
@@ -498,14 +498,14 @@ public List<MetricSchemaRecord> keywordSearch(KeywordQuery kq) {
List<MetricSchemaRecord> records = SchemaService.constructMetricSchemaRecordsForType(
toEntity(strResponse, new TypeReference<List<String>>() {}), kq.getType());
-
+
int fromIndex = kq.getLimit() * (kq.getPage() - 1);
if(records.size() <= fromIndex) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return Collections.emptyList();
}
-
+
if(records.size() < kq.getLimit() * kq.getPage()) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_COUNT, 1, tags);
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
@@ -515,27 +515,27 @@ public List keywordSearch(KeywordQuery kq) {
_monitorService.modifyCounter(Counter.SCHEMARECORDS_QUERY_LATENCY, (System.currentTimeMillis() - start), tags);
return records.subList(fromIndex, kq.getLimit() * kq.getPage());
}
-
+
}
-
+
} catch (IOException e) {
throw new SystemException(e);
}
}
-
-
+
+
private List<String> _analyzedTokens(String query) {
-
+
if(!SchemaService.containsFilter(query)) {
return Collections.emptyList();
}
-
+
List<String> tokens = new ArrayList<>();
-
+
String requestUrl = new StringBuilder("/").append(INDEX_NAME).append("/_analyze").toString();
-
+
String requestBody = "{\"analyzer\" : \"metadata_analyzer\", \"text\": \"" + query + "\" }";
-
+
try {
Response response = _esRestClient.performRequest(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), new StringEntity(requestBody));
String strResponse = extractResponse(response);
@@ -545,7 +545,7 @@ private List<String> _analyzedTokens(String query) {
tokens.add(tokenNode.get("token").asText());
}
}
-
+
return tokens;
} catch (IOException e) {
throw new SystemException(e);
@@ -554,17 +554,17 @@ private List<String> _analyzedTokens(String query) {
private void _upsert(List<MetricSchemaRecord> records) {
-
+
String requestUrl = new StringBuilder().append("/")
- .append(INDEX_NAME)
- .append("/")
- .append(TYPE_NAME)
- .append("/")
- .append("_bulk")
- .toString();
-
+ .append(INDEX_NAME)
+ .append("/")
+ .append(TYPE_NAME)
+ .append("/")
+ .append("_bulk")
+ .toString();
+
String strResponse = "";
-
+
MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo);
try {
String requestBody = _mapper.writeValueAsString(msrList);
@@ -572,10 +572,9 @@ private void _upsert(List<MetricSchemaRecord> records) {
strResponse = extractResponse(response);
} catch (IOException e) {
//TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException??
- _removeFromTrie(records);
throw new SystemException(e);
}
-
+
try {
PutResponse putResponse = new ObjectMapper().readValue(strResponse, PutResponse.class);
//TODO: If response contains HTTP 429 Too Many Requests (EsRejectedExecutionException), then retry with exponential back-off.
@@ -586,42 +585,45 @@ private void _upsert(List<MetricSchemaRecord> records) {
_logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error));
recordsToRemove.add(msrList.getRecord(item.create._id));
}
-
+
if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) {
_logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error));
recordsToRemove.add(msrList.getRecord(item.index._id));
}
}
- _removeFromTrie(recordsToRemove);
+ if(recordsToRemove.size() != 0) {
+ _logger.info("{} records were not written to ES", recordsToRemove.size());
+ records.removeAll(recordsToRemove);
+ }
}
+ // Record the successfully indexed schema records in the bloom filter.
+ _addToBloomFilter(records);
} catch(IOException e) {
- _removeFromTrie(records);
throw new SystemException("Failed to parse reponse of put metrics. The response was: " + strResponse, e);
}
}
-
+
private void _upsertAsync(List<MetricSchemaRecord> records) {
-
+
String requestUrl = new StringBuilder().append("/")
- .append(INDEX_NAME)
- .append("/")
- .append(TYPE_NAME)
- .append("/")
- .append("_bulk")
- .toString();
-
+ .append(INDEX_NAME)
+ .append("/")
+ .append(TYPE_NAME)
+ .append("/")
+ .append("_bulk")
+ .toString();
+
MetricSchemaRecordList msrList = new MetricSchemaRecordList(records, _idgenHashAlgo);
StringEntity entity;
try {
String requestBody = _mapper.writeValueAsString(msrList);
entity = new StringEntity(requestBody);
} catch (JsonProcessingException | UnsupportedEncodingException e) {
- _removeFromTrie(records);
throw new SystemException("Failed to parse metrics to schema records when indexing.", e);
}
-
+
ResponseListener responseListener = new ResponseListener() {
-
+
@Override
public void onSuccess(Response response) {
String strResponse = extractResponse(response);
@@ -635,84 +637,87 @@ public void onSuccess(Response response) {
_logger.warn("Failed to index metric. Reason: " + new ObjectMapper().writeValueAsString(item.create.error));
recordsToRemove.add(msrList.getRecord(item.create._id));
}
-
+
if(item.index != null && item.index.status == HttpStatus.SC_NOT_FOUND) {
_logger.warn("Index does not exist. Error: " + new ObjectMapper().writeValueAsString(item.index.error));
recordsToRemove.add(msrList.getRecord(item.index._id));
}
}
- _removeFromTrie(recordsToRemove);
- }
+ if(recordsToRemove.size() != 0) {
+ _logger.info("{} records were not written to ES", recordsToRemove.size());
+ records.removeAll(recordsToRemove);
+ }
+ }
+ // Record the successfully indexed schema records in the bloom filter.
+ _addToBloomFilter(records);
} catch(IOException e) {
- _removeFromTrie(records);
_logger.warn("Failed to parse reponse of put metrics. The response was: " + strResponse, e);
}
}
-
+
@Override
public void onFailure(Exception e) {
//TODO: Retry with exponential back-off for handling EsRejectedExecutionException/RemoteTransportException/TimeoutException??
- _removeFromTrie(records);
_logger.warn("Failed to execute the indexing request.", e);
}
};
-
+
_esRestClient.performRequestAsync(HttpMethod.POST.getName(), requestUrl, Collections.emptyMap(), entity, responseListener);
}
-
- private void _removeFromTrie(List<MetricSchemaRecord> records) {
- _logger.info("Removing {} records from trie.", records.size());
- for(MetricSchemaRecord record : records) {
- String key = constructTrieKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace());
- TRIE.remove(key);
- }
+
+ protected void _addToBloomFilter(List<MetricSchemaRecord> records) {
+ _logger.info("Adding {} records into bloom filter.", records.size());
+ for(MetricSchemaRecord record : records) {
+ String key = constructKey(record.getScope(), record.getMetric(), record.getTagKey(), record.getTagValue(), record.getNamespace());
+ bloomFilter.put(key);
+ }
}
-
+
private String _constructTermAggregationQuery(MetricSchemaRecordQuery query, RecordType type) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode queryNode = _constructQueryNode(query, mapper);
-
+
long size = query.getLimit() * query.getPage();
SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
"(limit * page) must be greater than 0 and less than Integer.MAX_VALUE");
-
+
ObjectNode aggsNode = _constructAggsNode(type, Math.max(size, 10000), mapper);
-
+
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("query", queryNode);
rootNode.put("size", 0);
rootNode.put("aggs", aggsNode);
-
+
return rootNode.toString();
}
-
+
private String _constructTermQuery(MetricSchemaRecordQuery query, int from, int size) {
ObjectMapper mapper = new ObjectMapper();
-
+
ObjectNode queryNode = _constructQueryNode(query, mapper);
-
+
ObjectNode rootNode = _mapper.createObjectNode();
rootNode.put("query", queryNode);
rootNode.put("from", from);
rootNode.put("size", size);
-
+
return rootNode.toString();
}
-
+
private ObjectNode _constructSimpleQueryStringNode(List<String> tokens, RecordType... types) {
-
+
if(tokens.isEmpty()) {
return null;
}
-
+
ObjectMapper mapper = new ObjectMapper();
-
+
StringBuilder queryString = new StringBuilder();
for(String token : tokens) {
queryString.append('+').append(token).append(' ');
}
queryString.replace(queryString.length() - 1, queryString.length(), "*");
-
+
ObjectNode node = mapper.createObjectNode();
ArrayNode fieldsNode = mapper.createArrayNode();
for(RecordType type : types) {
@@ -720,54 +725,54 @@ private ObjectNode _constructSimpleQueryStringNode(List<String> tokens, RecordTy
}
node.put("fields", fieldsNode);
node.put("query", queryString.toString());
-
+
ObjectNode simpleQueryStringNode = mapper.createObjectNode();
simpleQueryStringNode.put("simple_query_string", node);
-
+
return simpleQueryStringNode;
}
-
+
private String _constructQueryStringQuery(List<String> tokens, int from, int size) {
ObjectMapper mapper = new ObjectMapper();
-
+
ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(tokens, RecordType.values());
-
+
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("query", simpleQueryStringNode);
rootNode.put("from", from);
rootNode.put("size", size);
-
+
return rootNode.toString();
}
-
+
private String _constructQueryStringQuery(KeywordQuery kq, Map<RecordType, List<String>> tokensMap) {
ObjectMapper mapper = new ObjectMapper();
-
+
ArrayNode filterNodes = mapper.createArrayNode();
for(Map.Entry<RecordType, List<String>> entry : tokensMap.entrySet()) {
ObjectNode simpleQueryStringNode = _constructSimpleQueryStringNode(entry.getValue(), entry.getKey());
filterNodes.add(simpleQueryStringNode);
}
-
+
ObjectNode boolNode = mapper.createObjectNode();
boolNode.put("filter", filterNodes);
-
+
ObjectNode queryNode = mapper.createObjectNode();
queryNode.put("bool", boolNode);
-
+
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("query", queryNode);
rootNode.put("size", 0);
-
+
long size = kq.getLimit() * kq.getPage();
SystemAssert.requireArgument(size > 0 && size <= Integer.MAX_VALUE,
"(limit * page) must be greater than 0 and less than Integer.MAX_VALUE");
rootNode.put("aggs", _constructAggsNode(kq.getType(), Math.max(size, 10000), mapper));
-
+
return rootNode.toString();
-
+
}
-
+
private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapper mapper) {
ArrayNode filterNodes = mapper.createArrayNode();
if(SchemaService.containsFilter(query.getMetric())) {
@@ -777,7 +782,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp
node.put("regexp", regexpNode);
filterNodes.add(node);
}
-
+
if(SchemaService.containsFilter(query.getScope())) {
ObjectNode node = mapper.createObjectNode();
ObjectNode regexpNode = mapper.createObjectNode();
@@ -785,7 +790,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp
node.put("regexp", regexpNode);
filterNodes.add(node);
}
-
+
if(SchemaService.containsFilter(query.getTagKey())) {
ObjectNode node = mapper.createObjectNode();
ObjectNode regexpNode = mapper.createObjectNode();
@@ -793,7 +798,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp
node.put("regexp", regexpNode);
filterNodes.add(node);
}
-
+
if(SchemaService.containsFilter(query.getTagValue())) {
ObjectNode node = mapper.createObjectNode();
ObjectNode regexpNode = mapper.createObjectNode();
@@ -801,7 +806,7 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp
node.put("regexp", regexpNode);
filterNodes.add(node);
}
-
+
if(SchemaService.containsFilter(query.getNamespace())) {
ObjectNode node = mapper.createObjectNode();
ObjectNode regexpNode = mapper.createObjectNode();
@@ -809,186 +814,186 @@ private ObjectNode _constructQueryNode(MetricSchemaRecordQuery query, ObjectMapp
node.put("regexp", regexpNode);
filterNodes.add(node);
}
-
+
ObjectNode boolNode = mapper.createObjectNode();
boolNode.put("filter", filterNodes);
-
+
ObjectNode queryNode = mapper.createObjectNode();
queryNode.put("bool", boolNode);
return queryNode;
}
-
-
+
+
private ObjectNode _constructAggsNode(RecordType type, long limit, ObjectMapper mapper) {
-
+
ObjectNode termsNode = mapper.createObjectNode();
termsNode.put("field", type.getName() + ".raw");
termsNode.put("order", mapper.createObjectNode().put("_term", "asc"));
termsNode.put("size", limit);
termsNode.put("execution_hint", "map");
-
+
ObjectNode distinctValuesNode = mapper.createObjectNode();
distinctValuesNode.put("terms", termsNode);
-
+
ObjectNode aggsNode = mapper.createObjectNode();
aggsNode.put("distinct_values", distinctValuesNode);
return aggsNode;
}
-
-
+
+
/* Helper method to convert JSON String representation to the corresponding Java entity. */
- private <T> T toEntity(String content, TypeReference<T> type) {
- try {
- return _mapper.readValue(content, type);
- } catch (IOException ex) {
- throw new SystemException(ex);
- }
- }
-
-
+ private <T> T toEntity(String content, TypeReference<T> type) {
+ try {
+ return _mapper.readValue(content, type);
+ } catch (IOException ex) {
+ throw new SystemException(ex);
+ }
+ }
+
+
/** Helper to process the response.
* Throws a SystemException when the HTTP status code is outside of the range 200 - 300.
*/
- private String extractResponse(Response response) {
- requireArgument(response != null, "HttpResponse object cannot be null.");
-
- int status = response.getStatusLine().getStatusCode();
- String strResponse = extractStringResponse(response);
-
- if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) {
- throw new SystemException("Status code: " + status + " . Error occurred. " + strResponse);
- } else {
- return strResponse;
- }
- }
-
- private String extractStringResponse(Response content) {
- requireArgument(content != null, "Response content is null.");
-
- String result;
- HttpEntity entity = null;
-
- try {
- entity = content.getEntity();
- if (entity == null) {
- result = "";
- } else {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- entity.writeTo(baos);
- result = baos.toString("UTF-8");
- }
- return result;
- } catch (IOException ex) {
- throw new SystemException(ex);
- } finally {
- if (entity != null) {
- try {
- EntityUtils.consume(entity);
- } catch (IOException ex) {
- _logger.warn("Failed to close entity stream.", ex);
- }
- }
- }
- }
-
+ private String extractResponse(Response response) {
+ requireArgument(response != null, "HttpResponse object cannot be null.");
+
+ int status = response.getStatusLine().getStatusCode();
+ String strResponse = extractStringResponse(response);
+
+ if ((status < HttpStatus.SC_OK) || (status >= HttpStatus.SC_MULTIPLE_CHOICES)) {
+ throw new SystemException("Status code: " + status + " . Error occurred. " + strResponse);
+ } else {
+ return strResponse;
+ }
+ }
+
+ private String extractStringResponse(Response content) {
+ requireArgument(content != null, "Response content is null.");
+
+ String result;
+ HttpEntity entity = null;
+
+ try {
+ entity = content.getEntity();
+ if (entity == null) {
+ result = "";
+ } else {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ entity.writeTo(baos);
+ result = baos.toString("UTF-8");
+ }
+ return result;
+ } catch (IOException ex) {
+ throw new SystemException(ex);
+ } finally {
+ if (entity != null) {
+ try {
+ EntityUtils.consume(entity);
+ } catch (IOException ex) {
+ _logger.warn("Failed to close entity stream.", ex);
+ }
+ }
+ }
+ }
+
private ObjectMapper _createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
-
+
mapper.setSerializationInclusion(Include.NON_NULL);
SimpleModule module = new SimpleModule();
module.addSerializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Serializer());
module.addDeserializer(MetricSchemaRecordList.class, new MetricSchemaRecordList.Deserializer());
module.addDeserializer(List.class, new MetricSchemaRecordList.AggDeserializer());
mapper.registerModule(module);
-
+
return mapper;
}
-
-
- private ObjectNode _createSettingsNode() {
- ObjectMapper mapper = new ObjectMapper();
-
- ObjectNode metadataAnalyzer = mapper.createObjectNode();
- metadataAnalyzer.put("tokenizer", "metadata_tokenizer");
- metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase"));
-
- ObjectNode analyzerNode = mapper.createObjectNode();
- analyzerNode.put("metadata_analyzer", metadataAnalyzer);
-
- ObjectNode tokenizerNode = mapper.createObjectNode();
- tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])"));
-
- ObjectNode analysisNode = mapper.createObjectNode();
- analysisNode.put("analyzer", analyzerNode);
- analysisNode.put("tokenizer", tokenizerNode);
-
- ObjectNode indexNode = mapper.createObjectNode();
- indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW);
- indexNode.put("number_of_replicas", _replicationFactor);
- indexNode.put("number_of_shards", _numShards);
-
- ObjectNode settingsNode = mapper.createObjectNode();
- settingsNode.put("analysis", analysisNode);
- settingsNode.put("index", indexNode);
-
- return settingsNode;
-
- }
-
-
- private ObjectNode _createMappingsNode() {
- ObjectMapper mapper = new ObjectMapper();
-
- ObjectNode propertiesNode = mapper.createObjectNode();
- propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT));
- propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT));
- propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT));
- propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT));
- propertiesNode.put(RecordType.NAMESPACE.getName(), _createFieldNode(FIELD_TYPE_TEXT));
-
- propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE));
-
- ObjectNode typeNode = mapper.createObjectNode();
- typeNode.put("properties", propertiesNode);
-
- ObjectNode mappingsNode = mapper.createObjectNode();
- mappingsNode.put(TYPE_NAME, typeNode);
-
- return mappingsNode;
- }
-
-
- private ObjectNode _createFieldNode(String type) {
- ObjectMapper mapper = new ObjectMapper();
-
- ObjectNode fieldNode = mapper.createObjectNode();
- fieldNode.put("type", type);
- fieldNode.put("analyzer", "metadata_analyzer");
- ObjectNode keywordNode = mapper.createObjectNode();
- keywordNode.put("type", "keyword");
- ObjectNode fieldsNode = mapper.createObjectNode();
- fieldsNode.put("raw", keywordNode);
- fieldNode.put("fields", fieldsNode);
- return fieldNode;
- }
-
+
+
+ private ObjectNode _createSettingsNode() {
+ ObjectMapper mapper = new ObjectMapper();
+
+ ObjectNode metadataAnalyzer = mapper.createObjectNode();
+ metadataAnalyzer.put("tokenizer", "metadata_tokenizer");
+ metadataAnalyzer.put("filter", mapper.createArrayNode().add("lowercase"));
+
+ ObjectNode analyzerNode = mapper.createObjectNode();
+ analyzerNode.put("metadata_analyzer", metadataAnalyzer);
+
+ ObjectNode tokenizerNode = mapper.createObjectNode();
+ tokenizerNode.put("metadata_tokenizer", mapper.createObjectNode().put("type", "pattern").put("pattern", "([^\\p{L}\\d]+)|(?<=[\\p{L}&&[^\\p{Lu}]])(?=\\p{Lu})|(?<=\\p{Lu})(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])"));
+
+ ObjectNode analysisNode = mapper.createObjectNode();
+ analysisNode.put("analyzer", analyzerNode);
+ analysisNode.put("tokenizer", tokenizerNode);
+
+ ObjectNode indexNode = mapper.createObjectNode();
+ indexNode.put("max_result_window", INDEX_MAX_RESULT_WINDOW);
+ indexNode.put("number_of_replicas", _replicationFactor);
+ indexNode.put("number_of_shards", _numShards);
+
+ ObjectNode settingsNode = mapper.createObjectNode();
+ settingsNode.put("analysis", analysisNode);
+ settingsNode.put("index", indexNode);
+
+ return settingsNode;
+
+ }
+
+
+ private ObjectNode _createMappingsNode() {
+ ObjectMapper mapper = new ObjectMapper();
+
+ ObjectNode propertiesNode = mapper.createObjectNode();
+ propertiesNode.put(RecordType.SCOPE.getName(), _createFieldNode(FIELD_TYPE_TEXT));
+ propertiesNode.put(RecordType.METRIC.getName(), _createFieldNode(FIELD_TYPE_TEXT));
+ propertiesNode.put(RecordType.TAGK.getName(), _createFieldNode(FIELD_TYPE_TEXT));
+ propertiesNode.put(RecordType.TAGV.getName(), _createFieldNode(FIELD_TYPE_TEXT));
+ propertiesNode.put(RecordType.NAMESPACE.getName(), _createFieldNode(FIELD_TYPE_TEXT));
+
+ propertiesNode.put("mts", _createFieldNode(FIELD_TYPE_DATE));
+
+ ObjectNode typeNode = mapper.createObjectNode();
+ typeNode.put("properties", propertiesNode);
+
+ ObjectNode mappingsNode = mapper.createObjectNode();
+ mappingsNode.put(TYPE_NAME, typeNode);
+
+ return mappingsNode;
+ }
+
+
+ private ObjectNode _createFieldNode(String type) {
+ ObjectMapper mapper = new ObjectMapper();
+
+ ObjectNode fieldNode = mapper.createObjectNode();
+ fieldNode.put("type", type);
+ fieldNode.put("analyzer", "metadata_analyzer");
+ ObjectNode keywordNode = mapper.createObjectNode();
+ keywordNode.put("type", "keyword");
+ ObjectNode fieldsNode = mapper.createObjectNode();
+ fieldsNode.put("raw", keywordNode);
+ fieldNode.put("fields", fieldsNode);
+ return fieldNode;
+ }
+
private void _createIndexIfNotExists() {
try {
Response response = _esRestClient.performRequest(HttpMethod.HEAD.getName(), "/" + INDEX_NAME);
boolean indexExists = response.getStatusLine().getStatusCode() == HttpStatus.SC_OK ? true : false;
-
+
if(!indexExists) {
_logger.info("Index [" + INDEX_NAME + "] does not exist. Will create one.");
ObjectMapper mapper = new ObjectMapper();
-
+
ObjectNode rootNode = mapper.createObjectNode();
rootNode.put("settings", _createSettingsNode());
rootNode.put("mappings", _createMappingsNode());
-
+
String settingsAndMappingsJson = rootNode.toString();
String requestUrl = new StringBuilder().append("/").append(INDEX_NAME).toString();
-
+
response = _esRestClient.performRequest(HttpMethod.PUT.getName(), requestUrl, Collections.emptyMap(), new StringEntity(settingsAndMappingsJson));
extractResponse(response);
}
@@ -996,26 +1001,26 @@ private void _createIndexIfNotExists() {
_logger.error("Failed to check/create elasticsearch index. ElasticSearchSchemaService may not function.", e);
}
}
-
- /**
- * Enumeration of supported HTTP methods.
- *
- * @author Bhinav Sura (bhinav.sura@salesforce.com)
- */
- private enum HttpMethod {
-
- /** POST operation. */
- POST("POST"),
- /** PUT operation. */
- PUT("PUT"),
- /** HEAD operation. */
- HEAD("HEAD");
-
- private String name;
-
- HttpMethod(String name) {
- this.setName(name);
- }
+
+ /**
+ * Enumeration of supported HTTP methods.
+ *
+ * @author Bhinav Sura (bhinav.sura@salesforce.com)
+ */
+ private enum HttpMethod {
+
+ /** POST operation. */
+ POST("POST"),
+ /** PUT operation. */
+ PUT("PUT"),
+ /** HEAD operation. */
+ HEAD("HEAD");
+
+ private String name;
+
+ HttpMethod(String name) {
+ this.setName(name);
+ }
public String getName() {
return name;
@@ -1024,70 +1029,70 @@ public String getName() {
public void setName(String name) {
this.name = name;
}
- }
-
-
+ }
+
+
/**
- * The set of implementation specific configuration properties.
- *
- * @author Bhinav Sura (bhinav.sura@salesforce.com)
- */
- public enum Property {
-
- ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"),
- /** Connection timeout for ES REST client. */
- ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"),
- /** Socket connection timeout for ES REST client. */
- ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"),
- /** Connection count for ES REST client. */
- ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"),
- /** Replication factor for metadata_index. */
- ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"),
- /** Shard count for metadata_index. */
- ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"),
- /** The no. of records to batch for bulk indexing requests.
- * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests
- */
- ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"),
- /** The hashing algorithm to use for generating document id. */
- ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5");
-
- private final String _name;
- private final String _defaultValue;
-
- private Property(String name, String defaultValue) {
- _name = name;
- _defaultValue = defaultValue;
- }
-
- /**
- * Returns the property name.
- *
- * @return The property name.
- */
- public String getName() {
- return _name;
- }
-
- /**
- * Returns the default value for the property.
- *
- * @return The default value.
- */
- public String getDefaultValue() {
- return _defaultValue;
- }
- }
-
-
- static class PutResponse {
- private int took;
- private boolean errors;
- private List<Item> items;
-
- public PutResponse() {}
-
- public int getTook() {
+ * The set of implementation specific configuration properties.
+ *
+ * @author Bhinav Sura (bhinav.sura@salesforce.com)
+ */
+ public enum Property {
+
+ ELASTICSEARCH_ENDPOINT("service.property.schema.elasticsearch.endpoint", "http://localhost:9200,http://localhost:9201"),
+ /** Connection timeout for ES REST client. */
+ ELASTICSEARCH_ENDPOINT_CONNECTION_TIMEOUT("service.property.schema.elasticsearch.endpoint.connection.timeout", "10000"),
+ /** Socket connection timeout for ES REST client. */
+ ELASTICSEARCH_ENDPOINT_SOCKET_TIMEOUT("service.property.schema.elasticsearch.endpoint.socket.timeout", "10000"),
+ /** Connection count for ES REST client. */
+ ELASTICSEARCH_CONNECTION_COUNT("service.property.schema.elasticsearch.connection.count", "10"),
+ /** Replication factor for metadata_index. */
+ ELASTICSEARCH_NUM_REPLICAS("service.property.schema.elasticsearch.num.replicas", "1"),
+ /** Shard count for metadata_index. */
+ ELASTICSEARCH_SHARDS_COUNT("service.property.schema.elasticsearch.shards.count", "10"),
+ /** The no. of records to batch for bulk indexing requests.
+ * https://www.elastic.co/guide/en/elasticsearch/guide/current/indexing-performance.html#_using_and_sizing_bulk_requests
+ */
+ ELASTICSEARCH_INDEXING_BATCH_SIZE("service.property.schema.elasticsearch.indexing.batch.size", "10000"),
+ /** The hashing algorithm to use for generating document id. */
+ ELASTICSEARCH_IDGEN_HASH_ALGO("service.property.schema.elasticsearch.idgen.hash.algo", "MD5");
+
+ private final String _name;
+ private final String _defaultValue;
+
+ private Property(String name, String defaultValue) {
+ _name = name;
+ _defaultValue = defaultValue;
+ }
+
+ /**
+ * Returns the property name.
+ *
+ * @return The property name.
+ */
+ public String getName() {
+ return _name;
+ }
+
+ /**
+ * Returns the default value for the property.
+ *
+ * @return The default value.
+ */
+ public String getDefaultValue() {
+ return _defaultValue;
+ }
+ }
+
+
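
Each Property is resolved through SystemConfiguration, falling back to its default when unset. A minimal sketch of the lookup pattern (variable names are illustrative):

    // Bulk indexing batch size, 10000 by default.
    int bulkBatchSize = Integer.parseInt(config.getValue(
            Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getName(),
            Property.ELASTICSEARCH_INDEXING_BATCH_SIZE.getDefaultValue()));
    // The endpoint default is a comma-separated list, so multiple hosts are supported.
    String[] endpoints = config.getValue(
            Property.ELASTICSEARCH_ENDPOINT.getName(),
            Property.ELASTICSEARCH_ENDPOINT.getDefaultValue()).split(",");
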
+ static class PutResponse {
+ private int took;
+ private boolean errors;
+ private List<Item> items;
+
+ public PutResponse() {}
+
+ public int getTook() {
return took;
}
@@ -1113,10 +1118,10 @@ public void setItems(List<Item> items) {
@JsonIgnoreProperties(ignoreUnknown = true)
static class Item {
- private CreateItem create;
- private CreateItem index;
-
- public Item() {}
+ private CreateItem create;
+ private CreateItem index;
+
+ public Item() {}
public CreateItem getCreate() {
return create;
@@ -1125,7 +1130,7 @@ public CreateItem getCreate() {
public void setCreate(CreateItem create) {
this.create = create;
}
-
+
public CreateItem getIndex() {
return index;
}
@@ -1133,46 +1138,46 @@ public CreateItem getIndex() {
public void setIndex(CreateItem index) {
this.index = index;
}
- }
-
+ }
+
@JsonIgnoreProperties(ignoreUnknown = true)
- static class CreateItem {
- private String _index;
- private String _type;
- private String _id;
- private int status;
- private Error error;
-
- public CreateItem() {}
-
+ static class CreateItem {
+ private String _index;
+ private String _type;
+ private String _id;
+ private int status;
+ private Error error;
+
+ public CreateItem() {}
+
public String get_index() {
return _index;
}
-
+
public void set_index(String _index) {
this._index = _index;
}
-
+
public String get_type() {
return _type;
}
-
+
public void set_type(String _type) {
this._type = _type;
}
-
+
public String get_id() {
return _id;
}
-
+
public void set_id(String _id) {
this._id = _id;
}
-
+
public int getStatus() {
return status;
}
-
+
public void setStatus(int status) {
this.status = status;
}
@@ -1184,13 +1189,13 @@ public Error getError() {
public void setError(Error error) {
this.error = error;
}
- }
-
+ }
+
@JsonIgnoreProperties(ignoreUnknown = true)
static class Error {
private String type;
private String reason;
-
+
public Error() {}
public String getType() {
@@ -1209,6 +1214,6 @@ public void setReason(String reason) {
this.reason = reason;
}
}
- }
+ }
}
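The PutResponse/Item/CreateItem/Error classes above mirror the JSON shape of an Elasticsearch _bulk response, so a single Jackson call can materialize the whole hierarchy. A hedged sketch (the helper name and sample payload are illustrative):

    import java.io.IOException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // e.g. json = {"took":30,"errors":false,"items":[{"create":{"_index":"metadata_index","_id":"a1","status":201}}]}
    static PutResponse parseBulkResponse(String json) throws IOException {
        // @JsonIgnoreProperties(ignoreUnknown = true) lets fields we do not model pass through silently.
        return new ObjectMapper().readValue(json, PutResponse.class);
    }
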
diff --git a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
index 55293b376..dc6d52bf1 100644
--- a/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
+++ b/ArgusCore/src/test/java/com/salesforce/dva/argus/service/schema/AbstractSchemaServiceTest.java
@@ -3,7 +3,10 @@
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -16,11 +19,12 @@
import com.salesforce.dva.argus.AbstractTest;
import com.salesforce.dva.argus.entity.Metric;
+import com.salesforce.dva.argus.entity.MetricSchemaRecord;
import com.salesforce.dva.argus.service.schema.ElasticSearchSchemaService;
/**
- * This test suite tests the trie-based caching in the AbstractSchemaService class. Although we are instantiating
+ * This test suite tests the Bloom filter caching in the AbstractSchemaService class. Although we are instantiating
* an ElasticSearchSchemaService object, the implementationSpecificPut (which is part of the ES Schema Service) has been
* mocked out. In essence, these tests only test the caching functionality.
*
@@ -31,47 +35,48 @@ public class AbstractSchemaServiceTest extends AbstractTest {
@Test
public void testPutEverythingCached() {
-
List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
- _enableCaching(service);
final AtomicInteger count = new AtomicInteger();
ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
spyService.put(metrics);
+ // add to bloom filter cache
+ spyService._addToBloomFilter(spyService._fracture(metrics).get(0));
assertTrue(count.get() == metrics.size());
spyService.put(metrics);
+ // count should remain the same since these metrics are already in the bloom filter cache
assertTrue(count.get() == metrics.size());
}
@Test
public void testPutPartialCached() {
-
List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
List<Metric> newMetrics = createRandomMetrics("test-scope", "test-metric1", 5);
Set<Metric> total = new HashSet<>(metrics);
total.addAll(newMetrics);
ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
- _enableCaching(service);
final AtomicInteger count = new AtomicInteger();
ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
spyService.put(metrics);
+ // add the 1st batch of metrics to the bloom filter cache
+ spyService._addToBloomFilter(spyService._fracture(metrics).get(0));
assertTrue(count.get() == metrics.size());
+ // 1st batch of metrics is already in the cache (partial-cache scenario); the 2nd batch will now be added as well.
+ // Total number of metrics in the cache = metrics.size() + newMetrics.size()
spyService.put(new ArrayList<>(total));
+ spyService._addToBloomFilter(spyService._fracture(new ArrayList<>(total)).get(0));
assertTrue(count.get() == total.size());
-
}
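
The assertions above follow from Guava's BloomFilter contract: mightContain may return a false positive but never a false negative, so a key that was put is always reported as (possibly) present and its re-write is skipped. A minimal sketch of the check, assuming a scope:metric string key (the key format is illustrative):

    import java.nio.charset.Charset;
    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    BloomFilter<CharSequence> filter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()), 1000000, 0.001);
    String key = "test-scope:test-metric";      // illustrative key format
    if (!filter.mightContain(key)) {            // definitely not seen before
        // the expensive implementationSpecificPut(...) would run here
        filter.put(key);                        // remember the key for subsequent puts
    }
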
@Test
public void testPutNothingCached() {
-
List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
List<Metric> newMetrics = createRandomMetrics("test-scope", "test-metric1", 5);
ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
- _enableCaching(service);
final AtomicInteger count = new AtomicInteger();
ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
@@ -80,22 +85,7 @@ public void testPutNothingCached() {
spyService.put(newMetrics);
assertTrue(count.get() == metrics.size() + newMetrics.size());
}
-
- @Test
- public void testPutCachingDisabled() {
-
- List<Metric> metrics = createRandomMetrics("test-scope", "test-metric", 10);
-
- ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
- final AtomicInteger count = new AtomicInteger();
- ElasticSearchSchemaService spyService = _initializeSpyService(service, count);
-
- spyService.put(metrics);
- assertTrue(count.get() == metrics.size());
- spyService.put(metrics);
- assertTrue(count.get() == metrics.size() * 2);
- }
-
+
private ElasticSearchSchemaService _initializeSpyService(ElasticSearchSchemaService service, final AtomicInteger count) {
ElasticSearchSchemaService spyService = Mockito.spy(service);
@@ -111,17 +101,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return spyService;
}
-
-
- private void _enableCaching(ElasticSearchSchemaService service) {
- try {
- Field field = service.getClass().getSuperclass().getDeclaredField("_cacheEnabled");
- field.setAccessible(true);
- field.set(service, true);
- } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ @Test
+ public void getNumHoursUntilNextFlushBloomFilter() {
+ ElasticSearchSchemaService service = new ElasticSearchSchemaService(system.getConfiguration(), system.getServiceFactory().getMonitorService());
+
+ Calendar calendar = Calendar.getInstance();
+
+ // Will wait a full 24 hours before the next flush when the target hour equals the current hour
+ int hour = calendar.get(Calendar.HOUR_OF_DAY);
+ assertTrue(service.getNumHoursUntilTargetHour(hour) == 24);
}
-
}
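
For reference, the arithmetic the new test pins down can be reconstructed as follows (a hedged sketch, not the exact implementation): the wait is the forward distance to the target hour, and a full 24 hours when the target hour equals the current hour.

    import java.util.Calendar;

    // Hedged sketch of the wait computation exercised by the test above.
    static int numHoursUntilTargetHour(int targetHour) {
        int currentHour = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
        return targetHour > currentHour
                ? targetHour - currentHour          // later today
                : 24 - (currentHour - targetHour);  // tomorrow; equal hours => 24
    }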