Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22620 Implement ReplicaSafeTimeSyncRequest processing for zone replica #5258

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,8 @@ void setPrimaryReplica(Node node, @Nullable ZonePartitionId zonePartitionId) {
assertThat(primaryReplicaChangeFuture, willCompleteSuccessfully());
}
}

long idleSafeTimePropagationDuration() {
return replicationConfiguration.idleSafeTimePropagationDuration().value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@
import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.partition.replicator.fixtures.Node;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignmentsImpl;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
Expand Down Expand Up @@ -656,6 +660,41 @@ public void testScanCloseReplicaRequest() throws Exception {
assertDoesNotThrow(tx::commit);
}

@Test
public void testReplicaSafeTimeSyncRequest() throws Exception {
// Prepare a single node cluster.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a single node cluster))

startCluster(2);
Node node0 = getNode(0);
List<Set<Assignment>> assignments = PartitionDistributionUtils.calculateAssignments(
cluster.stream().map(n -> n.name).collect(toList()), 1, 1);

List<TokenizedAssignments> tokenizedAssignments = assignments.stream()
.map(a -> new TokenizedAssignmentsImpl(a, Integer.MIN_VALUE))
.collect(toList());

placementDriver.setPrimary(node0.clusterService.topologyService().localMember());
placementDriver.setAssignments(tokenizedAssignments);

// Prepare a zone.
String zoneName = "test_zone";
createZone(node0, zoneName, 1, 2);
int zoneId = DistributionZonesTestUtil.getZoneId(node0.catalogManager, zoneName, node0.hybridClock.nowLong());
int partId = 0;

// Create a table to work with.
createTable(node0, zoneName, "test_table");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please adjust the test in a following way.

  1. Create zone
  2. ensure that safeTime was successfully adjusted. // It should be possible to adjust timestamp even if there's no tables in zone.
  3. Create table
  4. await table creation and ensure that safeTime was successfully adjusted.


HybridTimestamp node0safeTimeBefore = node0.currentSafeTimeForZonePartition(zoneId, partId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currentSafeTimeForZonePartition may fail with Missing resources for zone if you will call

    ZonePartitionResources getZonePartitionResources(ZonePartitionId zonePartitionId) {
        ZoneResources zoneResources = resourcesByZoneId.get(zonePartitionId.zoneId());

fast enough.

Node node1 = getNode(1);
HybridTimestamp node1safeTimeBefore = node1.currentSafeTimeForZonePartition(zoneId, partId);

waitForCondition(
() -> node0safeTimeBefore.compareTo(node0.currentSafeTimeForZonePartition(zoneId, partId)) < 0
&& node1safeTimeBefore.compareTo(node1.currentSafeTimeForZonePartition(zoneId, partId)) < 0,
idleSafeTimePropagationDuration() * 2
);
}

private static RemotelyTriggeredResource getVersionedStorageCursor(Node node, FullyQualifiedResourceId cursorId) {
return node.resourcesRegistry.resources().get(cursorId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.ignite.internal.hlc.ClockWaiter;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
Expand Down Expand Up @@ -169,6 +170,7 @@
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
Expand Down Expand Up @@ -854,6 +856,10 @@ public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int partition
return partitionReplicaLifecycleManager.txStatePartitionStorage(zoneId, partitionId);
}

public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int partId) {
return partitionReplicaLifecycleManager.currentSafeTimeForZonePartition(zoneId, partId);
}

public DataStorageManager dataStorageManager() {
return dataStorageMgr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,4 +1444,11 @@ private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<
public TxStatePartitionStorage txStatePartitionStorage(int zoneId, int partitionId) {
return requireNonNull(zoneResourcesManager.txStatePartitionStorage(zoneId, partitionId));
}

@TestOnly
public HybridTimestamp currentSafeTimeForZonePartition(int zoneId, int partId) {
return requireNonNull(zoneResourcesManager.getZonePartitionResources(new ZonePartitionId(zoneId, partId)))
.raftListener()
.currentSafeTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.partition.replicator.handlers.MinimumActiveTxTimeReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.ReplicaSafeTimeSyncRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.TxFinishReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.handlers.VacuumTxStateReplicaRequestHandler;
import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
Expand Down Expand Up @@ -71,6 +72,7 @@ public class ZonePartitionReplicaListener implements ReplicaListener {
private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
private final MinimumActiveTxTimeReplicaRequestHandler minimumActiveTxTimeReplicaRequestHandler;
private final VacuumTxStateReplicaRequestHandler vacuumTxStateReplicaRequestHandler;
private final ReplicaSafeTimeSyncRequestHandler replicaSafeTimeSyncRequestHandler;

/**
* The constructor.
Expand Down Expand Up @@ -109,6 +111,8 @@ public ZonePartitionReplicaListener(
raftCommandApplicator);

vacuumTxStateReplicaRequestHandler = new VacuumTxStateReplicaRequestHandler(raftCommandApplicator);

replicaSafeTimeSyncRequestHandler = new ReplicaSafeTimeSyncRequestHandler(clockService, raftCommandApplicator);
}

@Override
Expand Down Expand Up @@ -209,7 +213,7 @@ private CompletableFuture<?> processZoneReplicaRequest(
} else if (request instanceof UpdateMinimumActiveTxBeginTimeReplicaRequest) {
return minimumActiveTxTimeReplicaRequestHandler.handle((UpdateMinimumActiveTxBeginTimeReplicaRequest) request);
} else if (request instanceof ReplicaSafeTimeSyncRequest) {
LOG.debug("Non table request is not supported by the zone partition yet " + request);
// return replicaSafeTimeSyncRequestHandler.handle((ReplicaSafeTimeSyncRequest) request);
} else {
LOG.warn("Non table request is not supported by the zone partition yet " + request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.partition.replicator.handlers;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;

/**
* Handler for {@link ReplicaSafeTimeSyncRequest}.
*/
public class ReplicaSafeTimeSyncRequestHandler {
/** Factory for creating replica command messages. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

/** Applicator that applies RAFT command that is created by this handler. */
private final ReplicationRaftCommandApplicator commandApplicator;

/** Clock service. */
private final ClockService clockService;

/**
* Creates a new instance of ReplicaSafeTimeSyncRequestHandler.
*
* @param clockService Clock service.
* @param commandApplicator Applicator that applies RAFT command that is created by this handler.
*/
public ReplicaSafeTimeSyncRequestHandler(
ClockService clockService,
ReplicationRaftCommandApplicator commandApplicator
) {
this.clockService = clockService;
this.commandApplicator = commandApplicator;
}

/**
* Handles {@link ReplicaSafeTimeSyncRequest}.
*
* @param request Request to handle.
* @return Future that will be completed when the request is handled.
*/
public CompletableFuture<?> handle(ReplicaSafeTimeSyncRequest request) {
return commandApplicator.applyCommandWithExceptionHandling(
REPLICA_MESSAGES_FACTORY.safeTimeSyncCommand().initiatorTime(clockService.now()).build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimePropagatingCommand;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.replicator.message.PrimaryReplicaChangeCommand;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
Expand All @@ -53,6 +54,7 @@
import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* RAFT listener for the zone partition.
Expand Down Expand Up @@ -169,6 +171,8 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
// This is a hack for tests, this command is not issued in production because no zone-wide placement driver exists yet.
// FIXME: https://issues.apache.org/jira/browse/IGNITE-24374
result = processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
} else if (command instanceof SafeTimeSyncCommand) {
result = processCrossTableProcessorsCommand(command, commandIndex, commandTerm, safeTimestamp);
} else if (command instanceof TableAwareCommand) {
TablePartitionId tablePartitionId = ((TableAwareCommand) command).tablePartitionId().asTablePartitionId();

Expand Down Expand Up @@ -327,4 +331,9 @@ private static class CommittedConfiguration {
this.lastAppliedTerm = lastAppliedTerm;
}
}

@TestOnly
public HybridTimestamp currentSafeTime() {
return safeTimeTracker.current();
}
}