Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions fdbclient/BackupContainerS3BlobStore.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "fdbclient/BackupContainerS3BlobStore.h"
#include "fdbrpc/AsyncFileEncrypted.h"
#include "fdbrpc/AsyncFileReadAhead.actor.h"
#include "fdbrpc/HTTP.h"
#include "flow/actorcompiler.h" // This must be the last #include.

class BackupContainerS3BlobStoreImpl {
Expand All @@ -38,8 +39,10 @@ class BackupContainerS3BlobStoreImpl {
S3BlobStoreEndpoint::ListResult contents = wait(bstore->listObjects(bucket, basePath));
std::vector<std::string> results;
for (const auto& f : contents.objects) {
// URL decode the object name since S3 XML responses contain URL-encoded names
std::string decodedName = HTTP::urlDecode(f.name);
results.push_back(
bstore->getResourceURL(f.name.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
bstore->getResourceURL(decodedName.substr(basePath.size()), format("bucket=%s", bucket.c_str())));
}
return results;
}
Expand Down Expand Up @@ -85,12 +88,15 @@ class BackupContainerS3BlobStoreImpl {
return pathFilter(folderPath.substr(prefixTrim));
};

state S3BlobStoreEndpoint::ListResult result = wait(bc->m_bstore->listObjects(
bc->m_bucket, bc->dataPath(path), '/', std::numeric_limits<int>::max(), rawPathFilter));
// Use flat listing for backup files to ensure all files are found regardless of directory structure
state S3BlobStoreEndpoint::ListResult result =
wait(bc->m_bstore->listObjects(bc->m_bucket, bc->dataPath(path), Optional<char>(), 0, rawPathFilter));
BackupContainerFileSystem::FilesAndSizesT files;
for (const auto& o : result.objects) {
ASSERT(o.name.size() >= prefixTrim);
files.push_back({ o.name.substr(prefixTrim), o.size });
// URL decode the object name since S3 XML responses contain URL-encoded names
std::string decodedName = HTTP::urlDecode(o.name);
files.push_back({ decodedName.substr(prefixTrim), o.size });
}
return files;
}
Expand Down
16 changes: 14 additions & 2 deletions fdbclient/S3BlobStore.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "fdbclient/S3BlobStore.h"

#include <sstream>
#include "fdbrpc/HTTP.h"
#include "fdbclient/ClientKnobs.h"
#include "fdbclient/Knobs.h"
#include "flow/FastRef.h"
Expand Down Expand Up @@ -199,6 +201,11 @@ std::string S3BlobStoreEndpoint::BlobKnobs::getURLParameters() const {
}

std::string guessRegionFromDomain(std::string domain) {
// Special case for localhost/127.0.0.1 to prevent basic_string exception
if (domain == "127.0.0.1" || domain == "localhost") {
return "us-east-1";
}

static const std::vector<const char*> knownServices = { "s3.", "cos.", "oss-", "obs." };
boost::algorithm::to_lower(domain);

Expand Down Expand Up @@ -843,6 +850,10 @@ ACTOR Future<S3BlobStoreEndpoint::ReusableConnection> connect_impl(Reference<S3B
} else {
wait(store(conn, INetworkConnections::net()->connect(host, service, isTLS)));
}

// Ensure connection is valid before handshake
ASSERT(conn.isValid());

wait(conn->connectHandshake());

TraceEvent("S3BlobStoreEndpointNewConnectionSuccess")
Expand Down Expand Up @@ -1460,7 +1471,8 @@ ACTOR Future<Void> listObjectsStream_impl(Reference<S3BlobStoreEndpoint> bstore,
if (key == nullptr) {
throw http_bad_response();
}
object.name = key->value();
// URL decode the object name since S3 XML responses contain URL-encoded names
object.name = HTTP::urlDecode(key->value());

xml_node<>* size = n->first_node("Size");
if (size == nullptr) {
Expand Down Expand Up @@ -2441,4 +2453,4 @@ TEST_CASE("/backup/s3/guess_region") {
ASSERT_EQ(e.code(), error_code_backup_invalid_url);
}
return Void();
}
}
35 changes: 25 additions & 10 deletions fdbrpc/FlowTransport.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1163,9 +1163,9 @@ ACTOR static void deliver(TransportData* self,
g_network->setCurrentTask(priority);
}

auto receiver = self->endpoints.get(destination.token);
if (receiver && (isTrustedPeer || receiver->isPublic())) {
if (!checkCompatible(receiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
auto msgReceiver = self->endpoints.get(destination.token);
if (msgReceiver && (isTrustedPeer || msgReceiver->isPublic())) {
if (!checkCompatible(msgReceiver->peerCompatibilityPolicy(), reader.protocolVersion())) {
return;
}
try {
Expand All @@ -1177,7 +1177,18 @@ ACTOR static void deliver(TransportData* self,
StringRef data = reader.arenaReadAll();
ASSERT(data.size() > 8);
ArenaObjectReader objReader(reader.arena(), reader.arenaReadAll(), AssumeVersion(reader.protocolVersion()));
receiver->receive(objReader);

// Defensive check: verify receiver is still valid before using it
// Re-fetch the receiver to ensure it hasn't been invalidated
auto currentReceiver = self->endpoints.get(destination.token);
if (currentReceiver != msgReceiver) {
TraceEvent(SevWarn, "ReceiverInvalidated")
.detail("Token", destination.token.toString())
.detail("Peer", destination.getPrimaryAddress());
return;
}

msgReceiver->receive(objReader);
g_currentDeliveryPeerAddress = NetworkAddressList();
g_currentDeliverPeerAddressTrusted = false;
g_currentDeliveryPeerDisconnect = Future<Void>();
Expand All @@ -1197,11 +1208,11 @@ ACTOR static void deliver(TransportData* self,
}
} else if (destination.token.first() & TOKEN_STREAM_FLAG) {
// We don't have the (stream) endpoint 'token', notify the remote machine
if (receiver) {
if (msgReceiver) {
TraceEvent(SevWarnAlways, "AttemptedRPCToPrivatePrevented"_audit)
.detail("From", peerAddress)
.detail("Token", destination.token)
.detail("Receiver", typeid(*receiver).name());
.detail("Receiver", typeid(*msgReceiver).name());
ASSERT(!self->isLocalAddress(destination.getPrimaryAddress()));
Reference<Peer> peer = self->getOrOpenPeer(destination.getPrimaryAddress());
sendPacket(self,
Expand Down Expand Up @@ -1849,11 +1860,15 @@ void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream) {
if (!isStream || !endpoint.getPrimaryAddress().isValid() || !endpoint.getPrimaryAddress().isPublic())
return;
Reference<Peer> peer = self->getPeer(endpoint.getPrimaryAddress());
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
if (peer) {
peer->peerReferences--;
if (peer->peerReferences < 0) {
TraceEvent(SevError, "InvalidPeerReferences")
// Use getOrOpenPeer to ensure consistency with addPeerReference
// This eliminates race conditions between add/remove operations
if (peer->peerReferences > 0) {
peer->peerReferences--;
} else {
// This indicates a bug in our reference counting logic
TraceEvent(SevError, "PeerReferenceUnexpectedState")
.detail("References", peer->peerReferences)
.detail("Address", endpoint.getPrimaryAddress())
.detail("Token", endpoint.token);
Expand Down
23 changes: 23 additions & 0 deletions fdbrpc/HTTP.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "libb64/encode.h"
#include "flow/Knobs.h"
#include <cctype>
#include <sstream>
#include "flow/IConnection.h"
#include <unordered_map>

Expand Down Expand Up @@ -67,6 +68,28 @@ std::string urlEncode(const std::string& s) {
return o;
}

std::string urlDecode(const std::string& encoded) {
std::string decoded;
decoded.reserve(encoded.size());
for (size_t i = 0; i < encoded.length(); ++i) {
if (encoded[i] == '%' && i + 2 < encoded.length()) {
int value;
std::istringstream is(encoded.substr(i + 1, 2));
if (is >> std::hex >> value) {
decoded += static_cast<char>(value);
i += 2;
} else {
decoded += encoded[i];
}
} else if (encoded[i] == '+') {
decoded += ' ';
} else {
decoded += encoded[i];
}
}
return decoded;
}

template <typename T>
std::string ResponseBase<T>::getCodeDescription() {
if (code == HTTP_STATUS_CODE_OK) {
Expand Down
1 change: 1 addition & 0 deletions fdbrpc/include/fdbrpc/HTTP.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const std::string HTTP_VERB_CONNECT = "CONNECT";
typedef std::map<std::string, std::string, is_iless> Headers;

std::string urlEncode(const std::string& s);
std::string urlDecode(const std::string& s);
std::string awsV4URIEncode(const std::string& s, bool encodeSlash);

template <class T>
Expand Down
1 change: 1 addition & 0 deletions fdbrpc/include/fdbrpc/simulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class ISimulator : public INetwork {
virtual Future<Void> registerSimHTTPServer(std::string hostname,
std::string service,
Reference<HTTP::IRequestHandler> requestHandler) = 0;
virtual Future<Void> unregisterSimHTTPServer(std::string hostname, std::string service) = 0;

int desiredCoordinators;
int physicalDatacenters;
Expand Down
57 changes: 56 additions & 1 deletion fdbrpc/sim2.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,15 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
.detail("StableConnection", stableConnection);
}

~Sim2Conn() { ASSERT_ABORT(!opened || closedByCaller); }
~Sim2Conn() {
// Allow simulated HTTP server connections (either endpoint) to be destroyed without an explicit close.
// These are managed by the HTTP server lifecycle and may not be closed by callers.
const bool isHttpSide = g_simulator->httpServerIps.count(process->address.ip);
const bool isHttpPeer = g_simulator->httpServerIps.count(peerEndpoint.ip);
if (!(isHttpSide || isHttpPeer)) {
ASSERT_ABORT(!opened || closedByCaller);
}
}

void addref() override { ReferenceCounted<Sim2Conn>::addref(); }
void delref() override { ReferenceCounted<Sim2Conn>::delref(); }
Expand Down Expand Up @@ -497,6 +505,7 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
ACTOR static Future<Void> sender(Sim2Conn* self) {
loop {
wait(self->writtenBytes.onChange()); // takes place on peer!
wait(g_simulator->onProcess(self->peerProcess));
ASSERT(g_simulator->getCurrentProcess() == self->peerProcess);
wait(delay(.002 * deterministicRandom()->random01()));
self->sentBytes.set(self->writtenBytes.get()); // or possibly just some sometimes...
Expand Down Expand Up @@ -564,6 +573,9 @@ struct Sim2Conn final : IConnection, ReferenceCounted<Sim2Conn> {
}
try {
wait(self->peer->receivedBytes.onChange());
// Check if peer is still valid after the wait
if (!self->peer)
return Void();
ASSERT(g_simulator->getCurrentProcess() == self->peerProcess);
} catch (Error& e) {
if (e.code() != error_code_broken_promise)
Expand Down Expand Up @@ -1857,6 +1869,13 @@ class Sim2 final : public ISimulator, public INetworkConnections {
.detail("Address", p->address)
.detail("MachineId", p->locality.machineId());
currentlyRebootingProcesses.insert(std::pair<NetworkAddress, ProcessInfo*>(p->address, p));

// Safety check to prevent Optional assertion failure
if (!p->locality.machineId().present()) {
TraceEvent("Sim2DestroyProcessNoMachineId").detail("Name", p->name).detail("Address", p->address);
return;
}

std::vector<ProcessInfo*>& processes = machines[p->locality.machineId().get()].processes;
machines[p->locality.machineId().get()].removeRemotePort(p->address.port);
if (p != processes.back()) {
Expand Down Expand Up @@ -2596,6 +2615,32 @@ class Sim2 final : public ISimulator, public INetworkConnections {
return registerSimHTTPServerActor(this, hostname, service, requestHandler);
}

ACTOR static Future<Void> unregisterSimHTTPServerActor(Sim2* self, std::string hostname, std::string service) {
std::string id = hostname + ":" + service;
state std::unordered_map<std::string, Reference<HTTP::SimRegisteredHandlerContext>>::iterator handlerIt =
self->httpHandlers.find(id);
if (handlerIt == self->httpHandlers.end()) {
return Void();
}
// Copy processes to avoid races
state std::vector<std::pair<ProcessInfo*, Reference<HTTP::SimServerContext>>> procsCopy =
self->httpServerProcesses;
state int i = 0;
for (; i < procsCopy.size(); i++) {
state ProcessInfo* serverProcess = procsCopy[i].first;
wait(self->onProcess(serverProcess, TaskPriority::DefaultYield));
handlerIt->second->removeIp(serverProcess->address.ip);
// Stop the HTTP server listeners to ensure connections are torn down
procsCopy[i].second->stop();
}
self->httpHandlers.erase(handlerIt);
return Void();
}

Future<Void> unregisterSimHTTPServer(std::string hostname, std::string service) override {
return unregisterSimHTTPServerActor(this, hostname, service);
}

Sim2(bool printSimTime)
: time(0.0), timerTime(0.0), currentTaskID(TaskPriority::Zero), yielded(false), yield_limit(0),
printSimTime(printSimTime) {
Expand Down Expand Up @@ -2733,6 +2778,10 @@ class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
Future<Void> onClosed() const { return closed.getFuture(); }

ACTOR static Future<Void> cleanupPeerSocket(UDPSimSocket* self) {
// Safety check to prevent Optional assertion failure
if (!self->peerSocket.present()) {
return Void();
}
wait(self->peerSocket.get()->onClosed());
self->peerSocket.reset();
return Void();
Expand Down Expand Up @@ -2803,6 +2852,12 @@ class UDPSimSocket : public IUDPSocket, ReferenceCounted<UDPSimSocket> {
}
if (!peerSocket.present() || peerSocket.get()->isClosed()) {
peerSocket.reset();

// Safety check to prevent Optional assertion failure
if (!peerProcess.present()) {
return res;
}

auto iter = peerProcess.get()->boundUDPSockets.find(peerAddress.get());
if (iter == peerProcess.get()->boundUDPSockets.end()) {
return fmap([sz](Void) { return sz; }, delay(0.0));
Expand Down
Loading