Skip to content

Commit

Permalink
IGNITE-24571 Add methods for saving configuration and leases in the T…
Browse files Browse the repository at this point in the history
…X storage
  • Loading branch information
sashapolo committed Feb 20, 2025
1 parent a26350d commit a40eba7
Show file tree
Hide file tree
Showing 13 changed files with 494 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;

/**
* Small abstractions for TX storages that includes only methods, mandatory for the snapshot storage.
Expand Down Expand Up @@ -50,6 +53,14 @@ public interface PartitionTxStateAccess {
/** Returns the last applied term of this storage. */
long lastAppliedTerm();

/**
* Returns committed RAFT group configuration corresponding to the write command with the highest applied index, {@code null} if it was
* never saved.
*/
@Nullable RaftGroupConfiguration committedGroupConfiguration();

@Nullable LeaseInfo leaseInfo();

/**
* Prepares the TX storage for rebalance with the same guarantees and requirements as {@link PartitionMvStorageAccess#startRebalance}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,20 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;

/** Adapter from {@link TxStatePartitionStorage} to {@link PartitionTxStateAccess}. */
public class PartitionTxStateAccessImpl implements PartitionTxStateAccess {
private final TxStatePartitionStorage storage;

private final RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();

public PartitionTxStateAccessImpl(TxStatePartitionStorage storage) {
this.storage = storage;
}
Expand All @@ -52,6 +58,16 @@ public long lastAppliedTerm() {
return storage.lastAppliedTerm();
}

@Override
public @Nullable RaftGroupConfiguration committedGroupConfiguration() {
return raftGroupConfigurationConverter.fromBytes(storage.committedGroupConfiguration());
}

@Override
public @Nullable LeaseInfo leaseInfo() {
return storage.leaseInfo();
}

@Override
public CompletableFuture<Void> startRebalance() {
return storage.startRebalance();
Expand All @@ -64,6 +80,6 @@ public CompletableFuture<Void> abortRebalance() {

@Override
public CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta partitionMeta) {
return storage.finishRebalance(partitionMeta.lastAppliedIndex(), partitionMeta.lastAppliedTerm());
return storage.finishRebalance(partitionMeta.toMvPartitionMeta(partitionMeta.raftGroupConfig()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@
import java.util.UUID;
import org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.engine.PrimitivePartitionMeta;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.jetbrains.annotations.Nullable;

/**
* Partition metadata for {@link PartitionMvStorageAccess}.
*/
public class RaftSnapshotPartitionMeta extends PrimitivePartitionMeta {
private final RaftGroupConfiguration raftGroupConfig;
private final byte[] raftGroupConfig;

/** Constructs an {@link RaftSnapshotPartitionMeta} from a {@link PartitionSnapshotMeta} . */
public static RaftSnapshotPartitionMeta fromSnapshotMeta(PartitionSnapshotMeta meta, RaftGroupConfiguration raftGroupConfig) {
Expand All @@ -53,11 +55,11 @@ private RaftSnapshotPartitionMeta(
) {
super(lastAppliedIndex, lastAppliedTerm, leaseStartTime, primaryReplicaNodeId, primaryReplicaNodeName);

this.raftGroupConfig = raftGroupConfig;
this.raftGroupConfig = VersionedSerialization.toBytes(raftGroupConfig, RaftGroupConfigurationSerializer.INSTANCE);
}

/** Returns replication group config. */
public RaftGroupConfiguration raftGroupConfig() {
public byte[] raftGroupConfig() {
return raftGroupConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,7 @@ public CompletableFuture<Void> abortRebalance() {

@Override
public CompletableFuture<Void> finishRebalance(RaftSnapshotPartitionMeta partitionMeta) {
byte[] configBytes = raftGroupConfigurationConverter.toBytes(partitionMeta.raftGroupConfig());

return mvTableStorage.finishRebalancePartition(partitionId(), partitionMeta.toMvPartitionMeta(configBytes))
return mvTableStorage.finishRebalancePartition(partitionId(), partitionMeta.toMvPartitionMeta(partitionMeta.raftGroupConfig()))
.thenAccept(unused -> mvGc.addStorage(tablePartitionId(), gcUpdateHandler));
}

Expand Down
1 change: 1 addition & 0 deletions modules/transactions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
annotationProcessor libs.auto.service

api project(':ignite-configuration-system')
api project(':ignite-storage-api')

implementation project(':ignite-api')
implementation project(':ignite-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.Cursor;
Expand Down Expand Up @@ -139,10 +141,10 @@ public CompletableFuture<Void> abortRebalance() {
}

@Override
public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm) {
public CompletableFuture<Void> finishRebalance(MvPartitionMeta partitionMeta) {
assertThreadAllowsToWrite();

return storage.finishRebalance(lastAppliedIndex, lastAppliedTerm);
return storage.finishRebalance(partitionMeta);
}

@Override
Expand All @@ -151,4 +153,28 @@ public CompletableFuture<Void> clear() {

return storage.clear();
}

@Override
public void committedGroupConfiguration(byte[] config, long index, long term) {
assertThreadAllowsToWrite();

storage.committedGroupConfiguration(config, index, term);
}

@Override
public byte @Nullable [] committedGroupConfiguration() {
return storage.committedGroupConfiguration();
}

@Override
public @Nullable LeaseInfo leaseInfo() {
return storage.leaseInfo();
}

@Override
public void leaseInfo(LeaseInfo leaseInfo, long index, long term) {
assertThreadAllowsToWrite();

storage.leaseInfo(leaseInfo, index, term);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.storage.engine.MvPartitionMeta;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.Cursor;
Expand Down Expand Up @@ -51,7 +53,7 @@ public interface TxStatePartitionStorage extends ManuallyCloseable {
/**
* Puts the tx meta into the storage. WARNING: this method should be used only within the rebalance, because it doesn't update
* the index and the term in the storage. Index and term are updated after the rebalance is finished,
* see {@link #finishRebalance(long, long)}.
* see {@link #finishRebalance}.
*
* @param txId Tx id.
* @param txMeta Tx meta.
Expand Down Expand Up @@ -160,8 +162,8 @@ public interface TxStatePartitionStorage extends ManuallyCloseable {
*
* <p>This method must be called before every rebalance of transaction state storage and ends with a call to one of the methods:
* <ul>
* <li>{@link #abortRebalance()} - in case of errors or cancellation of rebalance;</li>
* <li>{@link #finishRebalance(long, long)} - in case of successful completion of rebalance.</li>
* <li>{@link #abortRebalance} - in case of errors or cancellation of rebalance;</li>
* <li>{@link #finishRebalance} - in case of successful completion of rebalance.</li>
* </ul>
*
* <p>If the {@link #lastAppliedIndex()} is {@link #REBALANCE_IN_PROGRESS} after a node restart, then the storage needs to be
Expand Down Expand Up @@ -189,20 +191,20 @@ public interface TxStatePartitionStorage extends ManuallyCloseable {
CompletableFuture<Void> abortRebalance();

/**
* Completes rebalance for transaction state storage: updates {@link #lastAppliedIndex()} and {@link #lastAppliedTerm()}.
* Completes rebalance for transaction state storage: updates {@link #lastAppliedIndex}, {@link #lastAppliedTerm},
* {@link #committedGroupConfiguration} and {@link #leaseInfo}.
*
* <p>After calling this method, methods for writing and reading will be available.
*
* <p>If rebalance has not started, then an IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_REBALANCE_ERR}
* will be thrown
*
* @param lastAppliedIndex Last applied index.
* @param lastAppliedTerm Last applied term.
* @param partitionMeta Metadata of the partition.
* @return Future of the finish rebalance for transaction state storage.
* @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_REBALANCE_ERR} error code in case when the operation
* has failed.
*/
CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long lastAppliedTerm);
CompletableFuture<Void> finishRebalance(MvPartitionMeta partitionMeta);

/**
* Clears transaction state storage. After the cleaning is completed, the storage will be fully available.
Expand All @@ -221,4 +223,25 @@ public interface TxStatePartitionStorage extends ManuallyCloseable {
* another reason.
*/
CompletableFuture<Void> clear();

/**
* Updates the replication protocol group configuration.
*/
void committedGroupConfiguration(byte[] config, long index, long term);

/**
* Byte representation of the committed replication protocol group configuration corresponding to the write command with the highest
* index applied to the storage or {@code null} if it was never saved.
*/
byte @Nullable [] committedGroupConfiguration();

/**
* Updates the current lease information.
*/
void leaseInfo(LeaseInfo leaseInfo, long index, long term);

/**
* Returns the current lease information of {@code null} if it was never saved.
*/
@Nullable LeaseInfo leaseInfo();
}
Loading

0 comments on commit a40eba7

Please sign in to comment.