diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
index cc1dbb5bdeaa..319588310e0e 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
@@ -26,7 +26,10 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Stack;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -41,15 +44,21 @@
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -64,6 +73,8 @@ public class HBaseStorageHandler extends DefaultStorageHandler
 
   final static public String DEFAULT_PREFIX = "default.";
 
+  static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class);
+
   private Configuration hbaseConf;
   private HBaseAdmin admin;
 
@@ -268,6 +279,7 @@ public void configureOutputJobProperties(
     configureTableJobProperties(tableDesc, jobProperties);
   }
 
+  @Override
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties) {
@@ -292,6 +304,150 @@ public void configureTableJobProperties(
     jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
   }
 
+  /**
+   * Checks whether every comparison in the given predicate can be pushed down to HBase.
+   *
+   * @param predicate - predicate to be analyzed
+   * @param columns - list of columns in the table
+   * @param colTypes - list of column types
+   * @param serde - HBaseSerDe corresponding to the table
+   * @return true if every column/constant comparison in the predicate is on a non-key column and
+   *         either uses "=" (which can be pushed regardless of storage format) or refers to a
+   *         column that is stored as binary or has a string type; false otherwise.
+   */
+  private boolean checkAllBinaryOrEquals(ExprNodeDesc predicate, List<String> columns,
+      List<String> colTypes, HBaseSerDe serde) {
+
+    String comparisonOp, columnName;
+    int colIndex;
+    Stack<ExprNodeDesc> stack = new Stack<ExprNodeDesc>();
+    stack.push(predicate);
+
+    while (!stack.isEmpty()) {
+      ExprNodeDesc top = stack.pop();
+
+      if ((top instanceof ExprNodeGenericFuncDesc) && (top.getChildren().size() == 2)) {
+
+        ExprNodeDesc child1 = top.getChildren().get(0);
+        ExprNodeDesc child2 = top.getChildren().get(1);
+        if (((child1 instanceof ExprNodeColumnDesc) && (child2 instanceof ExprNodeConstantDesc))
+            || ((child2 instanceof ExprNodeColumnDesc) && (child1 instanceof ExprNodeConstantDesc))) {
+
+          if (((ExprNodeGenericFuncDesc) top).getGenericUDF() instanceof GenericUDFBridge) {
+            GenericUDFBridge func = (GenericUDFBridge) ((ExprNodeGenericFuncDesc) top).getGenericUDF();
+            comparisonOp = func.getUdfName();
+          } else {
+            comparisonOp = ((ExprNodeGenericFuncDesc) top).getGenericUDF().getClass().getName();
+          }
+          columnName = (child1 instanceof ExprNodeColumnDesc) ?
+              ((ExprNodeColumnDesc) child1).getColumn() : ((ExprNodeColumnDesc) child2).getColumn();
+          colIndex = columns.indexOf(columnName);
+
+          // The key column is handled by the range analysis, so it is not pushable here;
+          // everything else must be a string column, an "=" comparison, or a binary-stored
+          // column to be pushable.
+          if (columnName.equals(columns.get(serde.getKeyColumnOffset()))
+              || !(colTypes.get(colIndex).equalsIgnoreCase("string")
+                  || comparisonOp.equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual")
+                  || serde.getStorageFormatOfCol(colIndex).get(0))) {
+            return false;
+          }
+        } else {
+          stack.push(child1);
+          stack.push(child2);
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @param predicate - predicate to be analyzed
+   * @param columns - list of columns as in the serde
+   * @param colTypes - list of column types
+   * @param serde - serde for the input table
+   * @return a list of size 2, with index 0 holding the residual predicate left after processing
+   *         this predicate and index 1 holding the predicate that can be pushed. Returns null if
+   *         the input predicate is null.
+   */
+  List<ExprNodeDesc> checkNonKeyPredicates(ExprNodeDesc predicate, List<String> columns,
+      List<String> colTypes, HBaseSerDe serde) {
+
+    List<ExprNodeDesc> retPred = new ArrayList<ExprNodeDesc>(2);
+    if (predicate == null) {
+      return null;
+    }
+    if (checkAllBinaryOrEquals(predicate, columns, colTypes, serde)) {
+      retPred.add(null);
+      retPred.add(predicate);
+      return retPred;
+    } else {
+      if (!FunctionRegistry.isOpAnd(predicate)) {
+        ExprNodeDesc child1 = predicate.getChildren().get(0);
+        ExprNodeDesc child2 = predicate.getChildren().get(1);
+
+        // A single comparison between a non-key column and a constant can still be pushed;
+        // anything else stays in the residual.
+        if (((child1 instanceof ExprNodeColumnDesc)
+            && !(((ExprNodeColumnDesc) child1).getColumn()).equals(columns.get(serde.getKeyColumnOffset()))
+            && (child2 instanceof ExprNodeConstantDesc))
+            || ((child2 instanceof ExprNodeColumnDesc)
+            && !(((ExprNodeColumnDesc) child2).getColumn()).equals(columns.get(serde.getKeyColumnOffset()))
+            && (child1 instanceof ExprNodeConstantDesc))) {
+          retPred.add(null);
+          retPred.add(predicate);
+        } else {
+          retPred.add(predicate);
+          retPred.add(null);
+        }
+        return retPred;
+      } else {
+        ExprNodeDesc child1 = predicate.getChildren().get(0);
+        ExprNodeDesc child2 = predicate.getChildren().get(1);
+
+        List<ExprNodeDesc> retChild1 = checkNonKeyPredicates(child1, columns, colTypes, serde);
+        List<ExprNodeDesc> retChild2 = checkNonKeyPredicates(child2, columns, colTypes, serde);
+
+        if ((retChild1 == null) && (retChild2 != null)) {
+          return retChild2;
+        } else if ((retChild2 == null) && (retChild1 != null)) {
+          return retChild1;
+        } else {
+          // Combine the residual parts of the two children.
+          if (retChild1.get(0) == null && retChild2.get(0) != null) {
+            retPred.add(retChild2.get(0));
+          } else if (retChild1.get(0) != null && retChild2.get(0) == null) {
+            retPred.add(retChild1.get(0));
+          } else if (retChild1.get(0) == null && retChild2.get(0) == null) {
+            retPred.add(null);
+          } else {
+            List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
+            temp.add(retChild1.get(0));
+            temp.add(retChild2.get(0));
+            retPred.add(new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+                FunctionRegistry.getGenericUDFForAnd(), temp));
+          }
+
+          // Combine the pushable parts of the two children.
+          if (retChild1.get(1) == null && retChild2.get(1) != null) {
+            retPred.add(retChild2.get(1));
+          } else if (retChild1.get(1) != null && retChild2.get(1) == null) {
+            retPred.add(retChild1.get(1));
+          } else if (retChild1.get(1) == null && retChild2.get(1) == null) {
+            retPred.add(null);
+          } else {
+            List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
+            temp.add(retChild1.get(1));
+            temp.add(retChild2.get(1));
+            retPred.add(new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
+                FunctionRegistry.getGenericUDFForAnd(), temp));
+          }
+          return retPred;
+        }
+      }
+    }
+  }
+
+  @Override
   public DecomposedPredicate decomposePredicate(
     JobConf jobConf,
@@ -302,18 +458,21 @@ public DecomposedPredicate decomposePredicate(
       org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
     List<String> columnNames = Arrays.asList(columnNameProperty.split(","));
+    List<String> colTypes =
+      Arrays.asList(jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES).split(","));
 
     HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
     int keyColPos = hbaseSerde.getKeyColumnOffset();
-    String keyColType = jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES).
-      split(",")[keyColPos];
+    String keyColType = colTypes.get(keyColPos);
     IndexPredicateAnalyzer analyzer =
       HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(columnNames.get(keyColPos),
         keyColType, hbaseSerde.getStorageFormatOfCol(keyColPos).get(0));
 
     List<IndexSearchCondition> searchConditions =
       new ArrayList<IndexSearchCondition>();
-    ExprNodeDesc residualPredicate =
+    ExprNodeDesc residualKeyPredicate =
       analyzer.analyzePredicate(predicate, searchConditions);
+    ExprNodeDesc pushedKeyPredicate;
+
     int scSize = searchConditions.size();
     if (scSize < 1 || 2 < scSize) {
       // Either there was nothing which could be pushed down (size = 0),
@@ -322,21 +481,49 @@ public DecomposedPredicate decomposePredicate(
       // 1. key < 20                        (size = 1)
       // 2. key = 20                        (size = 1)
       // 3. key < 20 and key > 10           (size = 2)
-      return null;
+      pushedKeyPredicate = null;
+      residualKeyPredicate = predicate;
     }
-    if (scSize == 2 &&
+    else if (scSize == 2 &&
         (searchConditions.get(0).getComparisonOp()
           .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") ||
          searchConditions.get(1).getComparisonOp()
           .equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) {
       // If one of the predicates is =, then any other predicate with it is illegal.
-      return null;
+      pushedKeyPredicate = null;
+      residualKeyPredicate = predicate;
+    }
+    else {
+      pushedKeyPredicate = analyzer.translateSearchConditions(searchConditions);
     }
 
     DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
-    decomposedPredicate.pushedPredicate = analyzer.translateSearchConditions(
-      searchConditions);
-    decomposedPredicate.residualPredicate = residualPredicate;
+    decomposedPredicate.pushedPredicate = null;
+
+    // Check the residual left by the key-column analysis for non-key
+    // predicates that can also be pushed down.
+    List<ExprNodeDesc> analyzedPredicates =
+        checkNonKeyPredicates(residualKeyPredicate, columnNames, colTypes, hbaseSerde);
+
+    if (analyzedPredicates != null) {
+
+      if (pushedKeyPredicate == null && analyzedPredicates.get(1) != null) {
+        decomposedPredicate.pushedPredicate = analyzedPredicates.get(1);
+      } else if (pushedKeyPredicate != null && analyzedPredicates.get(1) == null) {
+        decomposedPredicate.pushedPredicate = pushedKeyPredicate;
+      } else if (pushedKeyPredicate != null && analyzedPredicates.get(1) != null) {
+        List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
+        temp.add(pushedKeyPredicate);
+        temp.add(analyzedPredicates.get(1));
+        decomposedPredicate.pushedPredicate = new ExprNodeGenericFuncDesc(
+            TypeInfoFactory.booleanTypeInfo, FunctionRegistry.getGenericUDFForAnd(), temp);
+      }
+      decomposedPredicate.residualPredicate = analyzedPredicates.get(0);
+    } else {
+      decomposedPredicate.pushedPredicate = pushedKeyPredicate;
+      decomposedPredicate.residualPredicate = residualKeyPredicate;
+    }
     return decomposedPredicate;
   }
 }
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
index da855014925b..5c5da0eb6828 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +31,9 @@
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
@@ -38,12 +42,17 @@
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping;
 import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
 import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -256,6 +265,7 @@ private TableSplit convertFilter(
     }
     ExprNodeDesc filterExpr =
       Utilities.deserializeExpression(filterExprSerialized, jobConf);
+    ExprNodeDesc nonKeyFilterExpr = null;
 
     String colName = jobConf.get(serdeConstants.LIST_COLUMNS).split(",")[iKey];
     String colType = jobConf.get(serdeConstants.LIST_COLUMN_TYPES).split(",")[iKey];
@@ -266,18 +276,11 @@ private TableSplit convertFilter(
     ExprNodeDesc residualPredicate =
       analyzer.analyzePredicate(filterExpr, searchConditions);
 
-    // There should be no residual since we already negotiated
-    // that earlier in HBaseStorageHandler.decomposePredicate.
+    // Any residual left after analyzing the key column belongs to
+    // non-key columns, since HBaseStorageHandler combined the key and
+    // non-key predicates with an AND.
     if (residualPredicate != null) {
-      throw new RuntimeException(
-        "Unexpected residual predicate " + residualPredicate.getExprString());
-    }
-
-    // There should be exactly one predicate since we already
-    // negotiated that also.
-    if (searchConditions.size() < 1 || searchConditions.size() > 2) {
-      throw new RuntimeException(
-        "Either one or two search conditions expected in push down");
+      nonKeyFilterExpr = residualPredicate;
     }
 
     // Convert the search condition into a restriction on the HBase scan
@@ -328,9 +331,148 @@ private TableSplit convertFilter(
     }
     scan.setStartRow(startRow);
     scan.setStopRow(stopRow);
+
+    if (nonKeyFilterExpr != null) {
+      // Validate hbase.columns.mapping up front; buildNonKeyFilters needs it
+      // to resolve column families and qualifiers.
+      String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+      List<ColumnMapping> columnsMapping = null;
+      if (hbaseColumnsMapping == null) {
+        throw new IOException("hbase.columns.mapping required for HBase Table.");
+      }
+      try {
+        columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+      } catch (SerDeException e) {
+        throw new IOException(e);
+      }
+      FilterList nonKeyFilters = buildNonKeyFilters(nonKeyFilterExpr, jobConf);
+      if (nonKeyFilters != null) {
+        scan.setFilter(nonKeyFilters);
+      }
+    }
 
     return tableSplit;
   }
 
+  public FilterList buildNonKeyFilters(ExprNodeDesc pred, JobConf jobConf) throws IOException {
+
+    if (pred == null) {
+      return null;
+    }
+
+    if (pred instanceof ExprNodeGenericFuncDesc) {
+      // Either a logical AND/OR node or a single column/constant comparison.
+      FilterList filters = null;
+
+      assert(pred.getChildren().size() == 2);
+      ExprNodeDesc child1 = pred.getChildren().get(0);
+      ExprNodeDesc child2 = pred.getChildren().get(1);
+
+      if (FunctionRegistry.isOpOr(pred) || FunctionRegistry.isOpAnd(pred)) {
+        if (FunctionRegistry.isOpOr(pred)) {
+          filters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+        } else if (FunctionRegistry.isOpAnd(pred)) {
+          filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+        }
+
+        if (child1 instanceof ExprNodeGenericFuncDesc) {
+          filters.addFilter(buildNonKeyFilters(child1, jobConf));
+        }
+        if (child2 instanceof ExprNodeGenericFuncDesc) {
+          filters.addFilter(buildNonKeyFilters(child2, jobConf));
+        }
+        return filters;
+      } else {
+
+        int colPos;
+        byte[] colFamily, colQualifier, constantVal;
+        ExprNodeConstantDesc constDesc = null;
+        ExprNodeColumnDesc colDesc = null;
+        ExprNodeConstantEvaluator eval;
+        Object writable;
+        ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) pred;
+        String comparisonOp;
+        CompareFilter.CompareOp operator = null;
+
+        List<String> columns =
+            Arrays.asList(jobConf.get(serdeConstants.LIST_COLUMNS).split(","));
+        String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+        List<ColumnMapping> columnsMapping = null;
+        if (hbaseColumnsMapping == null) {
+          throw new IOException("hbase.columns.mapping required for HBase Table.");
+        }
+        try {
+          columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping);
+        } catch (SerDeException e) {
+          throw new IOException(e);
+        }
+
+        if (funcDesc.getGenericUDF() instanceof GenericUDFBridge) {
+          GenericUDFBridge func = (GenericUDFBridge) funcDesc.getGenericUDF();
+          comparisonOp = func.getUdfName();
+        } else {
+          comparisonOp = funcDesc.getGenericUDF().getClass().getName();
+        }
+
+        // decomposePredicate only pushes column/constant comparisons, so one
+        // child is a column reference and the other a constant.
+        if ((child1 instanceof ExprNodeColumnDesc)
+            && (child2 instanceof ExprNodeConstantDesc)) {
+          colDesc = (ExprNodeColumnDesc) child1;
+          constDesc = (ExprNodeConstantDesc) child2;
+        } else if ((child2 instanceof ExprNodeColumnDesc)
+            && (child1 instanceof ExprNodeConstantDesc)) {
+          colDesc = (ExprNodeColumnDesc) child2;
+          constDesc = (ExprNodeConstantDesc) child1;
+        }
+        colPos = columns.indexOf(colDesc.getColumn());
+        colFamily = columnsMapping.get(colPos).familyNameBytes;
+        colQualifier = columnsMapping.get(colPos).qualifierNameBytes;
+
+        eval = new ExprNodeConstantEvaluator(constDesc);
+        PrimitiveObjectInspector objInspector;
+        try {
+          objInspector = (PrimitiveObjectInspector) eval.initialize(null);
+          writable = eval.evaluate(null);
+        } catch (ClassCastException cce) {
+          throw new IOException("Currently only primitive types are supported. Found: "
+            + constDesc.getTypeString());
+        } catch (HiveException e) {
+          throw new IOException(e);
+        }
+
+        constantVal = getConstantVal(writable, objInspector,
+            getStorageFormatOfKey(columnsMapping.get(colPos).mappingSpec,
+                jobConf.get(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, "string")));
+
+        if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual".equals(comparisonOp)) {
+          operator = CompareFilter.CompareOp.EQUAL;
+        } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan".equals(comparisonOp)) {
+          operator = CompareFilter.CompareOp.LESS;
+        } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan"
+            .equals(comparisonOp)) {
+          operator = CompareFilter.CompareOp.GREATER_OR_EQUAL;
+        } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan"
+            .equals(comparisonOp)) {
+          operator = CompareFilter.CompareOp.GREATER;
+        } else if ("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan"
+            .equals(comparisonOp)) {
+          operator = CompareFilter.CompareOp.LESS_OR_EQUAL;
+        } else {
+          throw new IOException(comparisonOp + " is not a supported comparison operator");
+        }
+        LOG.debug("Adding filter: " + Bytes.toString(colFamily) + ":" + Bytes.toString(colQualifier)
+            + ":" + operator + ":" + Bytes.toString(constantVal));
+        return new FilterList(new SingleColumnValueFilter(colFamily, colQualifier, operator, constantVal));
+      }
+    }
+    assert(false);
+    return null;
+  }
+
   private byte[] getConstantVal(Object writable, PrimitiveObjectInspector poi, boolean isKeyBinary) throws IOException{
diff --git a/hbase-handler/src/test/queries/positive/hbase_pushdown.q b/hbase-handler/src/test/queries/positive/hbase_pushdown.q
index 69a4536c0463..cb0baa3ecbb3 100644
--- a/hbase-handler/src/test/queries/positive/hbase_pushdown.q
+++ b/hbase-handler/src/test/queries/positive/hbase_pushdown.q
@@ -51,3 +51,40 @@ set hive.optimize.ppd.storage=false;
 
 -- with pushdown disabled
 explain select * from hbase_pushdown where key=90;
+
+
+-- Non-key predicate pushdown tests
+
+CREATE TABLE hbase_pushdown2(key string, v1 string, v2 string, v3 int, v4 string, v5 int)
+STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
+WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:string,cf2:string,cf1:int,cf3:string2,cf3:int");
+
+-- Only key predicates without any non-key predicates
+
+explain extended select * from hbase_pushdown2 where key < "ab" and key >"cd";
+explain extended select * from hbase_pushdown2 where key < "ab";
+
+-- no pushdown
+explain extended select * from hbase_pushdown2 where key < "ab" and key >"cd" and key="efg";
+
+-- Only non-key predicates without any key predicates
+
+-- should push the left conjunct and leave the right one in the residual
+explain extended select * from hbase_pushdown2 where (v1 > "ab" and v2 < "cd") and (v3 <5 or v4 <"ff");
+
+explain extended select * from hbase_pushdown2 where (v1 > "ab" and v2 < "cd") and (v3 >5 and (v4<"abc" and v5=6));
+
+-- Nested ANDs and ORs
+
+-- pushes everything since every comparison is either =, string, or binary
+explain extended select * from hbase_pushdown2 where (v1 > "ab" and v2 < "cd") and (v3=5 and (v4<"abc" or v5=6));
+
+-- doesn't push the OR predicate
+explain extended select * from hbase_pushdown2 where (v1 > "ab" and v2 < "cd") and (v3=5 and (v4<"abc" or v5>6));
"cd") and (v3=5 and (v4<"abc" or v5>6)) + +-- Both Key and Non-Key Predicates - Keys are used for ranges and Non-Keys for filters + +explain extended select * from hbase_pushdown2 where (((key < "a5") and v1 > "ab") and v2 < "cd") and (v3 >5 and (v4<"abc" and v5=6)) + +-- No key based ranges are possible , only non-key predicates are pushed down +explain extended select * from hbase_pushdown2 where (((key < "a5") and v1 > "ab") and v2 < "cd") and ((key="c3" and v3 >5) and (v4<"abc" and v5=6))