apply HIVE-1643: support range scans and non-key columns in HBase filter... #43

Open · wants to merge 1 commit into base: shark-0.11
Changes from all commits
@@ -26,7 +26,10 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -41,15 +44,21 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -64,6 +73,8 @@ public class HBaseStorageHandler extends DefaultStorageHandler

final static public String DEFAULT_PREFIX = "default.";

static final Log LOG = LogFactory.getLog(HBaseStorageHandler.class);

private Configuration hbaseConf;
private HBaseAdmin admin;

@@ -268,6 +279,7 @@ public void configureOutputJobProperties(
configureTableJobProperties(tableDesc, jobProperties);
}

@Override
public void configureTableJobProperties(
TableDesc tableDesc,
Map<String, String> jobProperties) {
@@ -292,6 +304,150 @@ public void configureTableJobProperties(
jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
}

  /**
   * Checks whether every leaf comparison in the given predicate can be pushed
   * down to HBase. A leaf comparison qualifies when it does not reference the
   * key column and either the column has a string type, the column is stored
   * in binary format, or the comparison operator is "=". An "=" comparison can
   * be pushed regardless of storage format, while string-typed and
   * binary-stored columns can be pushed with any comparison operator.
   *
   * @param predicate - predicate to be analyzed
   * @param columns - list of columns in the table
   * @param colTypes - list of column types
   * @param serde - HBaseSerDe corresponding to the table
   * @return true if every leaf comparison satisfies the condition above,
   *         false otherwise
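   *
   * For illustration (hypothetical schema): with columns (key string,
   * name string, age int) where age is stored in binary format,
   * "name like 'a%' and age > 10" yields true (string column; binary-stored
   * column), while "key > 'a'" yields false (it references the key column).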
*/
  private boolean checkAllBinaryOrEquals(ExprNodeDesc predicate, List<String> columns,
      List<String> colTypes, HBaseSerDe serde) {

    String comparisonOp, columnName;
    int colIndex;
    Stack<ExprNodeDesc> stack = new Stack<ExprNodeDesc>();
    stack.push(predicate);

    while (!stack.isEmpty()) {
      ExprNodeDesc top = stack.pop();

      if ((top instanceof ExprNodeGenericFuncDesc) && (top.getChildren().size() == 2)) {

        ExprNodeDesc child1 = top.getChildren().get(0);
        ExprNodeDesc child2 = top.getChildren().get(1);
        if (((child1 instanceof ExprNodeColumnDesc) && (child2 instanceof ExprNodeConstantDesc))
            || ((child2 instanceof ExprNodeColumnDesc) && (child1 instanceof ExprNodeConstantDesc))) {

          // Leaf comparison: resolve the comparison operator and the column name.
          if (((ExprNodeGenericFuncDesc) top).getGenericUDF() instanceof GenericUDFBridge) {
            GenericUDFBridge func = (GenericUDFBridge) ((ExprNodeGenericFuncDesc) top).getGenericUDF();
            comparisonOp = func.getUdfName();
          } else {
            comparisonOp = ((ExprNodeGenericFuncDesc) top).getGenericUDF().getClass().getName();
          }
          columnName = (child1 instanceof ExprNodeColumnDesc)
              ? ((ExprNodeColumnDesc) child1).getColumn()
              : ((ExprNodeColumnDesc) child2).getColumn();
          colIndex = columns.indexOf(columnName);

          // Reject leaves on the key column, and leaves that are neither
          // string-typed, nor an "=" comparison, nor stored in binary format.
          if (columnName.equals(columns.get(serde.getKeyColumnOffset()))
              || !(colTypes.get(colIndex).equalsIgnoreCase("string")
                  || comparisonOp.equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual")
                  || serde.getStorageFormatOfCol(colIndex).get(0))) {
            return false;
          }
        } else {
          // Internal node (e.g. AND/OR): descend into both children.
          stack.push(child1);
          stack.push(child2);
        }
      }
    }
    return true;
  }

  /**
   * Splits a predicate into a residual part and a part that can be pushed
   * down to HBase as a filter on non-key columns.
   *
   * @param predicate - predicate to be analyzed
   * @param columns - list of columns as in the serde
   * @param colTypes - list of column types
   * @param serde - serde for the input table
   * @return a list of size 2, with index 0 holding the residual predicate
   *         remaining after processing this predicate and index 1 holding
   *         the predicate that can be pushed down; null if the input
   *         predicate is null.
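   *
   * For illustration (hypothetical schema): for
   * "name = 'x' and (zip > 100 or zip < 10)" where name is a string column
   * and zip is an int column stored as text, index 1 would hold "name = 'x'"
   * and index 0 would hold the or-clause, since an OR subtree containing a
   * non-pushable leaf stays residual.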
*/

  List<ExprNodeDesc> checkNonKeyPredicates(ExprNodeDesc predicate, List<String> columns,
      List<String> colTypes, HBaseSerDe serde) {

    List<ExprNodeDesc> retPred = new ArrayList<ExprNodeDesc>(2);
    if (predicate == null) {
      return null;
    }
    if (checkAllBinaryOrEquals(predicate, columns, colTypes, serde)) {
      // The whole predicate can be pushed down; nothing remains as residual.
      retPred.add(null);
      retPred.add(predicate);
      return retPred;
    } else {
      if (!FunctionRegistry.isOpAnd(predicate)) {
        ExprNodeDesc child1 = predicate.getChildren().get(0);
        ExprNodeDesc child2 = predicate.getChildren().get(1);

        if (((child1 instanceof ExprNodeColumnDesc)
            && !(((ExprNodeColumnDesc) child1).getColumn()).equals(columns.get(serde.getKeyColumnOffset()))
            && (child2 instanceof ExprNodeConstantDesc))
            || ((child2 instanceof ExprNodeColumnDesc)
            && !(((ExprNodeColumnDesc) child2).getColumn()).equals(columns.get(serde.getKeyColumnOffset()))
            && (child1 instanceof ExprNodeConstantDesc))) {
          // A simple column/constant comparison on a non-key column is pushed.
          retPred.add(null);
          retPred.add(predicate);
        } else {
          // Anything else (e.g. an OR subtree with a non-pushable leaf, or a
          // comparison on the key column) stays residual.
          retPred.add(predicate);
          retPred.add(null);
        }
        return retPred;
      } else {
        // AND node: decompose both children, then recombine the residual parts
        // (index 0) and the pushable parts (index 1), AND-ing the two halves
        // back together when both are non-null.
        ExprNodeDesc child1 = predicate.getChildren().get(0);
        ExprNodeDesc child2 = predicate.getChildren().get(1);

        List<ExprNodeDesc> retChild1 = checkNonKeyPredicates(child1, columns, colTypes, serde);
        List<ExprNodeDesc> retChild2 = checkNonKeyPredicates(child2, columns, colTypes, serde);

        if ((retChild1 == null) && (retChild2 != null)) {
          return retChild2;
        } else if ((retChild2 == null) && (retChild1 != null)) {
          return retChild1;
        } else {
          if (retChild1.get(0) == null && retChild2.get(0) != null) {
            retPred.add(retChild2.get(0));
          } else if (retChild1.get(0) != null && retChild2.get(0) == null) {
            retPred.add(retChild1.get(0));
          } else if (retChild1.get(0) == null && retChild2.get(0) == null) {
            retPred.add(null);
          } else {
            List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
            temp.add(retChild1.get(0));
            temp.add(retChild2.get(0));
            retPred.add(new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
                FunctionRegistry.getGenericUDFForAnd(), temp));
          }

          if (retChild1.get(1) == null && retChild2.get(1) != null) {
            retPred.add(retChild2.get(1));
          } else if (retChild1.get(1) != null && retChild2.get(1) == null) {
            retPred.add(retChild1.get(1));
          } else if (retChild1.get(1) == null && retChild2.get(1) == null) {
            retPred.add(null);
          } else {
            List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
            temp.add(retChild1.get(1));
            temp.add(retChild2.get(1));
            retPred.add(new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo,
                FunctionRegistry.getGenericUDFForAnd(), temp));
          }
          return retPred;
        }
      }
    }
  }


@Override
public DecomposedPredicate decomposePredicate(
JobConf jobConf,
@@ -302,18 +458,21 @@ public DecomposedPredicate decomposePredicate(
org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMNS);
List<String> columnNames =
Arrays.asList(columnNameProperty.split(","));
    List<String> colTypes =
        Arrays.asList(jobConf.get(org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES).split(","));

HBaseSerDe hbaseSerde = (HBaseSerDe) deserializer;
int keyColPos = hbaseSerde.getKeyColumnOffset();
    String keyColType = colTypes.get(keyColPos);
IndexPredicateAnalyzer analyzer =
HiveHBaseTableInputFormat.newIndexPredicateAnalyzer(columnNames.get(keyColPos), keyColType,
hbaseSerde.getStorageFormatOfCol(keyColPos).get(0));
List<IndexSearchCondition> searchConditions =
new ArrayList<IndexSearchCondition>();
    ExprNodeDesc residualKeyPredicate =
analyzer.analyzePredicate(predicate, searchConditions);
ExprNodeDesc pushedKeyPredicate;

int scSize = searchConditions.size();
if (scSize < 1 || 2 < scSize) {
// Either there was nothing which could be pushed down (size = 0),
@@ -322,21 +481,49 @@
// 1. key < 20 (size = 1)
// 2. key = 20 (size = 1)
// 3. key < 20 and key > 10 (size = 2)
      pushedKeyPredicate = null;
      residualKeyPredicate = predicate;
}
    else if (scSize == 2 &&
(searchConditions.get(0).getComparisonOp()
.equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual") ||
searchConditions.get(1).getComparisonOp()
.equals("org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual"))) {
// If one of the predicates is =, then any other predicate with it is illegal.
      pushedKeyPredicate = null;
      residualKeyPredicate = predicate;
}
    else {
      pushedKeyPredicate = analyzer.translateSearchConditions(searchConditions);
    }

DecomposedPredicate decomposedPredicate = new DecomposedPredicate();
    decomposedPredicate.pushedPredicate = null;

    // Check the residual left by the key-column analyzer for non-key
    // predicates that can also be pushed down.
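    // For example (hypothetical columns): given "key < 20 and name = 'x' and
    // (zip > 100 or zip < 10)" where name is a string column and zip is an int
    // column stored as text, "key < 20" is pushed by the analyzer above,
    // "name = 'x'" is pushed here, and the or-clause stays residual.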
    List<ExprNodeDesc> analyzedPredicates =
        checkNonKeyPredicates(residualKeyPredicate, columnNames, colTypes, hbaseSerde);

    if (analyzedPredicates != null) {
      if (pushedKeyPredicate == null && analyzedPredicates.get(1) != null) {
        decomposedPredicate.pushedPredicate = analyzedPredicates.get(1);
      } else if (pushedKeyPredicate != null && analyzedPredicates.get(1) == null) {
        decomposedPredicate.pushedPredicate = pushedKeyPredicate;
      } else if (pushedKeyPredicate != null && analyzedPredicates.get(1) != null) {
        // Both a key predicate and non-key predicates can be pushed: AND them together.
        List<ExprNodeDesc> temp = new ArrayList<ExprNodeDesc>();
        temp.add(pushedKeyPredicate);
        temp.add(analyzedPredicates.get(1));
        decomposedPredicate.pushedPredicate = new ExprNodeGenericFuncDesc(
            TypeInfoFactory.booleanTypeInfo, FunctionRegistry.getGenericUDFForAnd(), temp);
      }
      decomposedPredicate.residualPredicate = analyzedPredicates.get(0);
    } else {
      decomposedPredicate.pushedPredicate = pushedKeyPredicate;
      decomposedPredicate.residualPredicate = residualKeyPredicate;
    }
return decomposedPredicate;
}
}