Skip to content

Commit 6186251

Browse files
committed
Add the column family and namespace options
1 parent 8a6e8d8 commit 6186251

File tree

4 files changed

+37
-16
lines changed

4 files changed

+37
-16
lines changed

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/DrillHBaseConstants.java

+4
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,12 @@ public interface DrillHBaseConstants {
3636

3737
MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);
3838

39+
String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace";
40+
3941
String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";
4042

43+
String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family";
44+
4145
String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";
4246

4347
String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config";

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.drill.exec.store.hbase.config;
1919

20-
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY_NAME;
2120
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME;
2221

2322
import java.io.IOException;
@@ -48,17 +47,19 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> {
4847
private final String hbaseTableName;
4948

5049
private final String tableName;
50+
private final byte[] familyName;
5151
private final byte[] tableNameStartKey;
5252
private final byte[] tableNameStopKey;
5353

54-
public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) {
54+
public HBasePersistentStore(PersistentStoreConfig<V> config, Table table, byte[] family) {
5555
this.tableName = config.getName() + '\0';
5656
this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
5757
this.tableNameStopKey = this.tableNameStartKey.clone();
5858
this.tableNameStopKey[tableNameStartKey.length-1] = 1;
5959
this.config = config;
6060
this.hbaseTable = table;
6161
this.hbaseTableName = table.getName().getNameAsString();
62+
this.familyName = family;
6263
}
6364

6465
@Override
@@ -70,7 +71,7 @@ public PersistentStoreMode getMode() {
7071
public boolean contains(String key) {
7172
try {
7273
Get get = new Get(row(key));
73-
get.addColumn(FAMILY_NAME, QUALIFIER_NAME);
74+
get.addColumn(familyName, QUALIFIER_NAME);
7475
return hbaseTable.exists(get);
7576
} catch (IOException e) {
7677
throw UserException
@@ -82,7 +83,7 @@ public boolean contains(String key) {
8283

8384
@Override
8485
public V get(String key) {
85-
return get(key, FAMILY_NAME);
86+
return get(key, familyName);
8687
}
8788

8889
protected synchronized V get(String key, byte[] family) {
@@ -103,7 +104,7 @@ protected synchronized V get(String key, byte[] family) {
103104

104105
@Override
105106
public void put(String key, V value) {
106-
put(key, FAMILY_NAME, value);
107+
put(key, familyName, value);
107108
}
108109

109110
protected synchronized void put(String key, byte[] family, V value) {
@@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) {
122123
public synchronized boolean putIfAbsent(String key, V value) {
123124
try {
124125
Put put = new Put(row(key));
125-
put.addColumn(FAMILY_NAME, QUALIFIER_NAME, bytes(value));
126-
return hbaseTable.checkAndPut(put.getRow(), FAMILY_NAME, QUALIFIER_NAME, null /*absent*/, put);
126+
put.addColumn(familyName, QUALIFIER_NAME, bytes(value));
127+
return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put);
127128
} catch (IOException e) {
128129
throw UserException.dataReadError(e)
129130
.message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName)
@@ -183,7 +184,7 @@ private class Iter implements Iterator<Entry<String, V>> {
183184
Iter(int take) {
184185
try {
185186
Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
186-
scan.addColumn(FAMILY_NAME, QUALIFIER_NAME);
187+
scan.addColumn(familyName, QUALIFIER_NAME);
187188
scan.setCaching(Math.min(take, 100));
188189
scan.setBatch(take); // set batch size
189190
scanner = hbaseTable.getScanner(scan);

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStoreProvider.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,16 @@
4949
public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
5050
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);
5151

52-
public static final byte[] FAMILY_NAME = Bytes.toBytes("s");
52+
public static final byte[] DEFAULT_FAMILY_NAME = Bytes.toBytes("s");
5353

5454
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d");
5555

5656
private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client";
5757

5858
private final TableName hbaseTableName;
5959

60+
private final byte[] family;
61+
6062
private Table hbaseTable;
6163

6264
private Configuration hbaseConf;
@@ -94,7 +96,19 @@ public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
9496
if (!columnConfig.isEmpty()) {
9597
logger.info("Received the column config is {}", columnConfig);
9698
}
97-
hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE));
99+
String tableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
100+
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE)) {
101+
String namespaceStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE);
102+
hbaseTableName = TableName.valueOf(namespaceStr.concat(":").concat(tableName));
103+
} else {
104+
hbaseTableName = TableName.valueOf(tableName);
105+
}
106+
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY)) {
107+
String familyStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY);
108+
family = Bytes.toBytes(familyStr);
109+
} else { // The default name
110+
family = DEFAULT_FAMILY_NAME;
111+
}
98112
}
99113

100114
@VisibleForTesting
@@ -103,6 +117,7 @@ public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
103117
this.columnConfig = Maps.newHashMap();
104118
this.hbaseConf = conf;
105119
this.hbaseTableName = TableName.valueOf(storeTableName);
120+
this.family = DEFAULT_FAMILY_NAME;
106121
}
107122

108123
@VisibleForTesting
@@ -111,14 +126,15 @@ public HBasePersistentStoreProvider(Map<String, Object> tableConfig, Map<String,
111126
this.columnConfig = columnConfig;
112127
this.hbaseConf = conf;
113128
this.hbaseTableName = TableName.valueOf(storeTableName);
129+
this.family = DEFAULT_FAMILY_NAME;
114130
}
115131

116132
@Override
117133
public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
118134
switch (config.getMode()) {
119135
case BLOB_PERSISTENT:
120136
case PERSISTENT:
121-
return new HBasePersistentStore<>(config, hbaseTable);
137+
return new HBasePersistentStore<>(config, hbaseTable, family);
122138
default:
123139
throw new IllegalStateException("Unknown persistent mode");
124140
}
@@ -128,7 +144,7 @@ public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config)
128144
public void start() throws IOException {
129145
// Create the column family builder
130146
ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder
131-
.newBuilder(FAMILY_NAME)
147+
.newBuilder(family)
132148
.setMaxVersions(1);
133149
// Append the config to column family
134150
verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder);
@@ -149,10 +165,10 @@ public void start() throws IOException {
149165
if (!admin.isTableEnabled(hbaseTableName)) {
150166
admin.enableTable(hbaseTableName); // In case the table is disabled
151167
}
152-
if (!table.hasColumnFamily(FAMILY_NAME)) {
168+
if (!table.hasColumnFamily(family)) {
153169
throw new DrillRuntimeException("The HBase table " + hbaseTableName
154170
+ " specified as persistent store exists but does not contain column family: "
155-
+ (Bytes.toString(FAMILY_NAME)));
171+
+ (Bytes.toString(family)));
156172
}
157173
logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName);
158174
}

contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ public void testStoreTableAttributes() throws Exception {
104104
assertTrue("The max size of hfile must be " + MAX_FILESIZE, tableDescriptor.getMaxFileSize() == MAX_FILESIZE);
105105
assertTrue("The memstore size must be " + MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize() == MEMSTORE_FLUSHSIZE);
106106
// Column Family verify
107-
assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME));
108-
ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.FAMILY_NAME);
107+
assertTrue("The column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME));
108+
ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME);
109109
assertTrue("The max number of versions must be " + MAX_VERSIONS, columnDescriptor.getMaxVersions() == MAX_VERSIONS);
110110
assertTrue("The time-to-live must be " + TTL, columnDescriptor.getTimeToLive() == TTL);
111111
// TODO native snappy* library not available

0 commit comments

Comments
 (0)