Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24384 Support write intent switches within colocation logic #5241

Merged
merged 23 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d175c73
IGNITE-24384 Support write intent switches within colocation logic
rpuch Feb 17, 2025
ee798d8
IGNITE-24384 / update a TODO
rpuch Feb 19, 2025
dfd01d8
IGNITE-24384 / improve javadoc
rpuch Feb 19, 2025
b6629c9
IGNITE-24384 / disable per-table replicas startup and assignment even…
rpuch Feb 20, 2025
d62fce2
IGNITE-24384 / add a TODO
rpuch Feb 20, 2025
8bba7bd
IGNITE-24384 / add comment
rpuch Feb 20, 2025
711ea27
IGNITE-24384 / improve naming
rpuch Feb 20, 2025
5005ab5
IGNITE-24384 / improve javadoc
rpuch Feb 20, 2025
9b267c8
IGNITE-24384 / rename MutablePartitionEnlistment to OngoingTxPartitio…
rpuch Feb 20, 2025
a3a932c
IGNITE-24384 / move a class
rpuch Feb 20, 2025
2fa2f41
IGNITE-24384 / make scope under reliableCatalogVersion() as small as …
rpuch Feb 20, 2025
8cd050e
IGNITE-24384 / improve command timestamp calculation
rpuch Feb 20, 2025
d8a008c
IGNITE-24384 / improve formatting
rpuch Feb 20, 2025
05430e5
IGNITE-24384 / alternative approach for solving multiple reliableCata…
rpuch Feb 20, 2025
e0ff5c3
IGNITE-24384 / fix ItReplicaLifecycleTest.testCatalogCompaction()
rpuch Feb 20, 2025
117e0ce
IGNITE-24384 / remove "don't write committed configuration" hack
rpuch Feb 20, 2025
e7f16c5
IGNITE-24384 / add a TODO
rpuch Feb 21, 2025
bd61417
IGNITE-24384 / change structure of enlistments
rpuch Feb 21, 2025
425f2a3
IGNITE-24384 / only take timestamp once per batch
rpuch Feb 21, 2025
2710c83
IGNITE-24384 / add a comment
rpuch Feb 21, 2025
f0b024a
IGNITE-24384 / fix checkstyle violations
rpuch Feb 21, 2025
fa75ab7
Merge branch 'main' into ignite-24384
rpuch Feb 21, 2025
60b3c6b
IGNITE-24384 / fix ItInternalTableReadOnlyScanTest
rpuch Feb 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.FinishingPartitionEnlistment;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.MutablePartitionEnlistment;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -86,7 +87,7 @@ public UUID id() {
}

