Skip to content

Commit e583bf5

Browse files
author
michael stack
committed
Remove the ref count protections
1 parent a095510 commit e583bf5

File tree

4 files changed

+41
-84
lines changed

4 files changed

+41
-84
lines changed

fdbrpc/FlowTransport.actor.cpp

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,24 +1851,8 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
18511851
return;
18521852
Reference<Peer> peer = self->getPeer(endpoint.getPrimaryAddress());
18531853
if (peer) {
1854-
// Only decrement if the peer has active references (peerReferences > 0)
1855-
// This prevents race conditions and over-decrementing
1856-
if (peer->peerReferences > 0) {
1857-
peer->peerReferences--;
1858-
} else if (peer->peerReferences == 0) {
1859-
// Peer already has no references - this can happen in race conditions
1860-
// Don't decrement further as it would make the count invalid
1861-
TraceEvent(SevInfo, "PeerReferenceAlreadyZero")
1862-
.detail("Address", endpoint.getPrimaryAddress())
1863-
.detail("Token", endpoint.token);
1864-
} else if (peer->peerReferences == -1) {
1865-
// Peer was created but never had addPeerReference called
1866-
// This is expected in some race conditions, so don't decrement further
1867-
TraceEvent(SevInfo, "PeerReferenceNotInitialized")
1868-
.detail("Address", endpoint.getPrimaryAddress())
1869-
.detail("Token", endpoint.token);
1870-
} else {
1871-
// This should not happen with the fixes above, but keep the error for debugging
1854+
peer->peerReferences--;
1855+
if (peer->peerReferences < -1) {
18721856
TraceEvent(SevError, "InvalidPeerReferences")
18731857
.detail("References", peer->peerReferences)
18741858
.detail("Address", endpoint.getPrimaryAddress())

fdbrpc/include/fdbrpc/fdbrpc.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,18 @@ class FlowReceiver : public NetworkMessageReceiver, public NonCopyable {
3939
Endpoint endpoint;
4040
bool m_isLocalEndpoint;
4141
bool m_stream;
42-
bool m_peerReferenceAdded; // Track if we've added a peer reference
43-
4442
protected:
45-
FlowReceiver() : m_isLocalEndpoint(false), m_stream(false), m_peerReferenceAdded(false) {}
43+
FlowReceiver() : m_isLocalEndpoint(false), m_stream(false) {}
4644

4745
FlowReceiver(Endpoint const& remoteEndpoint, bool stream)
48-
: endpoint(remoteEndpoint), m_isLocalEndpoint(false), m_stream(stream), m_peerReferenceAdded(false) {
46+
: endpoint(remoteEndpoint), m_isLocalEndpoint(false), m_stream(stream) {
4947
FlowTransport::transport().addPeerReference(endpoint, m_stream);
50-
m_peerReferenceAdded = true;
5148
}
5249

5350
~FlowReceiver() {
5451
if (m_isLocalEndpoint) {
5552
FlowTransport::transport().removeEndpoint(endpoint, this);
56-
} else if (m_peerReferenceAdded) { // Only remove if a reference was added
53+
} else {
5754
FlowTransport::transport().removePeerReference(endpoint, m_stream);
5855
}
5956
}
@@ -65,11 +62,9 @@ class FlowReceiver : public NetworkMessageReceiver, public NonCopyable {
6562
void setRemoteEndpoint(Endpoint const& remoteEndpoint, bool stream) {
6663
ASSERT(!m_isLocalEndpoint);
6764
ASSERT(!endpoint.isValid());
68-
ASSERT(!m_peerReferenceAdded); // Should not already have a reference
6965
endpoint = remoteEndpoint;
7066
m_stream = stream;
7167
FlowTransport::transport().addPeerReference(endpoint, m_stream);
72-
m_peerReferenceAdded = true;
7368
}
7469

7570
// If already a remote endpoint, returns that. Otherwise makes this

fdbserver/MockS3Server.actor.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -997,15 +997,15 @@ ACTOR Future<Void> startMockS3Server(NetworkAddress listenAddress) {
997997
// Unit Tests for MockS3Server
998998
TEST_CASE("/fdbserver/MockS3Server/parseS3Request/ValidBucketParameter") {
999999
MockS3ServerImpl server;
1000-
std::string resource = "/test?bucket=mybucket&region=us-east-1";
1000+
std::string resource = "/test?bucket=testbucket&region=us-east-1";
10011001
std::string bucket, object;
10021002
std::map<std::string, std::string> queryParams;
10031003

10041004
server.parseS3Request(resource, bucket, object, queryParams);
10051005

1006-
ASSERT(bucket == "mybucket");
1006+
ASSERT(bucket == "testbucket");
10071007
ASSERT(object == "");
1008-
ASSERT(queryParams["bucket"] == "mybucket");
1008+
ASSERT(queryParams["bucket"] == "testbucket");
10091009
ASSERT(queryParams["region"] == "us-east-1");
10101010

10111011
return Void();
@@ -1093,15 +1093,15 @@ TEST_CASE("/fdbserver/MockS3Server/parseS3Request/URLEncodedParameters") {
10931093

10941094
TEST_CASE("/fdbserver/MockS3Server/parseS3Request/EmptyPath") {
10951095
MockS3ServerImpl server;
1096-
std::string resource = "/?bucket=mybucket&region=us-east-1";
1096+
std::string resource = "/?bucket=testbucket&region=us-east-1";
10971097
std::string bucket, object;
10981098
std::map<std::string, std::string> queryParams;
10991099

11001100
server.parseS3Request(resource, bucket, object, queryParams);
11011101

1102-
ASSERT(bucket == "mybucket");
1102+
ASSERT(bucket == "testbucket");
11031103
ASSERT(object == "");
1104-
ASSERT(queryParams["bucket"] == "mybucket");
1104+
ASSERT(queryParams["bucket"] == "testbucket");
11051105
ASSERT(queryParams["region"] == "us-east-1");
11061106

11071107
return Void();
@@ -1125,14 +1125,14 @@ TEST_CASE("/fdbserver/MockS3Server/parseS3Request/OnlyBucketInPath") {
11251125

11261126
TEST_CASE("/fdbserver/MockS3Server/parseS3Request/MultipleParameters") {
11271127
MockS3ServerImpl server;
1128-
std::string resource = "/test?bucket=mybucket&region=us-east-1&version=1&encoding=utf8";
1128+
std::string resource = "/test?bucket=testbucket&region=us-east-1&version=1&encoding=utf8";
11291129
std::string bucket, object;
11301130
std::map<std::string, std::string> queryParams;
11311131

11321132
server.parseS3Request(resource, bucket, object, queryParams);
11331133

1134-
ASSERT(bucket == "mybucket");
1135-
ASSERT(queryParams["bucket"] == "mybucket");
1134+
ASSERT(bucket == "testbucket");
1135+
ASSERT(queryParams["bucket"] == "testbucket");
11361136
ASSERT(queryParams["region"] == "us-east-1");
11371137
ASSERT(queryParams["version"] == "1");
11381138
ASSERT(queryParams["encoding"] == "utf8");
@@ -1143,14 +1143,14 @@ TEST_CASE("/fdbserver/MockS3Server/parseS3Request/MultipleParameters") {
11431143

11441144
TEST_CASE("/fdbserver/MockS3Server/parseS3Request/ParametersWithoutValues") {
11451145
MockS3ServerImpl server;
1146-
std::string resource = "/test?bucket=mybucket&flag&region=us-east-1";
1146+
std::string resource = "/test?bucket=testbucket&flag&region=us-east-1";
11471147
std::string bucket, object;
11481148
std::map<std::string, std::string> queryParams;
11491149

11501150
server.parseS3Request(resource, bucket, object, queryParams);
11511151

1152-
ASSERT(bucket == "mybucket");
1153-
ASSERT(queryParams["bucket"] == "mybucket");
1152+
ASSERT(bucket == "testbucket");
1153+
ASSERT(queryParams["bucket"] == "testbucket");
11541154
ASSERT(queryParams["flag"] == ""); // Parameter without value should be empty string
11551155
ASSERT(queryParams["region"] == "us-east-1");
11561156

fdbserver/workloads/BackupCorrectness.actor.cpp

Lines changed: 24 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
9494
shareLogRange = getOption(options, "shareLogRange"_sr, false);
9595
defaultBackup = getOption(options, "defaultBackup"_sr, false);
9696
backupURL = getOption(options, "backupURL"_sr, "file://simfdb/backups/"_sr).toString();
97-
skipDirtyRestore = getOption(options, "skipDirtyRestore"_sr, true);
97+
skipDirtyRestore = getOption(options, "skipDirtyRestore"_sr, false);
9898
initSnapshotInterval = getOption(options, "initSnapshotInterval"_sr, 0);
9999
snapshotInterval = getOption(options, "snapshotInterval"_sr, 30);
100100

@@ -610,6 +610,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
610610
}
611611

612612
try {
613+
state Future<Void> startRestore = delay(self->restoreAfter);
614+
613615
// backup
614616
wait(delay(self->backupAfter));
615617

@@ -653,11 +655,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
653655
UidAndAbortedFlagT uidFlag = wait(keyBackedTag.getOrThrow(cx.getReference()));
654656
state UID logUid = uidFlag.first;
655657
state Key destUidValue = wait(BackupConfig(logUid).destUidValue().getD(cx.getReference()));
656-
// Ensure backup reached a restorable/completed state and fetch its container
657-
state Reference<IBackupContainer> lastBackupContainer;
658-
state UID lastBackupUID;
659-
state EBackupState waitState = wait(backupAgent.waitBackup(
660-
cx, self->backupTag.toString(), StopWhenDone::True, &lastBackupContainer, &lastBackupUID));
658+
state Reference<IBackupContainer> lastBackupContainer =
659+
wait(BackupConfig(logUid).backupContainer().getD(cx.getReference()));
661660

662661
// Occasionally start yet another backup that might still be running when we restore
663662
if (!self->locked && BUGGIFY) {
@@ -681,11 +680,8 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
681680
}
682681
}
683682

684-
// Wait for backup to complete, then add a small delay before restore
685-
TraceEvent("BARW_WaitingForBackupBeforeRestore", randomID).detail("BackupTag", printable(self->backupTag));
686-
687-
// Add a small delay after backup completion to ensure all metadata is written
688-
wait(delay(5.0));
683+
CODE_PROBE(!startRestore.isReady(), "Restore starts at specified time");
684+
wait(startRestore);
689685

690686
if (lastBackupContainer && self->performRestore) {
691687
if (!self->skipDirtyRestore && deterministicRandom()->random01() < 0.5) {
@@ -706,36 +702,25 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
706702
.detail("BackupTag", printable(self->backupTag));
707703
TraceEvent("BARW_RestoreBegin", randomID).detail("BackupTag", printable(self->backupTag));
708704

709-
state Reference<IBackupContainer> restoreContainer =
710-
IBackupContainer::openContainer(lastBackupContainer->getURL(),
711-
lastBackupContainer->getProxy(),
712-
lastBackupContainer->getEncryptionKeyFileName());
713-
state BackupDescription restoreDesc;
714-
state BackupDescription _initialRestoreDesc = wait(restoreContainer->describeBackup());
715-
restoreDesc = _initialRestoreDesc;
716-
// Wait until backup becomes restorable to avoid restore_invalid_version
717-
state int attempts = 0;
718-
loop {
719-
if (restoreDesc.maxRestorableVersion.present() || attempts >= 10000)
720-
break;
721-
wait(delay(1.0));
722-
state BackupDescription _loopRestoreDesc = wait(restoreContainer->describeBackup());
723-
restoreDesc = _loopRestoreDesc;
724-
attempts++;
725-
}
726-
if (!restoreDesc.maxRestorableVersion.present()) {
727-
TraceEvent("BARW_SkipRestoreNotRestorable").detail("BackupTag", printable(self->backupTag));
728-
return Void();
729-
}
730-
731-
// Double-check to prevent race condition
732-
if (!restoreDesc.maxRestorableVersion.present()) {
733-
TraceEvent("BARW_SkipRestoreNotRestorableRace").detail("BackupTag", printable(self->backupTag));
734-
return Void();
705+
auto container = IBackupContainer::openContainer(lastBackupContainer->getURL(),
706+
lastBackupContainer->getProxy(),
707+
lastBackupContainer->getEncryptionKeyFileName());
708+
BackupDescription desc = wait(container->describeBackup());
709+
710+
state Version targetVersion = -1;
711+
if (desc.maxRestorableVersion.present()) {
712+
if (deterministicRandom()->random01() < 0.1) {
713+
targetVersion = desc.minRestorableVersion.get();
714+
} else if (deterministicRandom()->random01() < 0.1) {
715+
targetVersion = desc.maxRestorableVersion.get();
716+
} else if (deterministicRandom()->random01() < 0.5) {
717+
targetVersion = (desc.minRestorableVersion.get() != desc.maxRestorableVersion.get())
718+
? deterministicRandom()->randomInt64(desc.minRestorableVersion.get(),
719+
desc.maxRestorableVersion.get())
720+
: desc.maxRestorableVersion.get();
721+
}
735722
}
736723

737-
state Version targetVersion = restoreDesc.maxRestorableVersion.get();
738-
739724
TraceEvent("BARW_RestoreDebug").detail("TargetVersion", targetVersion);
740725

741726
state std::vector<Future<Version>> restores;
@@ -895,13 +880,6 @@ struct BackupAndRestoreCorrectnessWorkload : TestWorkload {
895880
for (auto& restore : restores) {
896881
ASSERT(!restore.isError());
897882
}
898-
899-
// Re-describe the same container instance to print final description
900-
{
901-
state BackupDescription finalDesc = wait(restoreContainer->describeBackup());
902-
wait(finalDesc.resolveVersionTimes(cx));
903-
printf("BackupDescription:\n%s\n", finalDesc.toString().c_str());
904-
}
905883
}
906884

907885
if (extraBackup.isValid()) {

0 commit comments

Comments
 (0)