PHOENIX-7507 Update ROW_KEY_MATCHER column type to be VARBINARY_ENCODED #2060

Merged · 1 commit · Feb 4, 2025
@@ -114,6 +114,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final int MIN_CLIENT_RETRY_INDEX_WRITES = VersionUtil.encodeVersion("4", "14", "0");
public static final int MIN_TX_CLIENT_SIDE_MAINTENANCE = VersionUtil.encodeVersion("4", "14", "0");
public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0");
// The minimum client version that allows VARBINARY_ENCODED columns
public static final int MIN_VERSION_ALLOW_VBE_COLUMNS = VersionUtil.encodeVersion("5", "3", "0");
// Version below which we should turn off essential column family.
public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7");
/** Version below which we fall back on the generic KeyValueBuilder */
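For context, MIN_VERSION_ALLOW_VBE_COLUMNS is compared against the requesting client's encoded version later in this change (see toProto below). A minimal standalone sketch of how such packed major.minor.patch comparisons behave, assuming a simple byte-per-component encoding rather than the actual org.apache.phoenix.util.VersionUtil implementation:

// VersionGateSketch.java - illustrative only; not the actual Phoenix VersionUtil code.
public class VersionGateSketch {
    // Assumption: each version component fits in one byte, packed into a single comparable int.
    static int encodeVersion(int major, int minor, int patch) {
        return (major << 16) | (minor << 8) | patch;
    }

    public static void main(String[] args) {
        int minVersionAllowVbeColumns = encodeVersion(5, 3, 0);
        int oldClient = encodeVersion(5, 2, 1);
        int newClient = encodeVersion(5, 3, 0);
        // Clients below the threshold get the VARBINARY fallback described later in this diff.
        System.out.println(oldClient < minVersionAllowVbeColumns); // true
        System.out.println(newClient < minVersionAllowVbeColumns); // false
    }
}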
@@ -406,7 +406,7 @@ enum JoinType {INNER, LEFT_OUTER}
MAX_LOOKBACK_AGE + " BIGINT, \n" +
CDC_INCLUDE_TABLE + " VARCHAR, \n" +
TTL + " VARCHAR, \n" +
ROW_KEY_MATCHER + " VARBINARY, \n" +
ROW_KEY_MATCHER + " VARBINARY_ENCODED, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
@@ -3655,7 +3655,7 @@ public boolean isViewReferenced() {

if ((rowKeyMatcher == null) ||
Bytes.compareTo(rowKeyMatcher, HConstants.EMPTY_BYTE_ARRAY) == 0) {
tableUpsert.setNull(38, Types.VARBINARY);
tableUpsert.setNull(38, PDataType.VARBINARY_ENCODED_TYPE);
} else {
tableUpsert.setBytes(38, rowKeyMatcher);
}
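The setNull type code now matches the column's new declared type. A generic JDBC-style sketch of the null-vs-bytes binding pattern used in the hunk above; the table name, connection URL, and helper method are hypothetical, and the actual patch passes PDataType.VARBINARY_ENCODED_TYPE rather than java.sql.Types.VARBINARY:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;

public class BindNullableBytesSketch {
    // Bind a byte[] parameter, falling back to setNull with an explicit type code
    // when the value is absent, mirroring the rowKeyMatcher branch above.
    static void bind(PreparedStatement stmt, int index, byte[] value, int sqlTypeCode)
            throws SQLException {
        if (value == null || value.length == 0) {
            stmt.setNull(index, sqlTypeCode);
        } else {
            stmt.setBytes(index, value);
        }
    }

    public static void main(String[] args) throws SQLException {
        // Hypothetical table and URL, for illustration only.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             PreparedStatement stmt =
                 conn.prepareStatement("UPSERT INTO MY_TABLE (ID, PAYLOAD) VALUES (?, ?)")) {
            stmt.setInt(1, 1);
            bind(stmt, 2, null, Types.VARBINARY);
            stmt.executeUpdate();
        }
    }
}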
@@ -68,6 +68,7 @@
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
@@ -77,6 +78,7 @@
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ParseNode;
@@ -122,80 +124,17 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Nonnull;

import javax.annotation.Nonnull;

import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ExpressionCompiler;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.coprocessor.generated.DynamicColumnMetaDataProtos;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.SingleCellConstructorExpression;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.SQLParser;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
import org.apache.phoenix.schema.transform.TransformMaintainer;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PChar;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PVarbinaryEncoded;
import org.apache.phoenix.schema.types.PVarchar;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Objects;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableSortedMap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.SizedUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;

/**
*
@@ -2218,6 +2157,10 @@ public static PTable createFromProto(PTableProtos.PTable table) {
}

public static PTableProtos.PTable toProto(PTable table) {
return toProto(table, MetaDataProtocol.MIN_VERSION_ALLOW_VBE_COLUMNS);
}

public static PTableProtos.PTable toProto(PTable table, long clientVersion) {
PTableProtos.PTable.Builder builder = PTableProtos.PTable.newBuilder();
if (table.getTenantId() != null) {
builder.setTenantId(ByteStringer.wrap(table.getTenantId().getBytes()));
@@ -2259,17 +2202,32 @@ public static PTableProtos.PTable toProto(PTable table) {
}
List<PColumn> columns = table.getColumns();
int columnSize = columns.size();
for (int i = offset; i < columnSize; i++) {
PColumn column = columns.get(i);
builder.addColumns(PColumnImpl.toProto(column));
// Check whether the columns need to be rewritten for backward compatibility.
boolean checkBackwardCompatibility =
    (clientVersion < MetaDataProtocol.MIN_VERSION_ALLOW_VBE_COLUMNS)
        && (table.getSchemaName().getString()
            .equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME));
if (checkBackwardCompatibility) {
for (int i = offset; i < columnSize; i++) {
PColumn column = columns.get(i);
builder.addColumns(PColumnImpl.toProto(getBackwardCompatibleColumn(
column,
table.getTableName().getString())));
}
} else {
for (int i = offset; i < columnSize; i++) {
PColumn column = columns.get(i);
builder.addColumns(PColumnImpl.toProto(column));
}
}
List<PTable> indexes = table.getIndexes();
for (PTable curIndex : indexes) {
builder.addIndexes(toProto(curIndex));
builder.addIndexes(toProto(curIndex, clientVersion));
}
PTable transformingNewTable = table.getTransformingNewTable();
if (transformingNewTable != null) {
builder.setTransformingNewTable(toProto(transformingNewTable));
builder.setTransformingNewTable(toProto(transformingNewTable, clientVersion));
}
builder.setIsImmutableRows(table.isImmutableRows());
// TODO remove this field in 5.0 release
@@ -2516,6 +2474,46 @@ public Map<PTableKey, Long> getAncestorLastDDLTimestampMap() {
return ancestorLastDDLTimestampMap;
}

// Helper method that creates a backward-compatible PColumn object for the columns that were
// introduced with the VARBINARY_ENCODED type in release 5.3.0.
// Without this adjustment, older clients would get the following exception -
// "Error: org.apache.phoenix.schema.IllegalDataException: java.sql.SQLException:
// ERROR 201 (22000): Illegal data. Unsupported sql type: VARBINARY_ENCODED"
// The following columns were introduced as part of the 5.3.0 release:
// SYSTEM.CATALOG.ROW_KEY_MATCHER
// SYSTEM.CDC_STREAM.PARTITION_START_KEY
// SYSTEM.CDC_STREAM.PARTITION_END_KEY
private static PColumn getBackwardCompatibleColumn(PColumn column, String tableName) {

// SYSTEM.CATALOG.ROW_KEY_MATCHER
if ((tableName.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE)
&& column.getName().getString().equalsIgnoreCase(PhoenixDatabaseMetaData.ROW_KEY_MATCHER))
// SYSTEM.CDC_STREAM.PARTITION_START_KEY
|| (tableName.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE)
&& column.getName().getString().equalsIgnoreCase(PhoenixDatabaseMetaData.PARTITION_START_KEY))
// SYSTEM.CDC_STREAM.PARTITION_END_KEY
|| (tableName.equalsIgnoreCase(PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_TABLE)
&& column.getName().getString().equalsIgnoreCase(PhoenixDatabaseMetaData.PARTITION_END_KEY))) {
return new PColumnImpl(column.getName(),
column.getFamilyName(),
PVarbinary.INSTANCE,
column.getMaxLength(),
column.getScale(),
column.isNullable(),
column.getPosition(),
column.getSortOrder(),
column.getArraySize(),
column.getViewConstant(),
column.isViewReferenced(),
column.getExpressionStr(),
column.isRowTimestamp(),
column.isDynamic(),
column.getColumnQualifierBytes(),
column.getTimestamp());
}
return column;
}

private void buildIndexWhereExpression(PhoenixConnection connection) throws SQLException {
PhoenixPreparedStatement
pstmt =
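To summarize the compatibility path added above: when the requesting client predates 5.3.0 and the table belongs to the SYSTEM schema, the affected columns are advertised as plain VARBINARY so the old client can still decode the PTable. A self-contained sketch of that idea, using illustrative names that are not the Phoenix classes from the patch:

import java.util.List;
import java.util.stream.Collectors;

public class CompatDowngradeSketch {
    enum SqlType { VARBINARY, VARBINARY_ENCODED }

    record Column(String name, SqlType type) {}

    // Stand-in for MetaDataProtocol.MIN_VERSION_ALLOW_VBE_COLUMNS (encodeVersion("5", "3", "0")).
    static final int MIN_VERSION_ALLOW_VBE_COLUMNS = (5 << 16) | (3 << 8);

    static List<Column> forClient(List<Column> columns, int clientVersion) {
        if (clientVersion >= MIN_VERSION_ALLOW_VBE_COLUMNS) {
            return columns; // newer clients understand VARBINARY_ENCODED as-is
        }
        // Older clients would fail with "Unsupported sql type: VARBINARY_ENCODED",
        // so report those columns as plain VARBINARY instead.
        return columns.stream()
                .map(c -> c.type() == SqlType.VARBINARY_ENCODED
                        ? new Column(c.name(), SqlType.VARBINARY) : c)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Column> cols = List.of(new Column("ROW_KEY_MATCHER", SqlType.VARBINARY_ENCODED));
        System.out.println(forClient(cols, (5 << 16) | (2 << 8))); // downgraded for a 5.2 client
        System.out.println(forClient(cols, (5 << 16) | (3 << 8))); // unchanged for a 5.3 client
    }
}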
@@ -757,7 +757,7 @@ public void getTable(RpcController controller, GetTableRequest request,
// the PTable of views and indexes on views might get updated because a column is added to one of
// their parents (this won't change the timestamp)
if (table.getType() != PTableType.TABLE || table.getTimeStamp() != tableTimeStamp) {
builder.setTable(PTableImpl.toProto(table));
builder.setTable(PTableImpl.toProto(table, request.getClientVersion()));
}
done.run(builder.build());
} catch (Throwable t) {
@@ -973,11 +973,10 @@ private void addColumnToTable(List<Cell> results, PName colName, PName famName,
PDataType dataType =
PDataType.fromTypeId(PInteger.INSTANCE.getCodec().decodeInt(
dataTypeKv.getValueArray(), dataTypeKv.getValueOffset(), SortOrder.getDefault()));

if (maxLength == null && dataType == PBinary.INSTANCE) {
dataType = PVarbinary.INSTANCE; // For
dataType = PVarbinary.INSTANCE; // For Backward compatibility
}
// backward
// compatibility.
Cell sortOrderKv = colKeyValues[SORT_ORDER_INDEX];
SortOrder sortOrder =
sortOrderKv == null ? SortOrder.getDefault() : SortOrder.fromSystemValue(PInteger.INSTANCE
@@ -1666,7 +1665,7 @@ private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allC
|| (oldTable != null &&
oldTable.getBucketNum() != null && oldTable.getBucketNum() > 0);
addColumnToTable(columnCellList, colName, famName, colKeyValues, columns,
isSalted, baseColumnCount, isRegularView, columnTimestamp);
isSalted, baseColumnCount, isRegularView, columnTimestamp);
}
}
if (tableType == INDEX && ! isThisAViewIndex) {
@@ -2395,14 +2394,14 @@ public void createTable(RpcController controller, CreateTableRequest request,
if (!isTableDeleted(table)) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
builder.setTable(PTableImpl.toProto(table, clientVersion));
done.run(builder.build());
return;
}
} else {
builder.setReturnCode(MetaDataProtos.MutationCode.NEWER_TABLE_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
builder.setTable(PTableImpl.toProto(table));
builder.setTable(PTableImpl.toProto(table, clientVersion));
done.run(builder.build());
return;
}
@@ -2823,7 +2822,7 @@ public void createTable(RpcController controller, CreateTableRequest request,
PTable newTable = buildTable(tableKey, cacheKey, region,
clientTimeStamp, clientVersion);
if (newTable != null) {
builder.setTable(PTableImpl.toProto(newTable));
builder.setTable(PTableImpl.toProto(newTable, clientVersion));
}

done.run(builder.build());
@@ -4594,7 +4593,7 @@ public void updateIndexState(RpcController controller, UpdateIndexStateRequest r
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_ALREADY_EXISTS);
builder.setMutationTime(currentTime);
if (returnTable != null) {
builder.setTable(PTableImpl.toProto(returnTable));
builder.setTable(PTableImpl.toProto(returnTable, request.getClientVersion()));
}
done.run(builder.build());
return;