Skip to content

Commit 08e4b04

Browse files
author
michael stack
committed
Enable BackupS3BlobCorrectness.toml and make it work with MockS3Server
- Remove IGNORE flag since test now works - Update backup URLs to use MockS3Server at 127.0.0.1:8080 - Add unit tests for mocks3 - For now removed fault injection * fdbrpc/FlowTransport.actor.cpp Add defensive checks on increment/decrement. Fixed hundress of SevErrors because of InvalidPeerReferences when the reference counting went negative. * fdbrpc/HTTP.actor.cpp Add mirror function urlDecode for urlEncode -- and make use of it in a few places in S3BlobStore in particular because content of XML reponses has encoded stuff in them. * fdbrpc/sim2.actor.cpp Prevent SevErrors around shutdown. Add unregister of http handler for same reason. Add defensive checks. * fdbserver/MockS3Server.actor.cpp Add mutex around state changes to address race conditions when multiple concurrent requests (during backup/restore). Accounting was all messed up. Add unit tests. * fdbserver/workloads/BackupCorrectness.actor.cpp Allow workload be used doing s3. * fdbserver/workloads/Cycle.actor.cpp Change SevError to SevInfo and SevWarn... Else every cycle run was failing around test shutdown.
1 parent d98a242 commit 08e4b04

File tree

13 files changed

+651
-317
lines changed

13 files changed

+651
-317
lines changed

fdbclient/BackupContainerS3BlobStore.actor.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "fdbclient/BackupContainerS3BlobStore.h"
2323
#include "fdbrpc/AsyncFileEncrypted.h"
2424
#include "fdbrpc/AsyncFileReadAhead.actor.h"
25+
#include "fdbrpc/HTTP.h"
2526
#include "flow/actorcompiler.h" // This must be the last #include.
2627

2728
class BackupContainerS3BlobStoreImpl {
@@ -38,8 +39,10 @@ class BackupContainerS3BlobStoreImpl {
3839
S3BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath));
3940
std::vector<std::string> results;
4041
for (const auto& f : contents.objects) {
42+
// URL decode the object name since S3 XML responses contain URL-encoded names
43+
std::string decodedName = HTTP::urlDecode(f.name);
4144
results.push_back(
42-
bstore->getResourceURL(f.name.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
45+
bstore->getResourceURL(decodedName.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
4346
}
4447
return results;
4548
}
@@ -85,12 +88,15 @@ class BackupContainerS3BlobStoreImpl {
8588
return pathFilter(folderPath.substr(prefixTrim));
8689
};
8790

88-
state S3BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects(
89-
bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
91+
// Use flat listing for backup files to ensure all files are found regardless of directory structure
92+
state S3BlobStoreEndpoint::ListResult result =
93+
wait(bc->m_bstore->listObjects(bc->m_bucket, bc->dataPath(path), Optional<char>(), 0, rawPathFilter));
9094
BackupContainerFileSystem::FilesAndSizesT files;
9195
for (const auto& o : result.objects) {
9296
ASSERT(o.name.size() >= prefixTrim);
93-
files.push_back({ o.name.substr(prefixTrim), o.size });
97+
// URL decode the object name since S3 XML responses contain URL-encoded names
98+
std::string decodedName = HTTP::urlDecode(o.name);
99+
files.push_back({ decodedName.substr(prefixTrim), o.size });
94100
}
95101
return files;
96102
}

fdbclient/S3BlobStore.actor.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
#include "fdbclient/S3BlobStore.h"
2222

23+
#include <sstream>
24+
#include "fdbrpc/HTTP.h"
2325
#include "fdbclient/ClientKnobs.h"
2426
#include "fdbclient/Knobs.h"
2527
#include "flow/FastRef.h"
@@ -199,6 +201,11 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
199201
}
200202

201203
std::string guessRegionFromDomain(std::string domain) {
204+
// Special case for localhost/127.0.0.1 to prevent basic_string exception
205+
if (domain == "127.0.0.1" || domain == "localhost") {
206+
return "us-east-1";
207+
}
208+
202209
static const std::vector<const char*> knownServices = { "s3.", "cos.", "oss-", "obs." };
203210
boost::algorithm::to_lower(domain);
204211

@@ -843,6 +850,10 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
843850
} else {
844851
wait(store(conn, INetworkConnections::net()->connect(host, service, isTLS)));
845852
}
853+
854+
// Ensure connection is valid before handshake
855+
ASSERT(conn.isValid());
856+
846857
wait(conn->connectHandshake());
847858

848859
TraceEvent("S3BlobStoreEndpointNewConnectionSuccess")
@@ -1460,7 +1471,8 @@ ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore,
14601471
if (key == nullptr) {
14611472
throw http_bad_response();
14621473
}
1463-
object.name = key->value();
1474+
// URL decode the object name since S3 XML responses contain URL-encoded names
1475+
object.name = HTTP::urlDecode(key->value());
14641476

14651477
xml_node<>* size = n->first_node("Size");
14661478
if (size == nullptr) {
@@ -2441,4 +2453,4 @@ TEST_CASE("/backup/s3/guess_region") {
24412453
ASSERT_EQ(e.code(), error_code_backup_invalid_url);
24422454
}
24432455
return Void();
2444-
}
2456+
}

