|
20 | 20 | import java.io.IOException;
|
21 | 21 | import java.util.Map;
|
22 | 22 |
|
| 23 | +import org.apache.drill.common.AutoCloseables; |
23 | 24 | import org.apache.drill.common.exceptions.DrillRuntimeException;
|
24 | 25 | import org.apache.drill.exec.exception.StoreException;
|
25 | 26 | import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
|
26 | 27 | import org.apache.drill.exec.store.sys.PersistentStore;
|
27 | 28 | import org.apache.drill.exec.store.sys.PersistentStoreConfig;
|
28 | 29 | import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
|
29 | 30 | import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider;
|
| 31 | +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; |
| 32 | +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; |
30 | 33 | import org.apache.hadoop.conf.Configuration;
|
31 | 34 | import org.apache.hadoop.hbase.HBaseConfiguration;
|
32 |
| -import org.apache.hadoop.hbase.HColumnDescriptor; |
33 | 35 | import org.apache.hadoop.hbase.HConstants;
|
34 |
| -import org.apache.hadoop.hbase.HTableDescriptor; |
35 | 36 | import org.apache.hadoop.hbase.TableName;
|
36 | 37 | import org.apache.hadoop.hbase.client.Admin;
|
| 38 | +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
37 | 39 | import org.apache.hadoop.hbase.client.Connection;
|
38 | 40 | import org.apache.hadoop.hbase.client.ConnectionFactory;
|
| 41 | +import org.apache.hadoop.hbase.client.Durability; |
39 | 42 | import org.apache.hadoop.hbase.client.Table;
|
| 43 | +import org.apache.hadoop.hbase.client.TableDescriptor; |
| 44 | +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| 45 | +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; |
| 46 | +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; |
40 | 47 | import org.apache.hadoop.hbase.util.Bytes;
|
41 | 48 |
|
42 |
| -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; |
43 |
| - |
44 | 49 | public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
|
45 | 50 | private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);
|
46 | 51 |
|
47 |
| - static final byte[] FAMILY = Bytes.toBytes("s"); |
| 52 | + public static final byte[] FAMILY_NAME = Bytes.toBytes("s"); |
| 53 | + |
| 54 | + public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d"); |
48 | 55 |
|
49 |
| - static final byte[] QUALIFIER = Bytes.toBytes("d"); |
| 56 | + private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client"; |
50 | 57 |
|
51 | 58 | private final TableName hbaseTableName;
|
52 | 59 |
|
| 60 | + private Table hbaseTable; |
| 61 | + |
53 | 62 | private Configuration hbaseConf;
|
54 | 63 |
|
55 |
| - private Connection connection; |
| 64 | + private final Map<String, Object> tableConfig; |
56 | 65 |
|
57 |
| - private Table hbaseTable; |
| 66 | + private final Map<String, Object> columnConfig; |
58 | 67 |
|
| 68 | + private Connection connection; |
| 69 | + |
| 70 | + @SuppressWarnings("unchecked") |
59 | 71 | public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
|
60 |
| - @SuppressWarnings("unchecked") |
61 |
| - final Map<String, Object> config = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); |
62 |
| - this.hbaseConf = HBaseConfiguration.create(); |
63 |
| - this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client"); |
64 |
| - if (config != null) { |
65 |
| - for (Map.Entry<String, Object> entry : config.entrySet()) { |
66 |
| - this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); |
| 72 | + final Map<String, Object> hbaseConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG); |
| 73 | + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG)) { |
| 74 | + tableConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG); |
| 75 | + } else { |
| 76 | + tableConfig = Maps.newHashMap(); |
| 77 | + } |
| 78 | + if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG)) { |
| 79 | + columnConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG); |
| 80 | + } else { |
| 81 | + columnConfig = Maps.newHashMap(); |
| 82 | + } |
| 83 | + hbaseConf = HBaseConfiguration.create(); |
| 84 | + hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, HBASE_CLIENT_ID); |
| 85 | + if (hbaseConfig != null) { |
| 86 | + for (Map.Entry<String, Object> entry : hbaseConfig.entrySet()) { |
| 87 | + hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue())); |
67 | 88 | }
|
68 | 89 | }
|
69 |
| - this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); |
| 90 | + logger.info("Received the hbase config is {}", hbaseConfig); |
| 91 | + if (!tableConfig.isEmpty()) { |
| 92 | + logger.info("Received the table config is {}", tableConfig); |
| 93 | + } |
| 94 | + if (!columnConfig.isEmpty()) { |
| 95 | + logger.info("Received the column config is {}", columnConfig); |
| 96 | + } |
| 97 | + hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE)); |
70 | 98 | }
|
71 | 99 |
|
72 | 100 | @VisibleForTesting
|
73 | 101 | public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
|
| 102 | + this.tableConfig = Maps.newHashMap(); |
| 103 | + this.columnConfig = Maps.newHashMap(); |
74 | 104 | this.hbaseConf = conf;
|
75 | 105 | this.hbaseTableName = TableName.valueOf(storeTableName);
|
76 | 106 | }
|
77 | 107 |
|
78 |
| - |
| 108 | + @VisibleForTesting |
| 109 | + public HBasePersistentStoreProvider(Map<String, Object> tableConfig, Map<String, Object> columnConfig, Configuration conf, String storeTableName) { |
| 110 | + this.tableConfig = tableConfig; |
| 111 | + this.columnConfig = columnConfig; |
| 112 | + this.hbaseConf = conf; |
| 113 | + this.hbaseTableName = TableName.valueOf(storeTableName); |
| 114 | + } |
79 | 115 |
|
80 | 116 | @Override
|
81 | 117 | public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
|
82 |
| - switch(config.getMode()){ |
| 118 | + switch (config.getMode()) { |
83 | 119 | case BLOB_PERSISTENT:
|
84 | 120 | case PERSISTENT:
|
85 |
| - return new HBasePersistentStore<>(config, this.hbaseTable); |
86 |
| - |
| 121 | + return new HBasePersistentStore<>(config, hbaseTable); |
87 | 122 | default:
|
88 |
| - throw new IllegalStateException(); |
| 123 | + throw new IllegalStateException("Unknown persistent mode"); |
89 | 124 | }
|
90 | 125 | }
|
91 | 126 |
|
92 |
| - |
93 | 127 | @Override
|
94 | 128 | public void start() throws IOException {
|
| 129 | + // Create the column family builder |
| 130 | + ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder |
| 131 | + .newBuilder(FAMILY_NAME) |
| 132 | + .setMaxVersions(1); |
| 133 | + // Append the config to column family |
| 134 | + verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder); |
| 135 | + // Create the table builder |
| 136 | + TableDescriptorBuilder tableBuilder = TableDescriptorBuilder |
| 137 | + .newBuilder(hbaseTableName) |
| 138 | + .setColumnFamily(columnFamilyBuilder.build()); |
| 139 | + // Append the config to table |
| 140 | + verifyAndSetTableConfig(tableConfig, tableBuilder); |
95 | 141 | this.connection = ConnectionFactory.createConnection(hbaseConf);
|
96 |
| - |
97 | 142 | try(Admin admin = connection.getAdmin()) {
|
98 | 143 | if (!admin.tableExists(hbaseTableName)) {
|
99 |
| - HTableDescriptor desc = new HTableDescriptor(hbaseTableName); |
100 |
| - desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1)); |
101 |
| - admin.createTable(desc); |
| 144 | + // Go to create the table |
| 145 | + admin.createTable(tableBuilder.build()); |
| 146 | + logger.info("The HBase table of persistent store created : {}", hbaseTableName); |
102 | 147 | } else {
|
103 |
| - HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName); |
104 |
| - if (!desc.hasFamily(FAMILY)) { |
| 148 | + TableDescriptor table = admin.getDescriptor(hbaseTableName); |
| 149 | + if (!admin.isTableEnabled(hbaseTableName)) { |
| 150 | + admin.enableTable(hbaseTableName); // In case the table is disabled |
| 151 | + } |
| 152 | + if (!table.hasColumnFamily(FAMILY_NAME)) { |
105 | 153 | throw new DrillRuntimeException("The HBase table " + hbaseTableName
|
106 | 154 | + " specified as persistent store exists but does not contain column family: "
|
107 |
| - + (Bytes.toString(FAMILY))); |
| 155 | + + (Bytes.toString(FAMILY_NAME))); |
108 | 156 | }
|
| 157 | + logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName); |
109 | 158 | }
|
110 | 159 | }
|
111 | 160 |
|
112 | 161 | this.hbaseTable = connection.getTable(hbaseTableName);
|
113 | 162 | }
|
114 | 163 |
|
115 |
| - @Override |
116 |
| - public synchronized void close() { |
117 |
| - if (this.hbaseTable != null) { |
118 |
| - try { |
119 |
| - this.hbaseTable.close(); |
120 |
| - this.hbaseTable = null; |
121 |
| - } catch (IOException e) { |
122 |
| - logger.warn("Caught exception while closing HBase table.", e); |
| 164 | + /** |
| 165 | + * Verify the configuration of HBase table and |
| 166 | + * add them to the table builder. |
| 167 | + * @param config Received the table config |
| 168 | + * @param builder HBase table builder |
| 169 | + */ |
| 170 | + private void verifyAndSetTableConfig(Map<String, Object> config, TableDescriptorBuilder builder) { |
| 171 | + for (Map.Entry<String, Object> entry : config.entrySet()) { |
| 172 | + switch (entry.getKey().toUpperCase()) { |
| 173 | + case TableDescriptorBuilder.DURABILITY: |
| 174 | + Durability durability = Durability.valueOf(((String) entry.getValue()).toUpperCase()); |
| 175 | + builder.setDurability(durability); |
| 176 | + break; |
| 177 | + case TableDescriptorBuilder.COMPACTION_ENABLED: |
| 178 | + builder.setCompactionEnabled((Boolean) entry.getValue()); |
| 179 | + break; |
| 180 | + case TableDescriptorBuilder.SPLIT_ENABLED: |
| 181 | + builder.setSplitEnabled((Boolean) entry.getValue()); |
| 182 | + break; |
| 183 | + case TableDescriptorBuilder.FLUSH_POLICY: |
| 184 | + builder.setFlushPolicyClassName((String) entry.getValue()); |
| 185 | + break; |
| 186 | + case TableDescriptorBuilder.SPLIT_POLICY: |
| 187 | + builder.setRegionSplitPolicyClassName((String) entry.getValue()); |
| 188 | + break; |
| 189 | + case TableDescriptorBuilder.MAX_FILESIZE: |
| 190 | + builder.setMaxFileSize((Integer) entry.getValue()); |
| 191 | + break; |
| 192 | + case TableDescriptorBuilder.MEMSTORE_FLUSHSIZE: |
| 193 | + builder.setMemStoreFlushSize((Integer) entry.getValue()); |
| 194 | + break; |
| 195 | + default: |
| 196 | + break; |
123 | 197 | }
|
124 | 198 | }
|
125 |
| - if (this.connection != null && !this.connection.isClosed()) { |
126 |
| - try { |
127 |
| - this.connection.close(); |
128 |
| - } catch (IOException e) { |
129 |
| - logger.warn("Caught exception while closing HBase connection.", e); |
| 199 | + } |
| 200 | + |
| 201 | + /** |
| 202 | + * Verify the configuration of HBase column family and |
| 203 | + * add them to the column family builder. |
| 204 | + * @param config Received the column config |
| 205 | + * @param builder HBase column family builder |
| 206 | + */ |
| 207 | + private void verifyAndSetColumnConfig(Map<String, Object> config, ColumnFamilyDescriptorBuilder builder) { |
| 208 | + for (Map.Entry<String, Object> entry : config.entrySet()) { |
| 209 | + switch (entry.getKey().toUpperCase()) { |
| 210 | + case ColumnFamilyDescriptorBuilder.MAX_VERSIONS: |
| 211 | + builder.setMaxVersions((Integer) entry.getValue()); |
| 212 | + break; |
| 213 | + case ColumnFamilyDescriptorBuilder.TTL: |
| 214 | + builder.setTimeToLive((Integer) entry.getValue()); |
| 215 | + break; |
| 216 | + case ColumnFamilyDescriptorBuilder.COMPRESSION: |
| 217 | + Algorithm algorithm = Algorithm.valueOf(((String) entry.getValue()).toUpperCase()); |
| 218 | + builder.setCompressionType(algorithm); |
| 219 | + break; |
| 220 | + case ColumnFamilyDescriptorBuilder.BLOCKCACHE: |
| 221 | + builder.setBlockCacheEnabled((Boolean) entry.getValue()); |
| 222 | + break; |
| 223 | + case ColumnFamilyDescriptorBuilder.BLOCKSIZE: |
| 224 | + builder.setBlocksize((Integer) entry.getValue()); |
| 225 | + break; |
| 226 | + case ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING: |
| 227 | + DataBlockEncoding encoding = DataBlockEncoding.valueOf(((String) entry.getValue()).toUpperCase()); |
| 228 | + builder.setDataBlockEncoding(encoding); |
| 229 | + break; |
| 230 | + case ColumnFamilyDescriptorBuilder.IN_MEMORY: |
| 231 | + builder.setInMemory((Boolean) entry.getValue()); |
| 232 | + break; |
| 233 | + case ColumnFamilyDescriptorBuilder.DFS_REPLICATION: |
| 234 | + builder.setDFSReplication(((Integer) entry.getValue()).shortValue()); |
| 235 | + break; |
| 236 | + default: |
| 237 | + break; |
130 | 238 | }
|
131 |
| - this.connection = null; |
132 | 239 | }
|
133 | 240 | }
|
134 | 241 |
|
| 242 | + @Override |
| 243 | + public synchronized void close() { |
| 244 | + if (hbaseTable != null) { |
| 245 | + AutoCloseables.closeSilently(hbaseTable); |
| 246 | + } |
| 247 | + if (connection != null && !connection.isClosed()) { |
| 248 | + AutoCloseables.closeSilently(connection); |
| 249 | + } |
| 250 | + logger.info("The HBase connection of persistent store closed."); |
| 251 | + } |
135 | 252 | }
|
0 commit comments