Skip to content

Commit

Permalink
PHOENIX-7474 Migrate IndexTool tables and make sure they are created
Browse files Browse the repository at this point in the history
  • Loading branch information
richardantal authored and Richard Antal committed Dec 19, 2024
1 parent a75c4dc commit f2470b8
Show file tree
Hide file tree
Showing 15 changed files with 754 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.mapreduce.index;



import java.io.IOException;
import java.sql.Connection;

import java.sql.SQLException;
import java.util.UUID;


import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.*;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.query.QueryConstants;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.TableName;

import org.apache.phoenix.jdbc.PhoenixConnection;

import org.apache.phoenix.query.ConnectionQueryServices;

import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;

/**
 * Utility class for the IndexTool output/result HBase tables: computes their
 * fully qualified names (with or without SYSTEM namespace mapping), creates them
 * on demand, and migrates pre-existing unqualified tables into the SYSTEM schema.
 */
public class IndexToolTableUtil extends Configured {
    private static final Logger LOGGER = LoggerFactory.getLogger(IndexToolTableUtil.class);

    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
    // NOTE: mutable on purpose — rewritten by setIndexToolTableName() once the
    // namespace-mapping configuration is known. Not thread-safe by design of the
    // original; callers run this during system-table bootstrap.
    public static String SYSTEM_OUTPUT_TABLE_NAME =
            SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, OUTPUT_TABLE_NAME);

    public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
    public static String SYSTEM_RESULT_TABLE_NAME =
            SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, RESULT_TABLE_NAME);

    /**
     * Recomputes {@link #SYSTEM_OUTPUT_TABLE_NAME} and {@link #SYSTEM_RESULT_TABLE_NAME}
     * based on whether SYSTEM namespace mapping is enabled for this connection:
     * {@code SYSTEM:PHOENIX_INDEX_TOOL*} when mapped, {@code SYSTEM.PHOENIX_INDEX_TOOL*}
     * otherwise.
     *
     * @param connection an open Phoenix JDBC connection
     * @throws Exception if the connection cannot be unwrapped
     */
    public static void setIndexToolTableName(Connection connection) throws Exception {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        boolean namespaceMapped = SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
                queryServices.getConfiguration());
        SYSTEM_OUTPUT_TABLE_NAME = qualifiedSystemName(OUTPUT_TABLE_NAME, namespaceMapped);
        SYSTEM_RESULT_TABLE_NAME = qualifiedSystemName(RESULT_TABLE_NAME, namespaceMapped);
    }

    /**
     * Returns the SYSTEM-qualified physical name for {@code tableName}, using the
     * namespace separator when namespace mapping is enabled.
     */
    private static String qualifiedSystemName(String tableName, boolean namespaceMapped) {
        String fullName = SchemaUtil.getTableName(SYSTEM_SCHEMA_NAME, tableName);
        return namespaceMapped
                ? fullName.replace(QueryConstants.NAME_SEPARATOR, QueryConstants.NAMESPACE_SEPARATOR)
                : fullName;
    }

    /**
     * Ensures the result table exists (creating the SYSTEM namespace first if needed)
     * and returns an open handle to it. Caller is responsible for closing the table.
     */
    public static Table createResultTable(Connection connection) throws IOException, SQLException {
        return createIndexToolTable(connection, SYSTEM_RESULT_TABLE_NAME);
    }

    /**
     * Ensures the output table exists (creating the SYSTEM namespace first if needed)
     * and returns an open handle to it. Caller is responsible for closing the table.
     */
    public static Table createOutputTable(Connection connection) throws IOException, SQLException {
        return createIndexToolTable(connection, SYSTEM_OUTPUT_TABLE_NAME);
    }

    // Shared implementation for createOutputTable/createResultTable: ensure the
    // SYSTEM namespace, then create (if absent) and open the requested table.
    private static Table createIndexToolTable(Connection connection, String fullTableName)
            throws IOException, SQLException {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        try (Admin admin = queryServices.getAdmin()) {
            createSystemNamespaceTable(connection);
            return createTable(admin, TableName.valueOf(fullTableName));
        }
    }

    /**
     * Creates the SYSTEM HBase namespace if namespace mapping is enabled and the
     * namespace does not already exist; no-op otherwise.
     */
    public static void createSystemNamespaceTable(Connection connection)
            throws IOException, SQLException {
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        if (!SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM,
                queryServices.getConfiguration())) {
            return;
        }
        try (Admin admin = queryServices.getAdmin()) {
            if (!ClientUtil.isHBaseNamespaceAvailable(admin, SYSTEM_SCHEMA_NAME)) {
                admin.createNamespace(NamespaceDescriptor.create(SYSTEM_SCHEMA_NAME).build());
            }
        }
    }

    /**
     * Creates {@code tableName} with the default column family and log TTL if it does
     * not exist, then returns an open handle to it. A concurrent creator racing us is
     * tolerated (TableExistsException is logged and ignored).
     */
    private static Table createTable(Admin admin, TableName tableName) throws IOException {
        if (!admin.tableExists(tableName)) {
            ColumnFamilyDescriptor columnDescriptor = ColumnFamilyDescriptorBuilder
                    .newBuilder(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES)
                    .setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
                    .build();
            TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
                    .setColumnFamily(columnDescriptor)
                    .build();
            try {
                admin.createTable(tableDescriptor);
            } catch (TableExistsException e) {
                // Another client created it between the exists-check and createTable.
                LOGGER.warn("Table exists, ignoring", e);
            }
        }
        return admin.getConnection().getTable(tableName);
    }

    /**
     * Entry point used during system-table bootstrap/upgrade: resolves the qualified
     * table names, then migrates any legacy unqualified IndexTool tables into the
     * SYSTEM schema (or simply creates the new tables when no legacy data exists).
     */
    public static void createNewIndexToolTables(Connection connection) throws Exception {
        setIndexToolTableName(connection);
        migrateTable(connection, OUTPUT_TABLE_NAME);
        migrateTable(connection, RESULT_TABLE_NAME);
    }

    /**
     * Migrates a legacy unqualified IndexTool table to its SYSTEM-qualified name via
     * disable + snapshot + clone + delete. If no legacy table exists, just creates the
     * new table. If both old and new tables already exist, the migration is skipped
     * (nothing destructive happens) so a partially upgraded cluster is left intact.
     */
    private static void migrateTable(Connection connection, String tableName) throws Exception {
        if (!tableName.equals(OUTPUT_TABLE_NAME) && !tableName.equals(RESULT_TABLE_NAME)) {
            // Guard against misuse: this utility only knows the two IndexTool tables.
            LOGGER.warn("Not migrating table {}: only PHOENIX_INDEX_TOOL tables are supported",
                    tableName);
            return;
        }
        ConnectionQueryServices queryServices =
                connection.unwrap(PhoenixConnection.class).getQueryServices();
        try (Admin admin = queryServices.getAdmin()) {
            TableName oldTableName = TableName.valueOf(tableName);
            String newTableNameString = tableName.equals(OUTPUT_TABLE_NAME)
                    ? SYSTEM_OUTPUT_TABLE_NAME : SYSTEM_RESULT_TABLE_NAME;
            TableName newTableName = TableName.valueOf(newTableNameString);

            if (!admin.tableExists(oldTableName)) {
                // No legacy data: just make sure the new table exists.
                createSystemNamespaceTable(connection);
                createTable(admin, newTableName).close();
                return;
            }
            if (admin.tableExists(newTableName)) {
                // Both exist: do not disable/delete the legacy table — cloning would
                // fail and we must not destroy data. Leave resolution to the operator.
                LOGGER.warn("Both {} and {} exist; skipping migration of legacy table",
                        oldTableName, newTableName);
                return;
            }
            // Ensure the target namespace exists BEFORE any destructive step, so a
            // failed clone cannot strand the legacy table in a disabled state.
            createSystemNamespaceTable(connection);

            String snapshotName = tableName + "_" + UUID.randomUUID();
            admin.disableTable(oldTableName);
            admin.snapshot(snapshotName, oldTableName);
            admin.cloneSnapshot(snapshotName, newTableName);
            admin.deleteSnapshot(snapshotName);
            admin.deleteTable(oldTableName);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;

import org.apache.phoenix.mapreduce.index.IndexToolTableUtil;

public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConnectionQueryServicesImpl.class);
Expand Down Expand Up @@ -4025,6 +4027,12 @@ private void createOtherSystemTables(PhoenixConnection metaConnection) throws SQ
try {
metaConnection.createStatement().executeUpdate(getCDCStreamDDL());
} catch (TableAlreadyExistsException ignore) {}
try {
// check if we have old PHOENIX_INDEX_TOOL tables
// move data to the new tables under System, or simply create the new tables
IndexToolTableUtil.createNewIndexToolTables(metaConnection);

} catch (Exception ignore) {}
}

