Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24522 Fix races on Raft snapshot creation and adding a table to a zone. #5242

Merged
merged 5 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public PartitionSnapshotStorageFactory(
* Adds a given table partition storage to the snapshot storage, managed by this factory.
*/
public void addMvPartition(int tableId, PartitionMvStorageAccess partition) {
// FIXME: there are possible races with table creation, see https://issues.apache.org/jira/browse/IGNITE-24522
PartitionMvStorageAccess prev = partitionsByTableId.put(tableId, partition);

assert prev == null : "Partition storage for table ID " + tableId + " already exists.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;

import java.util.Collection;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Iterator;
import java.util.List;
import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite.internal.storage.RowId;
import org.jetbrains.annotations.Nullable;
Expand All @@ -29,22 +31,29 @@
class MvPartitionDeliveryState {
private final Iterator<PartitionMvStorageAccess> partitionStoragesIterator;

/**
* Current row ID within the current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
*/
/** Current row ID within the current partition storage. */
@Nullable
private RowId currentRowId;

/**
* Current partition storage. Can be {@code null} only if the snapshot has delivered all possible data.
*/
/** Current partition storage. */
@Nullable
private PartitionMvStorageAccess currentPartitionStorage;

MvPartitionDeliveryState(Collection<PartitionMvStorageAccess> partitionStorages) {
private final IntSet tableIds;

private boolean isStarted = false;

/**
* Creates a new state. The state is initially positioned before the first row of the first storage.
*
* @param partitionStorages Partition storages to iterate over. They <b>must</b> be sorted by table ID in ascending order.
*/
MvPartitionDeliveryState(List<PartitionMvStorageAccess> partitionStorages) {
this.partitionStoragesIterator = partitionStorages.iterator();

advance();
tableIds = new IntOpenHashSet(partitionStorages.size());

partitionStorages.forEach(storage -> tableIds.add(storage.tableId()));
}

RowId currentRowId() {
Expand All @@ -63,14 +72,30 @@ int currentTableId() {
return currentPartitionStorage().tableId();
}

/**
* Returns {@code true} if the given table ID is in the range of tables that this state iterates over.
*/
boolean isGoingToBeDelivered(int tableId) {
return tableIds.contains(tableId);
}

/**
* Returns {@code true} if all data for the snapshot has been retrieved.
*/
boolean isExhausted() {
return currentPartitionStorage == null;
return currentPartitionStorage == null && !partitionStoragesIterator.hasNext();
}

/**
* Returns {@code true} if the {@link #advance()} method has been called at least once.
*/
boolean hasIterationStarted() {
return isStarted;
}

void advance() {
isStarted = true;

while (true) {
if (currentPartitionStorage == null) {
if (!partitionStoragesIterator.hasNext()) {
Expand Down
Loading