Skip to content

Separate backup and restore into two workloads [release-7.4] #12172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: release-7.4
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
414 changes: 272 additions & 142 deletions fdbclient/FileBackupAgent.actor.cpp

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions fdbclient/ManagementAPI.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,34 @@ bool isCompleteConfiguration(std::map<std::string, std::string> const& options)
options.count(p + "storage_engine") == 1;
}

// Switches the backup worker off by applying the "backup_worker_enabled:=0" configuration change.
// If the current database configuration already reports the backup worker as disabled, this only
// traces BackupWorkerAlreadyDisabled and completes without changing anything.
// Throws operation_failed() when the configuration change does not report ConfigurationResult::SUCCESS.
ACTOR Future<Void> disableBackupWorker(Database cx) {
	DatabaseConfiguration currentConfig = wait(getDatabaseConfiguration(cx));
	if (currentConfig.backupWorkerEnabled) {
		ConfigurationResult outcome = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=0", true));
		if (outcome == ConfigurationResult::SUCCESS) {
			return Void();
		}
		// Record the unexpected result before surfacing the failure to the caller.
		TraceEvent("BackupWorkerDisableFailed").detail("Result", outcome);
		throw operation_failed();
	}
	// Nothing to change: the backup worker is already off.
	TraceEvent("BackupWorkerAlreadyDisabled");
	return Void();
}

// Switches the backup worker on by applying the "backup_worker_enabled:=1" configuration change.
// If the current database configuration already reports the backup worker as enabled, this only
// traces BackupWorkerAlreadyEnabled and completes without changing anything.
// Throws operation_failed() when the configuration change does not report ConfigurationResult::SUCCESS.
ACTOR Future<Void> enableBackupWorker(Database cx) {
	DatabaseConfiguration currentConfig = wait(getDatabaseConfiguration(cx));
	if (!currentConfig.backupWorkerEnabled) {
		ConfigurationResult outcome = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=1", true));
		if (outcome == ConfigurationResult::SUCCESS) {
			return Void();
		}
		// Record the unexpected result before surfacing the failure to the caller.
		TraceEvent("BackupWorkerEnableFailed").detail("Result", outcome);
		throw operation_failed();
	}
	// Nothing to change: the backup worker is already on.
	TraceEvent("BackupWorkerAlreadyEnabled");
	return Void();
}