/**
Expand Down Expand Up @@ -4588,6 +4596,13 @@ public void upgradeSystemTables(final String url, final Properties props) throws
// with SYSTEM Namespace
createSchemaIfNotExistsSystemNSMappingEnabled(metaConnection);

try {
// check if we have old PHOENIX_INDEX_TOOL tables
// move data to the new tables under System, or simply create the new tables
IndexToolTableUtil.createNewIndexToolTables(metaConnection);

} catch (Exception ignore) {}

clearUpgradeRequired();
success = true;
} catch (UpgradeInProgressException | UpgradeNotRequiredException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Map;
import java.util.UUID;

import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.hbase.index.AbstractValueGetter;
Expand Down Expand Up @@ -985,10 +987,22 @@ private void setupIndexAndDataTable(Connection connection) throws SQLException,
indexTable, qDataTable));
}
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);

pIndexTable = connection.unwrap(PhoenixConnection.class).getTable(
SchemaUtil.getQualifiedTableName(schemaName, indexTable));
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, getConf())) {
pIndexTable = connection.unwrap(PhoenixConnection.class).getTable(
SchemaUtil.getQualifiedTableName(schemaName, indexTable).replace(
QueryConstants.NAME_SEPARATOR,
QueryConstants.NAMESPACE_SEPARATOR));
}

indexType = pIndexTable.getIndexType();
qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
if (SchemaUtil.isNamespaceMappingEnabled(PTableType.SYSTEM, getConf())) {
qIndexTable = qIndexTable.replace(QueryConstants.NAME_SEPARATOR, QueryConstants.NAMESPACE_SEPARATOR);
}

if (IndexType.LOCAL.equals(indexType)) {
isLocalIndexBuild = true;
if (useSnapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
Expand Down Expand Up @@ -60,8 +52,8 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
IndexTool.IndexDisableLoggingType.NONE;
private boolean shouldLogBeyondMaxLookback = true;

public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
public final static String OUTPUT_TABLE_NAME = IndexToolTableUtil.SYSTEM_OUTPUT_TABLE_NAME;
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(IndexToolTableUtil.SYSTEM_OUTPUT_TABLE_NAME);
public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;

public final static String DATA_TABLE_NAME = "DTName";
Expand Down Expand Up @@ -177,26 +169,7 @@ private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTabl
}

public void createOutputTable(Connection connection) throws IOException, SQLException {
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
try (Admin admin = queryServices.getAdmin()) {
TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
if (!admin.tableExists(outputTableName)) {
ColumnFamilyDescriptor columnDescriptor =
ColumnFamilyDescriptorBuilder
.newBuilder(OUTPUT_TABLE_COLUMN_FAMILY)
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
.build();
TableDescriptor tableDescriptor = TableDescriptorBuilder
.newBuilder(TableName.valueOf(OUTPUT_TABLE_NAME))
.setColumnFamily(columnDescriptor).build();
try {
admin.createTable(tableDescriptor);
} catch (TableExistsException e) {
LOGGER.warn("Table exists, ignoring", e);
}
outputTable = admin.getConnection().getTable(outputTableName);
}
}
outputTable = IndexToolTableUtil.createOutputTable(connection);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,18 @@

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ByteUtil;
import org.slf4j.Logger;
Expand All @@ -58,8 +49,8 @@ public class IndexVerificationResultRepository implements AutoCloseable {
private Table indexTable;
public static final String ROW_KEY_SEPARATOR = "|";
public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes(ROW_KEY_SEPARATOR);
public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
public static String RESULT_TABLE_NAME = IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME;
public static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME);
public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
Expand Down Expand Up @@ -152,37 +143,18 @@ public IndexVerificationResultRepository(){
}

public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException {
resultTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
resultTable = getTable(conn, Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME));
indexTable = getTable(conn, indexNameBytes);
}

public IndexVerificationResultRepository(byte[] indexName,
HTableFactory hTableFactory) throws IOException {
resultTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES));
resultTable = hTableFactory.getTable(new ImmutableBytesPtr(Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME)));
indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
}

public void createResultTable(Connection connection) throws IOException, SQLException {
ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
try (Admin admin = queryServices.getAdmin()) {
TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME);
if (!admin.tableExists(resultTableName)) {
ColumnFamilyDescriptor columnDescriptor =
ColumnFamilyDescriptorBuilder
.newBuilder(RESULT_TABLE_COLUMN_FAMILY)
.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL)
.build();
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(resultTableName)
.setColumnFamily(columnDescriptor).build();
try {
admin.createTable(tableDescriptor);
} catch (TableExistsException e) {
LOGGER.warn("Table exists, ignoring", e);
}
resultTable = admin.getConnection().getTable(resultTableName);
}
}
resultTable = IndexToolTableUtil.createResultTable(connection);
}

private static byte[] generatePartialResultTableRowKey(long ts, byte[] indexTableName) {
Expand Down Expand Up @@ -356,7 +328,7 @@ private IndexToolVerificationResult aggregateVerificationResult(

public IndexToolVerificationResult getVerificationResult(Connection conn,
long ts, byte[] indexTableName) throws IOException, SQLException {
try (Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES)) {
try (Table hTable = getTable(conn, Bytes.toBytes(IndexToolTableUtil.SYSTEM_RESULT_TABLE_NAME))) {
byte[] startRowKey = generatePartialResultTableRowKey(ts,
indexTableName);
byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.transform.Transform;
import org.apache.phoenix.util.SchemaUtil;
Expand Down
Loading

0 comments on commit f2470b8

Please sign in to comment.