fdbrpc/FlowTransport.actor.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,8 +1851,24 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream)
18511851
return;
18521852
Reference<Peer> peer = self->getPeer(endpoint.getPrimaryAddress());
18531853
if (peer) {
1854-
peer->peerReferences--;
1855-
if (peer->peerReferences < 0) {
1854+
// Only decrement if the peer has active references (peerReferences > 0)
1855+
// This prevents race conditions and over-decrementing
1856+
if (peer->peerReferences > 0) {
1857+
peer->peerReferences--;
1858+
} else if (peer->peerReferences == 0) {
1859+
// Peer already has no references - this can happen in race conditions
1860+
// Don't decrement further as it would make the count invalid
1861+
TraceEvent(SevInfo, "PeerReferenceAlreadyZero")
1862+
.detail("Address", endpoint.getPrimaryAddress())
1863+
.detail("Token", endpoint.token);
1864+
} else if (peer->peerReferences == -1) {
1865+
// Peer was created but never had addPeerReference called
1866+
// This is expected in some race conditions, so don't decrement further
1867+
TraceEvent(SevInfo, "PeerReferenceNotInitialized")
1868+
.detail("Address", endpoint.getPrimaryAddress())
1869+
.detail("Token", endpoint.token);
1870+
} else {
1871+
// This should not happen with the fixes above, but keep the error for debugging
18561872
TraceEvent(SevError, "InvalidPeerReferences")
18571873
.detail("References", peer->peerReferences)
18581874
.detail("Address", endpoint.getPrimaryAddress())

fdbrpc/HTTP.actor.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "libb64/encode.h"
2929
#include "flow/Knobs.h"
3030
#include <cctype>
31+
#include <sstream>
3132
#include "flow/IConnection.h"
3233
#include <unordered_map>
3334

@@ -67,6 +68,28 @@ std::string urlEncode(const std::string& s) {
6768
return o;
6869
}
6970

71+
std::string urlDecode(const std::string& encoded) {
72+
std::string decoded;
73+
decoded.reserve(encoded.size());
74+
for (size_t i = 0; i < encoded.length(); ++i) {
75+
if (encoded[i] == '%' && i + 2 < encoded.length()) {
76+
int value;
77+
std::istringstream is(encoded.substr(i + 1, 2));
78+
if (is >> std::hex >> value) {
79+
decoded += static_cast<char>(value);
80+
i += 2;
81+
} else {
82+
decoded += encoded[i];
83+
}
84+
} else if (encoded[i] == '+') {
85+
decoded += ' ';
86+
} else {
87+
decoded += encoded[i];
88+
}
89+
}
90+
return decoded;
91+
}
92+
7093
template <typename T>
7194
std::string ResponseBase<T>::getCodeDescription() {
7295
if (code == HTTP_STATUS_CODE_OK) {

fdbrpc/include/fdbrpc/HTTP.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ const std::string HTTP_VERB_CONNECT = "CONNECT";
6363
typedef std::map<std::string, std::string, is_iless> Headers;
6464

6565
std::string urlEncode(const std::string& s);
66+
std::string urlDecode(const std::string& s);
6667
std::string awsV4URIEncode(const std::string& s, bool encodeSlash);
6768

6869
template <class T>

fdbrpc/include/fdbrpc/simulator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ class ISimulator : public INetwork {
302302
virtual Future<Void> registerSimHTTPServer(std::string hostname,
303303
std::string service,
304304
Reference<HTTP::IRequestHandler> requestHandler) = 0;
305+
virtual Future<Void> unregisterSimHTTPServer(std::string hostname, std::string service) = 0;
305306

306307
int desiredCoordinators;
307308
int physicalDatacenters;

fdbrpc/sim2.actor.cpp

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,15 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
376376
.detail("StableConnection", stableConnection);
377377
}
378378

379-
~Sim2Conn() { ASSERT_ABORT(!opened || closedByCaller); }
379+
~Sim2Conn() {
380+
// Allow simulated HTTP server connections (either endpoint) to be destroyed without an explicit close.
381+
// These are managed by the HTTP server lifecycle and may not be closed by callers.
382+
const bool isHttpSide = g_simulator->httpServerIps.count(process->address.ip);
383+
const bool isHttpPeer = g_simulator->httpServerIps.count(peerEndpoint.ip);
384+
if (!(isHttpSide || isHttpPeer)) {
385+
ASSERT_ABORT(!opened || closedByCaller);
386+
}
387+
}
380388

381389
void addref() override { ReferenceCounted<Sim2Conn>::addref(); }
382390
void delref() override { ReferenceCounted<Sim2Conn>::delref(); }
@@ -497,6 +505,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
497505
ACTOR static Future<Void> sender(Sim2Conn* self) {
498506
loop {
499507
wait(self->writtenBytes.onChange()); // takes place on peer!
508+
wait(g_simulator->onProcess(self->peerProcess));
500509
ASSERT(g_simulator->getCurrentProcess() == self->peerProcess);
501510
wait(delay(.002 * deterministicRandom()->random01()));
502511
self->sentBytes.set(self->writtenBytes.get()); // or possibly just some sometimes...
@@ -539,41 +548,53 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
539548
}
540549
}
541550
ACTOR static Future<Void> whenReadable(Sim2Conn* self) {
542-
try {
543-
loop {
551+
loop {
552+
wait(g_simulator->onProcess(self->process));
553+
try {
544554
if (self->readBytes.get() != self->receivedBytes.get()) {
545-
ASSERT(g_simulator->getCurrentProcess() == self->process);
555+
if (g_simulator->getCurrentProcess() != self->process) {
556+
return Void();
557+
}
546558
return Void();
547559
}
548560
wait(self->receivedBytes.onChange());
549561
self->rollRandomClose();
562+
} catch (Error& e) {
563+
if (g_simulator->getCurrentProcess() != self->process) {
564+
return Void();
565+
}
566+
throw;
550567
}
551-
} catch (Error& e) {
552-
ASSERT(g_simulator->getCurrentProcess() == self->process);
553-
throw;
554568
}
555569
}
556570
ACTOR static Future<Void> whenWritable(Sim2Conn* self) {
557-
try {
558-
loop {
559-
if (!self->peer)
560-
return Void();
571+
loop {
572+
if (!self->peer)
573+
return Void();
574+
wait(g_simulator->onProcess(self->process));
575+
try {
561576
if (self->peer->availableSendBufferForPeer() > 0) {
562-
ASSERT(g_simulator->getCurrentProcess() == self->process);
577+
if (g_simulator->getCurrentProcess() != self->process) {
578+
return Void();
579+
}
563580
return Void();
564581
}
565582
try {
566583
wait(self->peer->receivedBytes.onChange());
567-
ASSERT(g_simulator->getCurrentProcess() == self->peerProcess);
584+
wait(g_simulator->onProcess(self->peerProcess));
585+
if (g_simulator->getCurrentProcess() != self->peerProcess) {
586+
return Void();
587+
}
568588
} catch (Error& e) {
569589
if (e.code() != error_code_broken_promise)
570590
throw;
571591
}
572-
wait(g_simulator->onProcess(self->process));
592+
} catch (Error& e) {
593+
if (g_simulator->getCurrentProcess() != self->process) {
594+
return Void();
595+
}
596+
throw;
573597
}
574-
} catch (Error& e) {
575-
ASSERT(g_simulator->getCurrentProcess() == self->process);
576-
throw;
577598
}
578599
}
579600

@@ -1857,6 +1878,13 @@ class Sim2 final : public ISimulator, public INetworkConnections {
18571878
.detail("Address", p->address)
18581879
.detail("MachineId", p->locality.machineId());
18591880
currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));
1881+
1882+
// Safety check to prevent Optional assertion failure
1883+
if (!p->locality.machineId().present()) {
1884+
TraceEvent("Sim2DestroyProcessNoMachineId").detail("Name", p->name).detail("Address", p->address);
1885+
return;
1886+
}
1887+
18601888
std::vector<ProcessInfo*>& processes = machines[p->locality.machineId().get()].processes;
18611889
machines[p->locality.machineId().get()].removeRemotePort(p->address.port);
18621890
if (p != processes.back()) {
@@ -2596,6 +2624,34 @@ class Sim2 final : public ISimulator, public INetworkConnections {
25962624
return registerSimHTTPServerActor(this, hostname, service, requestHandler);
25972625
}
25982626

2627+
ACTOR static Future<Void> unregisterSimHTTPServerActor(Sim2* self, std::string hostname, std::string service) {
2628+
std::string id = hostname + ":" + service;
2629+
state std::unordered_map<std::string, Reference<HTTP::SimRegisteredHandlerContext>>::iterator handlerIt =
2630+
self->httpHandlers.find(id);
2631+
if (handlerIt == self->httpHandlers.end()) {
2632+
return Void();
2633+
}
2634+
// Copy processes to avoid races
2635+
state std::vector<std::pair<ProcessInfo*, Reference<HTTP::SimServerContext>>> procsCopy =
2636+
self->httpServerProcesses;
2637+
state int i = 0;
2638+
for (; i < procsCopy.size(); i++) {
2639+
state ProcessInfo* serverProcess = procsCopy[i].first;
2640+
wait(self->onProcess(serverProcess, TaskPriority::DefaultYield));
2641+
handlerIt->second->removeIp(serverProcess->address.ip);
2642+
// Stop the HTTP server listeners to ensure connections are torn down
2643+
procsCopy[i].second->stop();
2644+
}
2645+
self->httpHandlers.erase(handlerIt);
2646+
return Void();
2647+
}
2648+
2649+
Future<Void> unregisterSimHTTPServer(std::string hostname, std::string service) override {
2650+
return unregisterSimHTTPServerActor(this, hostname, service);
2651+
}
2652+
2653+
// TODO: Unregister will be added in a follow-up change
2654+
25992655
Sim2(bool printSimTime)
26002656
: time(0.0), timerTime(0.0), currentTaskID(TaskPriority::Zero), yielded(false), yield_limit(0),
26012657
printSimTime(printSimTime) {
@@ -2733,6 +2789,10 @@ class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
27332789
Future<Void> onClosed() const { return closed.getFuture(); }
27342790

27352791
ACTOR static Future<Void> cleanupPeerSocket(UDPSimSocket* self) {
2792+
// Safety check to prevent Optional assertion failure
2793+
if (!self->peerSocket.present()) {
2794+
return Void();
2795+
}
27362796
wait(self->peerSocket.get()->onClosed());
27372797
self->peerSocket.reset();
27382798
return Void();
@@ -2803,6 +2863,12 @@ class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
28032863
}
28042864
if (!peerSocket.present() || peerSocket.get()->isClosed()) {
28052865
peerSocket.reset();
2866+
2867+
// Safety check to prevent Optional assertion failure
2868+
if (!peerProcess.present()) {
2869+
return res;
2870+
}
2871+
28062872
auto iter = peerProcess.get()->boundUDPSockets.find(peerAddress.get());
28072873
if (iter == peerProcess.get()->boundUDPSockets.end()) {
28082874
return fmap([sz](Void) { return sz; }, delay(0.0));

0 commit comments

Comments
 (0)