Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-7321 : Rename PhoenixIndexBuilderHelper to AtomicUpsertHelper #1924

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
import org.apache.phoenix.index.AtomicUpsertHelper;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
Expand Down Expand Up @@ -883,7 +883,7 @@ public MutationPlan compile(UpsertStatement upsert) throws SQLException {
.build().buildException();
}
if (onDupKeyPairs.isEmpty()) { // ON DUPLICATE KEY IGNORE
onDupKeyBytesToBe = PhoenixIndexBuilderHelper.serializeOnDupKeyIgnore();
onDupKeyBytesToBe = AtomicUpsertHelper.serializeOnDupKeyIgnore();
} else { // ON DUPLICATE KEY UPDATE;
onDupKeyBytesToBe = getOnDuplicateKeyBytes(table, context, onDupKeyPairs, resolver);
}
Expand Down Expand Up @@ -959,7 +959,7 @@ public int getPosition() {
}
PTable onDupKeyTable = PTableImpl.builderWithColumns(table, updateColumns).build();
onDupKeyBytesToBe =
PhoenixIndexBuilderHelper.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions);
AtomicUpsertHelper.serializeOnDupKeyUpdate(onDupKeyTable, updateExpressions);
return onDupKeyBytesToBe;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
import org.apache.phoenix.index.AtomicUpsertHelper;
import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper;
import org.apache.phoenix.index.PhoenixIndexFailurePolicyHelper.MutateCommand;
import org.apache.phoenix.index.PhoenixIndexMetaData;
Expand Down Expand Up @@ -845,7 +845,7 @@ private void generateMutations(final TableRef tableRef, final long mutationTimes
// TODO: use our ServerCache
for (Mutation mutation : rowMutations) {
if (onDupKeyBytes != null) {
mutation.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB, onDupKeyBytes);
mutation.setAttribute(AtomicUpsertHelper.ATOMIC_OP_ATTRIB, onDupKeyBytes);
}
}
rowMutationsPertainingToIndex = rowMutations;
Expand Down Expand Up @@ -1103,7 +1103,7 @@ static MutationBytes calculateMutationSize(List<Mutation> mutations,
} else if (mutation instanceof Put) {
upsertsize += temp;
upsertCounter++;
if (mutation.getAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB) != null) {
if (mutation.getAttribute(AtomicUpsertHelper.ATOMIC_OP_ATTRIB) != null) {
atomicUpsertsize += temp;
}
allDeletesMutations = false;
Expand Down Expand Up @@ -2374,7 +2374,7 @@ boolean join(RowMutationState newRow) {
}
// Concatenate ON DUPLICATE KEY bytes to allow multiple
// increments of the same row in the same commit batch.
this.onDupKeyBytes = PhoenixIndexBuilderHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
this.onDupKeyBytes = AtomicUpsertHelper.combineOnDupKey(this.onDupKeyBytes, newRow.onDupKeyBytes);
statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;

public final class PhoenixIndexBuilderHelper {
public final class AtomicUpsertHelper {
private static final byte[] ON_DUP_KEY_IGNORE_BYTES = new byte[] {1}; // boolean true
private static final int ON_DUP_KEY_HEADER_BYTE_SIZE = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_BOOLEAN;
public static final String ATOMIC_OP_ATTRIB = "_ATOMIC_OP_ATTRIB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.CaseExpression;
import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
import org.apache.phoenix.index.AtomicUpsertHelper;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
Expand All @@ -58,7 +58,6 @@
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
Expand Down Expand Up @@ -141,7 +140,7 @@

import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.removeColumn;
import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
import static org.apache.phoenix.index.AtomicUpsertHelper.ATOMIC_OP_ATTRIB;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;

/**
Expand Down Expand Up @@ -1564,7 +1563,7 @@ private List<Mutation> generateOnDupMutations(BatchMutateContext context, Put at
Pair<Put, Put> dataRowState = context.dataRowStates.get(rowKeyPtr);
Put currentDataRowState = dataRowState != null ? dataRowState.getFirst() : null;

if (PhoenixIndexBuilderHelper.isDupKeyIgnore(opBytes)) {
if (AtomicUpsertHelper.isDupKeyIgnore(opBytes)) {
if (currentDataRowState == null) {
// new row
mutations.add(atomicPut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp, Ind

@Override
public boolean isAtomicOp(Mutation m) {
return m.getAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB) != null;
return m.getAttribute(AtomicUpsertHelper.ATOMIC_OP_ATTRIB) != null;
}

private static void transferCells(Mutation source, Mutation target) {
Expand All @@ -123,11 +123,11 @@ private static List<Mutation> convertIncrementToPutInSingletonList(Increment inc

@Override
public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
byte[] opBytes = inc.getAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB);
byte[] opBytes = inc.getAttribute(AtomicUpsertHelper.ATOMIC_OP_ATTRIB);
if (opBytes == null) { // Unexpected
return null;
}
inc.setAttribute(PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB, null);
inc.setAttribute(AtomicUpsertHelper.ATOMIC_OP_ATTRIB, null);
Put put = null;
Delete delete = null;
    // We can neither use the time stamp in the Increment to set the Get time range
Expand All @@ -140,7 +140,7 @@ public List<Mutation> executeAtomicOp(Increment inc) throws IOException {
long ts = HConstants.LATEST_TIMESTAMP;
byte[] rowKey = inc.getRow();
final Get get = new Get(rowKey);
if (PhoenixIndexBuilderHelper.isDupKeyIgnore(opBytes)) {
if (AtomicUpsertHelper.isDupKeyIgnore(opBytes)) {
get.setFilter(new FirstKeyOnlyFilter());
try (RegionScanner scanner = this.env.getRegion().getScanner(new Scan(get))) {
List<Cell> cells = new ArrayList<>();
Expand Down