Skip to content

Commit

Permalink
PHOENIX-7478 HBase 3 compatibility changes: Replace ClusterConnection…
Browse files Browse the repository at this point in the history
… with Connection API

Co-authored-by: Istvan Toth <[email protected]>
  • Loading branch information
szucsvillo and stoty committed Jan 8, 2025
1 parent a51a57a commit bcca4f7
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
Expand Down Expand Up @@ -771,7 +770,13 @@ public ConnectionQueryServices getChildQueryServices(ImmutableBytesWritable tena

@Override
public void clearTableRegionCache(TableName tableName) throws SQLException {
((ClusterConnection)connection).clearRegionCache(tableName);
try {
connection.getRegionLocator(tableName).clearRegionLocationCache();
} catch (IOException e) {
LOGGER.info("Exception while clearing table region cache", e);
//TODO allow passing cause to TableNotFoundException
throw new TableNotFoundException(tableName.toString());
}
}

public byte[] getNextRegionStartKey(HRegionLocation regionLocation, byte[] currentKey,
Expand Down Expand Up @@ -875,8 +880,7 @@ public List<HRegionLocation> getTableRegions(final byte[] tableName, final byte[
currentKey = startRowKey;
do {
HRegionLocation regionLocation =
((ClusterConnection) connection).getRegionLocation(table,
currentKey, false);
connection.getRegionLocator(table).getRegionLocation(currentKey, false);
currentKey =
getNextRegionStartKey(regionLocation, currentKey, prevRegionLocation);
locations.add(regionLocation);
Expand Down Expand Up @@ -2179,8 +2183,9 @@ private MetaDataMutationResult metaDataCoprocessorExec(String tableName, byte[]
long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
if (retried) {
((ClusterConnection) connection).relocateRegion(
SchemaUtil.getPhysicalName(systemTableName, this.getProps()), tableKey);
connection.getRegionLocator(SchemaUtil.getPhysicalName(
systemTableName, this.getProps()))
.getRegionLocation(tableKey, true);
}

Table ht = this.getTable(SchemaUtil.getPhysicalName(systemTableName, this.getProps()).getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
Expand Down Expand Up @@ -6573,7 +6572,7 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
LOGGER.info(changePermsStatement.toString());

try(Admin admin = connection.getQueryServices().getAdmin()) {
ClusterConnection clusterConnection = (ClusterConnection) admin.getConnection();
org.apache.hadoop.hbase.client.Connection hConnection = admin.getConnection();

if (changePermsStatement.getSchemaName() != null) {
// SYSTEM.CATALOG doesn't have any entry for "default" HBase namespace, hence we will bypass the check
Expand All @@ -6583,7 +6582,7 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
connection);
}

changePermsOnSchema(clusterConnection, changePermsStatement);
changePermsOnSchema(hConnection, changePermsStatement);
} else if (changePermsStatement.getTableName() != null) {
PTable inputTable = connection.getTable(SchemaUtil.
normalizeFullTableName(changePermsStatement.getTableName().toString()));
Expand All @@ -6593,11 +6592,11 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement

// Changing perms on base table and update the perms for global and view indexes
// Views and local indexes are not physical tables and hence update perms is not needed
changePermsOnTables(clusterConnection, admin, changePermsStatement, inputTable);
changePermsOnTables(hConnection, admin, changePermsStatement, inputTable);
} else {

// User can be given perms at the global level
changePermsOnUser(clusterConnection, changePermsStatement);
changePermsOnUser(hConnection, changePermsStatement);
}

} catch (SQLException e) {
Expand All @@ -6612,20 +6611,25 @@ public MutationState changePermissions(ChangePermsStatement changePermsStatement
return new MutationState(0, 0, connection);
}

private void changePermsOnSchema(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement) throws Throwable {
private void changePermsOnSchema(org.apache.hadoop.hbase.client.Connection hConnection,
ChangePermsStatement changePermsStatement) throws Throwable {
if (changePermsStatement.isGrantStatement()) {
AccessControlClient.grant(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), changePermsStatement.getPermsList());
AccessControlClient.grant(hConnection, changePermsStatement.getSchemaName(),
changePermsStatement.getName(), changePermsStatement.getPermsList());
} else {
AccessControlClient.revoke(clusterConnection, changePermsStatement.getSchemaName(), changePermsStatement.getName(), Permission.Action.values());
AccessControlClient.revoke(hConnection, changePermsStatement.getSchemaName(),
changePermsStatement.getName(), Permission.Action.values());
}
}

private void changePermsOnTables(ClusterConnection clusterConnection, Admin admin, ChangePermsStatement changePermsStatement, PTable inputTable) throws Throwable {
private void changePermsOnTables(org.apache.hadoop.hbase.client.Connection hConnection,
Admin admin, ChangePermsStatement changePermsStatement,
PTable inputTable) throws Throwable {

org.apache.hadoop.hbase.TableName tableName = SchemaUtil.getPhysicalTableName
(inputTable.getPhysicalName().getBytes(), inputTable.isNamespaceMapped());

changePermsOnTable(clusterConnection, changePermsStatement, tableName);
changePermsOnTable(hConnection, changePermsStatement, tableName);

boolean schemaInconsistency = false;
List<PTable> inconsistentTables = null;
Expand All @@ -6646,7 +6650,7 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
LOGGER.info("Updating permissions for Index Table: " +
indexTable.getName() + " Base Table: " + inputTable.getName());
tableName = SchemaUtil.getPhysicalTableName(indexTable.getPhysicalName().getBytes(), indexTable.isNamespaceMapped());
changePermsOnTable(clusterConnection, changePermsStatement, tableName);
changePermsOnTable(hConnection, changePermsStatement, tableName);
}

if (schemaInconsistency) {
Expand All @@ -6664,7 +6668,7 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
if (viewIndexTableExists) {
LOGGER.info("Updating permissions for View Index Table: " +
Bytes.toString(viewIndexTableBytes) + " Base Table: " + inputTable.getName());
changePermsOnTable(clusterConnection, changePermsStatement, tableName);
changePermsOnTable(hConnection, changePermsStatement, tableName);
} else {
if (inputTable.isMultiTenant()) {
LOGGER.error("View Index Table not found for MultiTenant Table: " + inputTable.getName());
Expand All @@ -6675,23 +6679,28 @@ private void changePermsOnTables(ClusterConnection clusterConnection, Admin admi
}
}

private void changePermsOnTable(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement, org.apache.hadoop.hbase.TableName tableName)
private void changePermsOnTable(org.apache.hadoop.hbase.client.Connection hConnection,
ChangePermsStatement changePermsStatement,
org.apache.hadoop.hbase.TableName tableName)
throws Throwable {
if (changePermsStatement.isGrantStatement()) {
AccessControlClient.grant(clusterConnection, tableName, changePermsStatement.getName(),
AccessControlClient.grant(hConnection, tableName, changePermsStatement.getName(),
null, null, changePermsStatement.getPermsList());
} else {
AccessControlClient.revoke(clusterConnection, tableName, changePermsStatement.getName(),
AccessControlClient.revoke(hConnection, tableName, changePermsStatement.getName(),
null, null, Permission.Action.values());
}
}

private void changePermsOnUser(ClusterConnection clusterConnection, ChangePermsStatement changePermsStatement)
private void changePermsOnUser(org.apache.hadoop.hbase.client.Connection hConnection,
ChangePermsStatement changePermsStatement)
throws Throwable {
if (changePermsStatement.isGrantStatement()) {
AccessControlClient.grant(clusterConnection, changePermsStatement.getName(), changePermsStatement.getPermsList());
AccessControlClient.grant(hConnection, changePermsStatement.getName(),
changePermsStatement.getPermsList());
} else {
AccessControlClient.revoke(clusterConnection, changePermsStatement.getName(), Permission.Action.values());
AccessControlClient.revoke(hConnection, changePermsStatement.getName(),
Permission.Action.values());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
Expand All @@ -56,12 +57,9 @@
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.compile.MutationPlan;
import org.apache.phoenix.compile.PostDDLCompiler;
Expand Down Expand Up @@ -677,22 +675,26 @@ public static PhoenixConnection getRebuildIndexConnection(Configuration config)
}

public static boolean tableRegionsOnline(Configuration conf, PTable table) {
try (ClusterConnection hcon =
(ClusterConnection) ConnectionFactory.createConnection(conf)) {
List<HRegionLocation> locations = hcon.locateRegions(
org.apache.hadoop.hbase.TableName.valueOf(table.getPhysicalName().getBytes()));

for (HRegionLocation loc : locations) {
try (Connection hcon = ConnectionFactory.createConnection(conf)) {
Admin admin = hcon.getAdmin();
List<RegionInfo> regionInfos = admin.getRegions(TableName.valueOf(
table.getPhysicalName().getBytes()));
// This makes Number of Regions RPC calls sequentially.
// For large tables this can be slow.
for (RegionInfo regionInfo : regionInfos) {
try {
ServerName sn = loc.getServerName();
if (sn == null) continue;

AdminProtos.AdminService.BlockingInterface admin = hcon.getAdmin(sn);
HBaseRpcController controller = hcon.getRpcControllerFactory().newController();
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.getRegionInfo(controller,
admin, loc.getRegion().getRegionName());
} catch (RemoteException e) {
LOGGER.debug("Cannot get region " + loc.getRegion().getEncodedName() + " info due to error:" + e);
// We don't actually care about the compaction state, we are only calling this
// because this will trigger a call to the RS (from master), and we want to make
// sure that all RSs are available
// There are only a few methods in HBase 3.0 that are directly calling the RS,
// this is one of them.
admin.getCompactionStateForRegion(regionInfo.getRegionName());
// This used to make a direct RPC call to the region, but HBase 3 makes that
// very hard (needs reflection, or a bridge class in the same package),
// and it's not necessary for checking the RS liveness
} catch (IOException e) {
LOGGER.debug("Cannot get region " + regionInfo.getEncodedName()
+ " info due to error:" + e);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
Expand Down Expand Up @@ -259,7 +258,7 @@ public void testRecoveryRegionPostOpen() throws Exception {
scan = new Scan();
primaryTable.close();
primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME));
((ClusterConnection)hbaseConn).clearRegionLocationCache();
hbaseConn.clearRegionLocationCache();
resultScanner = primaryTable.getScanner(scan);
count = 0;
for (Result result : resultScanner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -108,8 +107,8 @@ public void testSplitWithCachedMeta() throws Exception {
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
Configuration configuration = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration);
((ClusterConnection)hbaseConn).clearRegionCache(TableName.valueOf(tableName));
RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
regionLocator.clearRegionLocationCache();
int nRegions = regionLocator.getAllRegionLocations().size();
admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3")));
int retryCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -135,8 +134,8 @@ public void testSplitWithCachedMeta() throws Exception {
conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration();
org.apache.hadoop.hbase.client.Connection hbaseConn =
ConnectionFactory.createConnection(configuration);
((ClusterConnection) hbaseConn).clearRegionCache(TableName.valueOf(tableName));
RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName));
regionLocator.clearRegionLocationCache();
int nRegions = regionLocator.getAllRegionLocations().size();
admin.split(tn, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3")));
int retryCount = 0;
Expand Down

0 comments on commit bcca4f7

Please sign in to comment.