diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 26823c040f7..ba4ce4cab82 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -626,6 +626,12 @@
      <artifactId>swagger-jaxrs2-servlet-initializer-v2</artifactId>
      <version>${swagger.version}</version>
    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>2.9.3</version>
+    </dependency>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
new file mode 100644
index 00000000000..c82cba092ea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.CacheKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
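+/**
+ * Process-wide Caffeine caches backing the plan cache feature: one maps SQL
+ * text to physical plans, the other maps planner-phase inputs to transformed
+ * RelNodes. Sizes and TTLs are read once from DrillConfig at class load time.
+ */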
+public class CustomCacheManager {
+ private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class);
+
+  private static Cache<String, PhysicalPlan> queryCache;
+  private static Cache<CacheKey, RelNode> transformCache;
+
+ private static int queryMaxEntries;
+ private static int queryTtlMinutes;
+ private static int transformMaxEntries;
+ private static int transformTtlMinutes;
+
+ static {
+ loadConfig();
+ }
+
+  private static void loadConfig() {
+    DrillConfig config = DrillConfig.create();
+
+    // These defaults live under drill.exec.options in drill-module.conf, so
+    // they are read by their full path when going straight through DrillConfig.
+    queryMaxEntries = getConfigInt(config, "drill.exec.options.planner.query.cache.max_entries_amount", 100);
+    queryTtlMinutes = getConfigInt(config, "drill.exec.options.planner.query.cache.ttl_minutes", 300);
+    transformMaxEntries = getConfigInt(config, "drill.exec.options.planner.transform.cache.max_entries_amount", 100);
+    transformTtlMinutes = getConfigInt(config, "drill.exec.options.planner.transform.cache.ttl_minutes", 300);
+
+    queryCache = buildCache(queryMaxEntries, queryTtlMinutes);
+    transformCache = buildCache(transformMaxEntries, transformTtlMinutes);
+  }
+
+  // A TTL of zero or less would expire entries immediately, so time-based
+  // eviction is only applied for positive values.
+  private static <K, V> Cache<K, V> buildCache(int maxEntries, int ttlMinutes) {
+    Caffeine<Object, Object> builder = Caffeine.newBuilder()
+        .maximumSize(maxEntries)
+        .recordStats();
+    if (ttlMinutes > 0) {
+      builder.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES);
+    }
+    return builder.build();
+  }
+
+  private static int getConfigInt(DrillConfig config, String path, int defaultValue) {
+    boolean pathFound = config.hasPath(path);
+    int value = pathFound ? config.getInt(path) : defaultValue;
+    logger.info("Config {}: using {} value {}", path, pathFound ? "configured" : "default", value);
+    return value;
+  }
+
+ public static PhysicalPlan getQueryPlan(String sql) {
+ return queryCache.getIfPresent(sql);
+ }
+
+ public static void putQueryPlan(String sql, PhysicalPlan plan) {
+ queryCache.put(sql, plan);
+ }
+
+ public static RelNode getTransformedPlan(CacheKey key) {
+ return transformCache.getIfPresent(key);
+ }
+
+ public static void putTransformedPlan(CacheKey key, RelNode plan) {
+ transformCache.put(key, plan);
+ }
+
+  public static void logCacheStats() {
+    logger.info("Query cache stats: {}, estimated size: {}", queryCache.stats(), queryCache.estimatedSize());
+    logger.info("Transform cache stats: {}, estimated size: {}", transformCache.stats(), transformCache.estimatedSize());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 6fa145a1b01..e8ab405c557 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -132,6 +132,20 @@ public class PlannerSettings implements Context{
public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute";
public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, null);
+ public static final BooleanValidator PLAN_CACHE = new BooleanValidator("planner.cache.enable",
+ new OptionDescription("Enables caching of generated query plans in memory, so repeated queries can bypass the planning phase and execute faster.")
+ );
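+  // Toggle at runtime with: ALTER SESSION SET `planner.cache.enable` = true;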
+
+  // TTL and max-entries are only settable via config: recreating the caches when
+  // these values change at runtime would require a pub-sub mechanism across the cluster.
+ // public static final RangeLongValidator PLAN_CACHE_TTL = new RangeLongValidator("planner.cache.ttl_minutes",
+ // 0, Long.MAX_VALUE,
+ // new OptionDescription("Time-to-live for cached query plans in minutes. Plans older than this are evicted. Default is 0 (disabled)")
+ // );
+ // public static final RangeLongValidator MAX_CACHE_ENTRIES = new RangeLongValidator("planner.cache.max_entries",
+ // 1, Long.MAX_VALUE,
+ // new OptionDescription("Maximum total number of entries for cached query plans. When exceeded, least recently used plans are evicted.")
+ // );
+
// ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
public static final String USE_SIMPLE_OPTIMIZER_KEY = "planner.use_simple_optimizer";
public static final BooleanValidator USE_SIMPLE_OPTIMIZER = new BooleanValidator(USE_SIMPLE_OPTIMIZER_KEY,
@@ -416,6 +430,10 @@ public boolean isUnionAllDistributeEnabled() {
return options.getOption(UNIONALL_DISTRIBUTE);
}
+ public boolean isPlanCacheEnabled() {
+ return options.getOption(PLAN_CACHE);
+ }
+
public boolean isParquetRowGroupFilterPushdownPlanningEnabled() {
return options.getOption(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index c706f8f3733..30e157a1852 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.planner.sql;
import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.calcite.sql.SqlDescribeSchema;
import org.apache.calcite.sql.SqlKind;
@@ -29,12 +32,14 @@
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.Litmus;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.MetadataException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
@@ -52,7 +57,6 @@
import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
-import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -128,6 +132,8 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe
try {
return getPhysicalPlan(context, sql, textPlan, retryAttempts);
} catch (Exception e) {
+      logger.debug("Physical plan conversion failed, retry attempt #{}", retryAttempts, e);
logger.trace("There was an error during conversion into physical plan.", e);
// It is prohibited to retry query planning for ANALYZE statement since it changes
@@ -176,9 +182,11 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe
private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Pointer textPlan,
long retryAttempts) throws ForemanSetupException, RelConversionException, IOException, ValidationException {
try {
+ logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", retryAttempts);
return getQueryPlan(context, sql, textPlan);
} catch (Exception e) {
Throwable rootCause = Throwables.getRootCause(e);
+ logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", rootCause.getMessage());
// Calcite wraps exceptions thrown during planning, so checks whether original exception is OutdatedMetadataException
if (rootCause instanceof MetadataException) {
// resets SqlStatementType to avoid errors when it is set during further attempts
@@ -216,12 +224,21 @@ private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Po
* @param textPlan text plan
* @return query physical plan
*/
+
+  // Process-wide cache of physical plans, keyed by the parsed SQL tree.
+  private static final ConcurrentMap<QueryPlanCacheKey, PhysicalPlan> queryPlanCache = new ConcurrentHashMap<>();
+
private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer textPlan)
throws ForemanSetupException, RelConversionException, IOException, ValidationException {
final SqlConverter parser = new SqlConverter(context);
injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql);
+    QueryPlanCacheKey queryPlanCacheKey = new QueryPlanCacheKey(sqlNode);
+
+    // Single get() instead of containsKey()/get() avoids a second lookup and a
+    // race with concurrent eviction.
+    PhysicalPlan cachedPlan = queryPlanCache.get(queryPlanCacheKey);
+    if (cachedPlan != null) {
+      logger.info("Reusing cached physical plan");
+      return cachedPlan;
+    }
final AbstractSqlHandler handler;
final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);
@@ -287,6 +304,8 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
context.setSQLStatementType(SqlStatementType.OTHER);
}
// Determines whether result set should be returned for the query based on return result set option and sql node kind.
// Overrides the option on a query level if it differs from the current value.
boolean currentReturnResultValue = context.getOptions().getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL);
@@ -295,7 +314,34 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
context.getOptions().setLocalOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL, true);
}
- return handler.getPlan(sqlNode);
+    PhysicalPlan physicalPlan = handler.getPlan(sqlNode);
+    queryPlanCache.put(queryPlanCacheKey, physicalPlan);
+    return physicalPlan;
+ }
+
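+  /**
+   * Cache key that compares parsed SQL statements structurally via
+   * {@link SqlNode#equalsDeep}, so statements differing only in formatting
+   * map to the same cached plan.
+   */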
+ private static class QueryPlanCacheKey {
+ private final SqlNode sqlNode;
+
+ public QueryPlanCacheKey(SqlNode sqlNode) {
+ this.sqlNode = sqlNode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ QueryPlanCacheKey cacheKey = (QueryPlanCacheKey) o;
+ return sqlNode.equalsDeep(cacheKey.sqlNode, Litmus.IGNORE);
+ }
+
+    @Override
+    public int hashCode() {
+      // Objects.hash(sqlNode) would use SqlNode's identity hash code, which
+      // breaks the equals/hashCode contract against equalsDeep(). Hashing the
+      // unparsed SQL keeps structurally equal trees on the same bucket.
+      return sqlNode.toString().hashCode();
+    }
}
private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 6cc4d3bc4bc..428102b6a34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -20,15 +20,10 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.fasterxml.jackson.databind.ser.PropertyFilter;
-import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
-import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
-import org.apache.drill.exec.util.Utilities;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -66,6 +61,7 @@
import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
import org.apache.drill.common.logical.PlanProperties.PlanType;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.CustomCacheManager;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -104,13 +100,20 @@
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ser.PropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
+import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
public class DefaultSqlHandler extends AbstractSqlHandler {
private static final Logger logger = LoggerFactory.getLogger(DefaultSqlHandler.class);
@@ -250,7 +253,6 @@ protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupported
// hep is enabled and hep pruning is enabled.
intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
-
} else {
// Only hep is enabled
final RelNode intermediateNode =
@@ -361,16 +363,74 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode
* @param log Whether to log the planning phase.
* @return The transformed relnode.
*/
+
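+  /**
+   * Cache key for transformed plans. Two keys match when planner type, phase,
+   * and target traits are equal and the input RelNodes are structurally equal
+   * per {@link RelNode#deepEquals}. Both deepEquals and deepHashCode walk the
+   * whole plan tree, so key comparison cost grows with plan size.
+   */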
+  public static class CacheKey {
+    private final PlannerType plannerType;
+    private final PlannerPhase phase;
+    private final RelNode input;
+    private final RelTraitSet targetTraits;
+
+    public CacheKey(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits) {
+      this.plannerType = plannerType;
+      this.phase = phase;
+      this.input = input;
+      this.targetTraits = targetTraits;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CacheKey cacheKey = (CacheKey) o;
+      return phase == cacheKey.phase &&
+          plannerType == cacheKey.plannerType &&
+          input.deepEquals(cacheKey.input) &&
+          targetTraits.equals(cacheKey.targetTraits);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(phase, plannerType, input.deepHashCode(), targetTraits);
+    }
+  }
+
+
protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits,
boolean log) {
final Stopwatch watch = Stopwatch.createStarted();
final RuleSet rules = config.getRules(phase, input);
final RelTraitSet toTraits = targetTraits.simplify();
+ final OptionManager options = context.getOptions();
+ final boolean planCacheEnabled = options.getOption(PlannerSettings.PLAN_CACHE);
+
+ CacheKey key = null;
+
+ if (planCacheEnabled) {
+ // Create a cache key based on the input parameters
+ key = new CacheKey(plannerType, phase, input, targetTraits);
+
+ RelNode cachedResult = CustomCacheManager.getTransformedPlan(key);
+ if (cachedResult != null) {
+ CustomCacheManager.logCacheStats();
+ return cachedResult;
+ }
+ }
final RelNode output;
switch (plannerType) {
case HEP_BOTTOM_UP:
case HEP: {
+ logger.info("DefaultSqlHandler.transform()");
final HepProgramBuilder hepPgmBldr = new HepProgramBuilder();
if (plannerType == PlannerType.HEP_BOTTOM_UP) {
hepPgmBldr.addMatchOrder(HepMatchOrder.BOTTOM_UP);
@@ -402,13 +462,24 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode
Preconditions.checkArgument(planner instanceof VolcanoPlanner,
"Cluster is expected to be constructed using VolcanoPlanner. Was actually of type %s.", planner.getClass()
.getName());
+ logger.info("DefaultSqlHandler.transform() program.run( before");
output = program.run(planner, input, toTraits,
ImmutableList.of(), ImmutableList.of());
+ logger.info("DefaultSqlHandler.transform() program.run( after");
break;
}
}
+    if (planCacheEnabled) {
+      // Store the result in the cache before returning
+      logger.debug("Plan cache enabled, storing transformed plan");
+      CustomCacheManager.putTransformedPlan(key, output);
+      CustomCacheManager.logCacheStats();
+    } else {
+      logger.debug("Plan cache disabled, not storing transformed plan");
+    }
+
if (log) {
log(plannerType, phase, output, logger, watch);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 3cee4096e09..11ffe93ecd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -82,6 +82,10 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
// here.
@SuppressWarnings("deprecation")
final OptionDefinition[] definitions = new OptionDefinition[]{
+ new OptionDefinition(PlannerSettings.PLAN_CACHE),
+ // new OptionDefinition(PlannerSettings.PLAN_CACHE_TTL),
+ // new OptionDefinition(PlannerSettings.MAX_CACHE_ENTRIES),
+
new OptionDefinition(PlannerSettings.CONSTANT_FOLDING),
new OptionDefinition(PlannerSettings.EXCHANGE),
new OptionDefinition(PlannerSettings.HASHAGG),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a099b96b123..0e5e0ba89b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,19 +17,19 @@
*/
package org.apache.drill.exec.work.foreman;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.common.util.JacksonUtils;
-import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.util.JacksonUtils;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.CustomCacheManager;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.QueryContext;
@@ -39,6 +39,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -56,23 +57,27 @@
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.FailureUtils;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.InvalidProtocolBufferException;
-import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
@@ -269,9 +274,11 @@ public void run() {
final String sql = queryRequest.getPlan();
// log query id, username and query text before starting any real work. Also, put
// them together such that it is easy to search based on query id
+        long start = System.currentTimeMillis();
logger.info("Query text for query with id {} issued by {}: {}", queryIdString,
queryContext.getQueryUserName(), sql);
runSQL(sql);
+ logger.info("RunSQL is executed within {}", new Date().getTime() - start);
break;
case EXECUTION:
runFragment(queryRequest.getFragmentsList());
@@ -481,6 +488,7 @@ private void runFragment(List<PlanFragment> fragmentsList) throws ExecutionSetup
* Moves query to RUNNING state.
*/
private void startQueryProcessing() {
+ logger.info("Starting query processing");
enqueue();
runFragments();
queryStateProcessor.moveToState(QueryState.RUNNING, null);
@@ -589,9 +597,50 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
queryId, queryWorkUnit.stringifyFragments()));
}
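+  /**
+   * Returns a physical plan for the given SQL text, consulting the plan cache
+   * when it is enabled. On a miss the query is planned normally and, for
+   * cacheable (SELECT) statements, the result is stored for reuse.
+   */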
+  private PhysicalPlan getOrBuildPlan(String sql, Pointer<String> textPlan, boolean planCacheEnabled)
+      throws ExecutionSetupException {
+
+ PhysicalPlan plan = null;
+
+ if (planCacheEnabled) {
+ logger.info("Cache enabled, checking entries");
+ plan = CustomCacheManager.getQueryPlan(sql);
+
+ if (plan == null) {
+ logger.info("Cache miss, generating new plan");
+ plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
+
+ if (isCacheableQuery(sql)) {
+ CustomCacheManager.putQueryPlan(sql, plan);
+ CustomCacheManager.logCacheStats();
+ }
+ } else {
+ logger.info("Using cached plan");
+      }
+    } else {
+ plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
+ }
+
+ return plan;
+ }
+
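+  // Heuristic: only plain SELECT statements are cached. DDL and other
+  // statements can have side effects, so replaying a cached plan would be unsafe.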
+ private boolean isCacheableQuery(String sql) {
+ return sql.trim().toUpperCase().startsWith("SELECT");
+ }
+
private void runSQL(final String sql) throws ExecutionSetupException {
    final Pointer<String> textPlan = new Pointer<>();
- final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
+    final OptionManager options = queryContext.getOptions();
+    final boolean planCacheEnabled = options.getOption(PlannerSettings.PLAN_CACHE);
+    logger.info("Plan cache is {}", planCacheEnabled ? "enabled" : "disabled");
+
+ PhysicalPlan plan = getOrBuildPlan(sql, textPlan, planCacheEnabled);
+
runPhysicalPlan(plan, textPlan);
}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7541a99e2dd..836c09d525e 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -697,6 +697,12 @@ drill.exec.options: {
planner.width.max_per_node: 0,
planner.width.max_per_query: 1000,
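+    # Plan cache settings. Enable with: ALTER SYSTEM SET `planner.cache.enable` = true;
+    # A TTL of 0 disables time-based eviction.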
+ planner.cache.enable: false,
+ planner.query.cache.ttl_minutes: 0,
+ planner.transform.cache.ttl_minutes: 0,
+ planner.query.cache.max_entries_amount: 100,
+ planner.transform.cache.max_entries_amount: 100,
+
prepare.statement.create_timeout_ms: 30000,
security.admin.user_groups: "%drill_process_user_groups%",
security.admin.users: "%drill_process_user%",
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index acb01a7b046..6b1c2202c37 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -428,6 +428,7 @@
            <exclude>antlr:*</exclude>
            <exclude>com.beust:*</exclude>
            <exclude>com.dropbox.*</exclude>
+           <exclude>com.github.ben-manes.caffeine:caffeine</exclude>
            <exclude>com.github.stefanbirkner</exclude>
            <exclude>com.google.code.findbugs:jsr305:*</exclude>
            <exclude>com.googlecode.json-simple:*</exclude>
com.googlecode.json-simple:*