@Override
public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndConsistencyToken(ReplicationGroupId replicationGroupId) {
public MutablePartitionEnlistment enlistedPartition(ReplicationGroupId replicationGroupId) {
return null;
}

Expand All @@ -106,11 +107,13 @@ public TablePartitionId commitPartition() {
}

@Override
public IgniteBiTuple<ClusterNode, Long> enlist(
public void enlist(
ReplicationGroupId replicationGroupId,
int tableId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
return null;
ClusterNode primaryNode,
long consistencyToken
) {
// No-op.
}

@Override
Expand Down Expand Up @@ -205,8 +208,7 @@ public CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
TablePartitionId commitPartition,
boolean commit,
Map<ReplicationGroupId, IgniteBiTuple<ClusterNode, Long>> enlistedGroups,
Set<Integer> enlistedTableIds,
Map<ReplicationGroupId, MutablePartitionEnlistment> enlistedGroups,
UUID txId
) {
return nullCompletedFuture();
Expand All @@ -215,7 +217,7 @@ public CompletableFuture<Void> finish(
@Override
public CompletableFuture<Void> cleanup(
ReplicationGroupId commitPartitionId,
Map<ReplicationGroupId, String> enlistedPartitions,
Map<ReplicationGroupId, FinishingPartitionEnlistment> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
Expand All @@ -226,7 +228,7 @@ public CompletableFuture<Void> cleanup(
@Override
public CompletableFuture<Void> cleanup(
TablePartitionId commitPartitionId,
Collection<ReplicationGroupId> enlistedPartitions,
Collection<EnlistedPartitionGroup> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,23 @@ protected static Set<String> readStringSet(IgniteDataInput in) throws IOExceptio

return result;
}

protected static void writeVarIntSet(Set<Integer> partitionIds, IgniteDataOutput out) throws IOException {
out.writeVarInt(partitionIds.size());

for (int partitionId : partitionIds) {
out.writeVarInt(partitionId);
}
}

protected static Set<Integer> readVarIntSet(IgniteDataInput in) throws IOException {
int length = in.readVarIntAsInt();

Set<Integer> set = new HashSet<>(IgniteUtils.capacity(length));
for (int i = 0; i < length; i++) {
set.add(in.readVarIntAsInt());
}

return set;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partition.replicator;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZoneWithStorageProfile;
import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getZoneId;
import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.partition.replicator.fixtures.Node;
import org.apache.ignite.internal.partition.replicator.fixtures.TestPlacementDriver;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ConfigurationExtension.class)
@ExtendWith(ExecutorServiceExtension.class)
@ExtendWith(SystemPropertiesExtension.class)
@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
abstract class AbstractZoneReplicationTest extends IgniteAbstractTest {
private static final int BASE_PORT = 20_000;

protected static final String TEST_ZONE_NAME = "TEST_ZONE";

protected static final String TEST_TABLE_NAME1 = "TEST_TABLE_1";

protected static final String TEST_TABLE_NAME2 = "TEST_TABLE_2";

private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

@InjectConfiguration
private static TransactionConfiguration txConfiguration;

@InjectConfiguration
private static RaftConfiguration raftConfiguration;

@InjectConfiguration
private static SystemLocalConfiguration systemConfiguration;

@InjectConfiguration
private static NodeAttributesConfiguration nodeAttributesConfiguration;

@InjectConfiguration
private static ReplicationConfiguration replicationConfiguration;

@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;

@InjectConfiguration("mock.profiles = {" + DEFAULT_STORAGE_PROFILE + ".engine = aipersist, test.engine=test}")
private static StorageConfiguration storageConfiguration;

@InjectConfiguration
private static GcConfiguration gcConfiguration;

@InjectExecutorService
private static ScheduledExecutorService scheduledExecutorService;

protected final List<Node> cluster = new ArrayList<>();

private final TestPlacementDriver placementDriver = new TestPlacementDriver();

private NodeFinder nodeFinder;

private TestInfo testInfo;

@BeforeEach
void setUp(TestInfo testInfo) {
this.testInfo = testInfo;
}

@AfterEach
void tearDown() throws Exception {
closeAll(cluster.parallelStream().map(node -> node::stop));
}

protected void startCluster(int size) throws Exception {
List<NetworkAddress> addresses = IntStream.range(0, size)
.mapToObj(i -> new NetworkAddress("localhost", BASE_PORT + i))
.collect(toList());

nodeFinder = new StaticNodeFinder(addresses);

IntStream.range(0, size)
.mapToObj(i -> newNode(addresses.get(i), nodeFinder))
.forEach(cluster::add);

cluster.parallelStream().forEach(Node::start);

Node node0 = cluster.get(0);

node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster");

setPrimaryReplica(node0, null);

cluster.forEach(Node::waitWatches);

assertThat(
allOf(cluster.stream().map(n -> n.cmgManager.onJoinReady()).toArray(CompletableFuture[]::new)),
willCompleteSuccessfully()
);

assertTrue(waitForCondition(
() -> {
CompletableFuture<LogicalTopologySnapshot> logicalTopologyFuture = node0.cmgManager.logicalTopology();

assertThat(logicalTopologyFuture, willCompleteSuccessfully());

return logicalTopologyFuture.join().nodes().size() == cluster.size();
},
30_000
));
}

protected Node addNodeToCluster(Function<ReplicaRequest, ReplicationGroupId> requestConverter) {
Node node = newNode(new NetworkAddress("localhost", BASE_PORT + cluster.size()), nodeFinder);

node.setRequestConverter(requestConverter);

cluster.add(node);

node.start();

node.waitWatches();

assertThat(node.cmgManager.onJoinReady(), willCompleteSuccessfully());

return node;
}

private Node newNode(NetworkAddress address, NodeFinder nodeFinder) {
return new Node(
testInfo,
address,
nodeFinder,
workDir,
placementDriver,
systemConfiguration,
raftConfiguration,
nodeAttributesConfiguration,
storageConfiguration,
metaStorageConfiguration,
replicationConfiguration,
txConfiguration,
scheduledExecutorService,
null,
gcConfiguration
);
}

protected int createZone(String zoneName, int partitions, int replicas) {
return createZoneWithProfile(zoneName, partitions, replicas, DEFAULT_STORAGE_PROFILE);
}

protected int createZoneWithProfile(String zoneName, int partitions, int replicas, String profile) {
Node node = cluster.get(0);

createZoneWithStorageProfile(
node.catalogManager,
zoneName,
partitions,
replicas,
profile
);

return getZoneId(node.catalogManager, zoneName, node.hybridClock.nowLong());
}

protected int createTable(String zoneName, String tableName) {
Node node = cluster.get(0);

TableTestUtils.createTable(
node.catalogManager,
DEFAULT_SCHEMA_NAME,
zoneName,
tableName,
List.of(
ColumnParams.builder().name("key").type(INT32).build(),
ColumnParams.builder().name("val").type(INT32).nullable(true).build()
),
List.of("key")
);

return getTableId(node.catalogManager, tableName, node.hybridClock.nowLong());
}

protected final void setupTableIdToZoneIdConverter(ZonePartitionId zonePartitionId, TablePartitionId... tablePartitionIds) {
Function<ReplicaRequest, ReplicationGroupId> requestConverter = requestConverter(zonePartitionId, tablePartitionIds);

cluster.forEach(node -> node.setRequestConverter(requestConverter));
}

protected static Function<ReplicaRequest, ReplicationGroupId> requestConverter(
ZonePartitionId zonePartitionId, TablePartitionId... tablePartitionIds
) {
Set<ReplicationGroupId> tablePartitionIdsSet = Set.of(tablePartitionIds);

return request -> {
ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId();

if (tablePartitionIdsSet.contains(replicationGroupId) && !(request instanceof WriteIntentSwitchReplicaRequest)) {
return zonePartitionId;
} else {
return replicationGroupId;
}
};
}

protected void setPrimaryReplica(Node node, @Nullable ZonePartitionId zonePartitionId) {
ClusterNode newPrimaryReplicaNode = node.clusterService.topologyService().localMember();

HybridTimestamp leaseStartTime = node.hybridClock.now();

placementDriver.setPrimary(newPrimaryReplicaNode, leaseStartTime);

if (zonePartitionId != null) {
PrimaryReplicaChangeCommand cmd = REPLICA_MESSAGES_FACTORY.primaryReplicaChangeCommand()
.primaryReplicaNodeId(newPrimaryReplicaNode.id())
.primaryReplicaNodeName(newPrimaryReplicaNode.name())
.leaseStartTime(leaseStartTime.longValue())
.build();

CompletableFuture<Void> primaryReplicaChangeFuture = node.replicaManager
.replica(zonePartitionId)
.thenCompose(replica -> replica.raftClient().run(cmd));

assertThat(primaryReplicaChangeFuture, willCompleteSuccessfully());
}
}
}
Loading