/*
- Validates encryption and tenant mode configurations
- During cluster creation (configure new) we allow the following:
Expand Down
39 changes: 20 additions & 19 deletions fdbclient/include/fdbclient/BackupAgent.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
#elif !defined(FDBCLIENT_BACKUP_AGENT_ACTOR_H)
#define FDBCLIENT_BACKUP_AGENT_ACTOR_H

#include <ctime>
#include <climits>

#include "flow/flow.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/TaskBucket.h"
#include "fdbclient/Notified.h"
#include "flow/IAsyncFile.h"
#include "fdbclient/KeyBackedTypes.actor.h"
#include <ctime>
#include <climits>
#include "fdbclient/BackupContainer.h"
#include "flow/actorcompiler.h" // has to be last include

Expand Down Expand Up @@ -205,8 +206,7 @@ class FileBackupAgent : public BackupAgentBase {
OnlyApplyMutationLogs = OnlyApplyMutationLogs::False,
InconsistentSnapshotOnly = InconsistentSnapshotOnly::False,
Optional<std::string> const& encryptionKeyFileName = {},
Optional<std::string> blobManifestUrl = {},
TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False);
Optional<std::string> blobManifestUrl = {});

// this method will construct range and version vectors and then call restore()
Future<Version> restore(Database cx,
Expand Down Expand Up @@ -245,8 +245,7 @@ class FileBackupAgent : public BackupAgentBase {
InconsistentSnapshotOnly inconsistentSnapshotOnly = InconsistentSnapshotOnly::False,
Version beginVersion = ::invalidVersion,
Optional<std::string> const& encryptionKeyFileName = {},
Optional<std::string> blobManifestUrl = {},
TransformPartitionedLog transformPartitionedLog = TransformPartitionedLog::False);
Optional<std::string> blobManifestUrl = {});

Future<Version> atomicRestore(Database cx,
Key tagName,
Expand Down Expand Up @@ -314,14 +313,16 @@ class FileBackupAgent : public BackupAgentBase {
partitionedLog,
incrementalBackupOnly,
encryptionKeyFileName,
blobManifestUrl);
blobManifestUrl) +
checkAndDisableBackupWorkers(cx);
});
}

Future<Void> discontinueBackup(Reference<ReadYourWritesTransaction> tr, Key tagName);
Future<Void> discontinueBackup(Database cx, Key tagName) {
return runRYWTransaction(
cx, [=](Reference<ReadYourWritesTransaction> tr) { return discontinueBackup(tr, tagName); });
cx, [=](Reference<ReadYourWritesTransaction> tr) { return discontinueBackup(tr, tagName); }) +
checkAndDisableBackupWorkers(cx);
}

// Terminate an ongoing backup, without waiting for the backup to finish.
Expand All @@ -333,9 +334,15 @@ class FileBackupAgent : public BackupAgentBase {
// logRangesRange and backupLogKeys will be cleared for this backup.
Future<Void> abortBackup(Reference<ReadYourWritesTransaction> tr, std::string tagName);
Future<Void> abortBackup(Database cx, std::string tagName) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr) { return abortBackup(tr, tagName); });
// First abort the backup, then check and disable backup workers if needed.
return runRYWTransaction(cx,
[=](Reference<ReadYourWritesTransaction> tr) { return abortBackup(tr, tagName); }) +
checkAndDisableBackupWorkers(cx);
}

// Disable backup workers if no active partitioned backup is running.
Future<Void> checkAndDisableBackupWorkers(Database cx);

Future<std::string> getStatus(Database cx, ShowErrors, std::string tagName);
Future<std::string> getStatusJSON(Database cx, std::string tagName);

Expand Down Expand Up @@ -895,9 +902,6 @@ class BackupConfig : public KeyBackedTaskConfig {
return configSpace.pack(__FUNCTION__sr);
}

// Set to true if backup worker is enabled.
KeyBackedProperty<bool> backupWorkerEnabled() { return configSpace.pack(__FUNCTION__sr); }

// Set to true if partitioned log is enabled (only useful if backup worker is also enabled).
KeyBackedProperty<bool> partitionedLogEnabled() { return configSpace.pack(__FUNCTION__sr); }

Expand Down Expand Up @@ -929,18 +933,15 @@ class BackupConfig : public KeyBackedTaskConfig {
tr->setOption(FDBTransactionOptions::READ_LOCK_AWARE);
auto lastLog = latestLogEndVersion().get(tr);
auto firstSnapshot = firstSnapshotEndVersion().get(tr);
auto workerEnabled = backupWorkerEnabled().get(tr);
auto plogEnabled = partitionedLogEnabled().get(tr);
auto workerVersion = latestBackupWorkerSavedVersion().get(tr);
auto incrementalBackup = incrementalBackupOnly().get(tr);
return map(success(lastLog) && success(firstSnapshot) && success(workerEnabled) && success(plogEnabled) &&
success(workerVersion) && success(incrementalBackup),
return map(success(lastLog) && success(firstSnapshot) && success(plogEnabled) && success(workerVersion) &&
success(incrementalBackup),
[=](Void) -> Optional<Version> {
// The latest log greater than the oldest snapshot is the restorable version
Optional<Version> logVersion = workerEnabled.get().present() && workerEnabled.get().get() &&
plogEnabled.get().present() && plogEnabled.get().get()
? workerVersion.get()
: lastLog.get();
Optional<Version> logVersion =
plogEnabled.get().present() && plogEnabled.get().get() ? workerVersion.get() : lastLog.get();
if (logVersion.present() && firstSnapshot.get().present() &&
logVersion.get() > firstSnapshot.get().get()) {
return std::max(logVersion.get() - 1, firstSnapshot.get().get());
Expand Down
3 changes: 3 additions & 0 deletions fdbclient/include/fdbclient/ManagementAPI.actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,8 @@ bool schemaMatch(json_spirit::mValue const& schema,
// storage nodes
ACTOR Future<Void> mgmtSnapCreate(Database cx, Standalone<StringRef> snapCmd, UID snapUID);

ACTOR Future<Void> disableBackupWorker(Database cx);
ACTOR Future<Void> enableBackupWorker(Database cx);

#include "flow/unactorcompiler.h"
#endif
2 changes: 0 additions & 2 deletions fdbserver/QuietDatabase.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,6 @@ ACTOR Future<Void> disableConsistencyScanInSim(Database db, bool waitForCompleti
return Void();
}

ACTOR Future<Void> disableBackupWorker(Database cx);

// Waits until a database quiets down (no data in flight, small tlog queue, low SQ, no active data distribution). This
// requires the database to be available and healthy in order to succeed.
ACTOR Future<Void> waitForQuietDatabase(Database cx,
Expand Down
14 changes: 7 additions & 7 deletions fdbserver/SimulatedCluster.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ T simulate(const T& in) {
}

ACTOR Future<Void> runBackup(Reference<IClusterConnectionRecord> connRecord) {
state std::vector<Future<Void>> agentFutures;
state Future<Void> agentFuture;
state FileBackupAgent fileAgent;

while (g_simulator->backupAgents == ISimulator::BackupAgentType::WaitForType) {
wait(delay(1.0));
Expand All @@ -655,17 +656,16 @@ ACTOR Future<Void> runBackup(Reference<IClusterConnectionRecord> connRecord) {
if (g_simulator->backupAgents == ISimulator::BackupAgentType::BackupToFile) {
Database cx = Database::createDatabase(connRecord, ApiVersion::LATEST_VERSION);

state FileBackupAgent fileAgent;
agentFutures.push_back(fileAgent.run(
cx, 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE, CLIENT_KNOBS->SIM_BACKUP_TASKS_PER_AGENT));
TraceEvent("SimBackupAgentsStarting").log();
agentFuture =
fileAgent.run(cx, 1.0 / CLIENT_KNOBS->BACKUP_AGGREGATE_POLL_RATE, CLIENT_KNOBS->SIM_BACKUP_TASKS_PER_AGENT);

while (g_simulator->backupAgents == ISimulator::BackupAgentType::BackupToFile) {
wait(delay(1.0));
}

for (auto it : agentFutures) {
it.cancel();
}
TraceEvent("SimBackupAgentsStopping").log();
agentFuture.cancel();
}

wait(Future<Void>(Never()));
Expand Down
28 changes: 0 additions & 28 deletions fdbserver/tester.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2665,34 +2665,6 @@ ACTOR Future<Void> disableConnectionFailuresAfter(double seconds, std::string co
return Void();
}

// Switches the backup worker off by applying the "backup_worker_enabled:=0" configuration change.
// If the current database configuration already reports the backup worker as disabled, this only
// traces BackupWorkerAlreadyDisabled and completes without changing anything.
// Throws operation_failed() when the configuration change does not report ConfigurationResult::SUCCESS.
ACTOR Future<Void> disableBackupWorker(Database cx) {
	DatabaseConfiguration currentConfig = wait(getDatabaseConfiguration(cx));
	if (currentConfig.backupWorkerEnabled) {
		ConfigurationResult outcome = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=0", true));
		if (outcome == ConfigurationResult::SUCCESS) {
			return Void();
		}
		// Record the unexpected result before surfacing the failure to the caller.
		TraceEvent("BackupWorkerDisableFailed").detail("Result", outcome);
		throw operation_failed();
	}
	// Nothing to change: the backup worker is already off.
	TraceEvent("BackupWorkerAlreadyDisabled");
	return Void();
}

// Switches the backup worker on by applying the "backup_worker_enabled:=1" configuration change.
// If the current database configuration already reports the backup worker as enabled, this only
// traces BackupWorkerAlreadyEnabled and completes without changing anything.
// Throws operation_failed() when the configuration change does not report ConfigurationResult::SUCCESS.
ACTOR Future<Void> enableBackupWorker(Database cx) {
	DatabaseConfiguration currentConfig = wait(getDatabaseConfiguration(cx));
	if (!currentConfig.backupWorkerEnabled) {
		ConfigurationResult outcome = wait(ManagementAPI::changeConfig(cx.getReference(), "backup_worker_enabled:=1", true));
		if (outcome == ConfigurationResult::SUCCESS) {
			return Void();
		}
		// Record the unexpected result before surfacing the failure to the caller.
		TraceEvent("BackupWorkerEnableFailed").detail("Result", outcome);
		throw operation_failed();
	}
	// Nothing to change: the backup worker is already on.
	TraceEvent("BackupWorkerAlreadyEnabled");
	return Void();
}

/**
* \brief Test orchestrator: sends test specification to testers in the right order and collects the results.
*
Expand Down
Loading