Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit afe638a

Browse files
authored
Fork with point in time recovery (#691)
* fork with point in time recovery * removed old /v1/namespaces/:name/restore endpoint
1 parent 8d900c1 commit afe638a

File tree

6 files changed

+184
-153
lines changed

6 files changed

+184
-153
lines changed

bottomless/src/replicator.rs

Lines changed: 84 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -147,101 +147,66 @@ impl Options {
147147
}
148148

149149
pub fn from_env() -> Result<Self> {
150-
let mut options = Self::default();
151-
if let Ok(key) = std::env::var("LIBSQL_BOTTOMLESS_ENDPOINT") {
152-
options.aws_endpoint = Some(key);
153-
}
154-
if let Ok(bucket_name) = std::env::var("LIBSQL_BOTTOMLESS_BUCKET") {
155-
options.bucket_name = bucket_name;
156-
}
157-
if let Ok(seconds) = std::env::var("LIBSQL_BOTTOMLESS_BATCH_INTERVAL_SECS") {
158-
if let Ok(seconds) = seconds.parse::<u64>() {
159-
options.max_batch_interval = Duration::from_secs(seconds);
160-
}
161-
}
162-
if let Ok(access_key_id) = std::env::var("LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID") {
163-
options.access_key_id = Some(access_key_id);
164-
}
165-
if let Ok(secret_access_key) = std::env::var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY") {
166-
options.secret_access_key = Some(secret_access_key);
167-
}
168-
if let Ok(region) = std::env::var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION") {
169-
options.region = Some(region);
170-
}
171-
if let Ok(count) = std::env::var("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES") {
172-
match count.parse::<usize>() {
173-
Ok(count) => options.max_frames_per_batch = count,
174-
Err(e) => {
175-
bail!(
176-
"Invalid LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES environment variable: {}",
177-
e
178-
)
179-
}
150+
fn env_var(key: &str) -> Result<String> {
151+
match std::env::var(key) {
152+
Ok(res) => Ok(res),
153+
Err(_) => bail!("{} environment variable not set", key),
180154
}
181155
}
182-
if let Ok(parallelism) = std::env::var("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX") {
183-
match parallelism.parse::<usize>() {
184-
Ok(parallelism) => options.s3_upload_max_parallelism = parallelism,
185-
Err(e) => bail!(
186-
"Invalid LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX environment variable: {}",
187-
e
188-
),
156+
fn env_var_or<S: ToString>(key: &str, default_value: S) -> String {
157+
match std::env::var(key) {
158+
Ok(res) => res,
159+
Err(_) => default_value.to_string(),
189160
}
190161
}
191-
if let Ok(swap_after) = std::env::var("LIBSQL_BOTTOMLESS_RESTORE_TXN_SWAP_THRESHOLD") {
192-
match swap_after.parse::<u32>() {
193-
Ok(swap_after) => options.restore_transaction_page_swap_after = swap_after,
194-
Err(e) => bail!(
195-
"Invalid LIBSQL_BOTTOMLESS_RESTORE_TXN_SWAP_THRESHOLD environment variable: {}",
196-
e
197-
),
198-
}
199-
}
200-
if let Ok(fpath) = std::env::var("LIBSQL_BOTTOMLESS_RESTORE_TXN_FILE") {
201-
options.restore_transaction_cache_fpath = fpath;
202-
}
203-
if let Ok(compression) = std::env::var("LIBSQL_BOTTOMLESS_COMPRESSION") {
204-
match CompressionKind::parse(&compression) {
205-
Ok(compression) => options.use_compression = compression,
206-
Err(e) => bail!(
207-
"Invalid LIBSQL_BOTTOMLESS_COMPRESSION environment variable: {}",
208-
e
209-
),
210-
}
211-
}
212-
if let Ok(verify) = std::env::var("LIBSQL_BOTTOMLESS_VERIFY_CRC") {
213-
match verify.to_lowercase().as_ref() {
214-
"yes" | "true" | "1" | "y" | "t" => options.verify_crc = true,
215-
"no" | "false" | "0" | "n" | "f" => options.verify_crc = false,
216-
other => bail!(
217-
"Invalid LIBSQL_BOTTOMLESS_VERIFY_CRC environment variable: {}",
218-
other
219-
),
220-
}
221-
}
222-
Ok(options)
223-
}
224-
}
225162

226-
impl Default for Options {
227-
fn default() -> Self {
228-
let db_id = std::env::var("LIBSQL_BOTTOMLESS_DATABASE_ID").ok();
229-
Options {
230-
create_bucket_if_not_exists: true,
231-
verify_crc: true,
232-
use_compression: CompressionKind::Gzip,
233-
max_batch_interval: Duration::from_secs(15),
234-
max_frames_per_batch: 500, // basically half of the default SQLite checkpoint size
235-
s3_upload_max_parallelism: 32,
236-
restore_transaction_page_swap_after: 1000,
163+
let db_id = env_var("LIBSQL_BOTTOMLESS_DATABASE_ID").ok();
164+
let aws_endpoint = env_var("LIBSQL_BOTTOMLESS_ENDPOINT").ok();
165+
let bucket_name = env_var_or("LIBSQL_BOTTOMLESS_BUCKET", "bottomless");
166+
let max_batch_interval = Duration::from_secs(
167+
env_var_or("LIBSQL_BOTTOMLESS_BATCH_INTERVAL_SECS", 15).parse::<u64>()?,
168+
);
169+
let access_key_id = env_var("LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID").ok();
170+
let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
171+
let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
172+
let max_frames_per_batch =
173+
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
174+
let s3_upload_max_parallelism =
175+
env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
176+
let restore_transaction_page_swap_after =
177+
env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_SWAP_THRESHOLD", 1000).parse::<u32>()?;
178+
let restore_transaction_cache_fpath =
179+
env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_FILE", ".bottomless.restore");
180+
let use_compression =
181+
CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "gz"))
182+
.map_err(|e| anyhow!("unknown compression kind: {}", e))?;
183+
let verify_crc = match env_var_or("LIBSQL_BOTTOMLESS_VERIFY_CRC", true)
184+
.to_lowercase()
185+
.as_ref()
186+
{
187+
"yes" | "true" | "1" | "y" | "t" => true,
188+
"no" | "false" | "0" | "n" | "f" => false,
189+
other => bail!(
190+
"Invalid LIBSQL_BOTTOMLESS_VERIFY_CRC environment variable: {}",
191+
other
192+
),
193+
};
194+
Ok(Options {
237195
db_id,
238-
aws_endpoint: None,
239-
access_key_id: None,
240-
secret_access_key: None,
241-
region: None,
242-
restore_transaction_cache_fpath: ".bottomless.restore".to_string(),
243-
bucket_name: "bottomless".to_string(),
244-
}
196+
create_bucket_if_not_exists: true,
197+
verify_crc,
198+
use_compression,
199+
max_batch_interval,
200+
max_frames_per_batch,
201+
s3_upload_max_parallelism,
202+
restore_transaction_page_swap_after,
203+
aws_endpoint,
204+
access_key_id,
205+
secret_access_key,
206+
region,
207+
restore_transaction_cache_fpath,
208+
bucket_name,
209+
})
245210
}
246211
}
247212

@@ -276,13 +241,10 @@ impl Replicator {
276241
}
277242

278243
let db_path = db_path.into();
279-
let db_name = {
280-
let db_id = options.db_id.unwrap_or_default();
281-
let name = match db_path.find('/') {
282-
Some(index) => &db_path[..index],
283-
None => &db_path,
284-
};
285-
db_id + ":" + name
244+
let db_name = if let Some(db_id) = options.db_id.clone() {
245+
db_id
246+
} else {
247+
bail!("database id was not set")
286248
};
287249
tracing::debug!("Database path: '{}', name: '{}'", db_path, db_name);
288250

@@ -963,7 +925,7 @@ impl Replicator {
963925
async fn restore_from(
964926
&mut self,
965927
generation: Uuid,
966-
utc_time: Option<NaiveDateTime>,
928+
timestamp: Option<NaiveDateTime>,
967929
) -> Result<(RestoreAction, bool)> {
968930
if let Some(tombstone) = self.get_tombstone().await? {
969931
if let Some(timestamp) = Self::generation_to_timestamp(&generation) {
@@ -991,11 +953,31 @@ impl Replicator {
991953

992954
// at this point we know, we should do a full restore
993955

994-
tokio::fs::rename(&self.db_path, format!("{}.bottomless.backup", self.db_path))
995-
.await
996-
.ok(); // Best effort
956+
let backup_path = format!("{}.bottomless.backup", self.db_path);
957+
tokio::fs::rename(&self.db_path, &backup_path).await.ok(); // Best effort
958+
match self.full_restore(generation, timestamp, last_frame).await {
959+
Ok(result) => {
960+
let elapsed = Instant::now() - start_ts;
961+
tracing::info!("Finished database restoration in {:?}", elapsed);
962+
tokio::fs::remove_file(backup_path).await.ok();
963+
Ok(result)
964+
}
965+
Err(e) => {
966+
tracing::error!("failed to restore the database: {}. Rollback", e);
967+
tokio::fs::rename(&backup_path, &self.db_path).await.ok();
968+
Err(e)
969+
}
970+
}
971+
}
972+
973+
async fn full_restore(
974+
&mut self,
975+
generation: Uuid,
976+
timestamp: Option<NaiveDateTime>,
977+
last_frame: u32,
978+
) -> Result<(RestoreAction, bool)> {
997979
let _ = self.remove_wal_files().await; // best effort, WAL files may not exists
998-
let mut db = tokio::fs::OpenOptions::new()
980+
let mut db = OpenOptions::new()
999981
.create(true)
1000982
.read(true)
1001983
.write(true)
@@ -1052,7 +1034,7 @@ impl Replicator {
10521034
page_size as usize,
10531035
last_frame,
10541036
checksum,
1055-
utc_time,
1037+
timestamp,
10561038
&mut db,
10571039
)
10581040
.await?;
@@ -1063,8 +1045,6 @@ impl Replicator {
10631045
}
10641046

10651047
db.shutdown().await?;
1066-
let elapsed = Instant::now() - start_ts;
1067-
tracing::info!("Finished database restoration in {:?}", elapsed);
10681048

10691049
if applied_wal_frame {
10701050
tracing::info!("WAL file has been applied onto database file in generation {}. Requesting snapshot.", generation);

sqld/src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ impl IntoResponse for ForkError {
193193
ForkError::Internal(_)
194194
| ForkError::Io(_)
195195
| ForkError::LogRead(_)
196+
| ForkError::BackupServiceNotConfigured
196197
| ForkError::CreateNamespace(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
197198
ForkError::ForkReplica => self.format_err(StatusCode::BAD_REQUEST),
198199
}

sqld/src/http/admin/mod.rs

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use std::io::ErrorKind;
99
use std::sync::Arc;
1010
use tokio_util::io::ReaderStream;
1111
use url::Url;
12-
use uuid::Uuid;
1312

1413
use crate::connection::config::DatabaseConfig;
1514
use crate::error::LoadDumpError;
@@ -41,10 +40,6 @@ where
4140
"/v1/namespaces/:namespace/create",
4241
post(handle_create_namespace),
4342
)
44-
.route(
45-
"/v1/namespaces/:namespace/restore",
46-
post(handle_restore_namespace),
47-
)
4843
.route("/v1/namespaces/:namespace", delete(handle_delete_namespace))
4944
.route("/v1/namespaces/:namespace/stats", get(stats::handle_stats))
5045
.with_state(Arc::new(AppState { namespaces }));
@@ -120,13 +115,20 @@ async fn handle_create_namespace<M: MakeNamespace>(
120115
Ok(())
121116
}
122117

118+
#[derive(Debug, Deserialize)]
119+
struct ForkNamespaceReq {
120+
timestamp: NaiveDateTime,
121+
}
122+
123123
async fn handle_fork_namespace<M: MakeNamespace>(
124124
State(app_state): State<Arc<AppState<M>>>,
125125
Path((from, to)): Path<(String, String)>,
126+
req: Option<Json<ForkNamespaceReq>>,
126127
) -> crate::Result<()> {
128+
let timestamp = req.map(|v| v.timestamp);
127129
let from = NamespaceName::from_string(from)?;
128130
let to = NamespaceName::from_string(to)?;
129-
app_state.namespaces.fork(from, to).await?;
131+
app_state.namespaces.fork(from, to, timestamp).await?;
130132
Ok(())
131133
}
132134

@@ -174,27 +176,3 @@ async fn handle_delete_namespace<F: MakeNamespace>(
174176
.await?;
175177
Ok(())
176178
}
177-
178-
#[derive(Debug, Deserialize)]
179-
struct RestoreReq {
180-
generation: Option<Uuid>,
181-
timestamp: Option<NaiveDateTime>,
182-
}
183-
184-
async fn handle_restore_namespace<F: MakeNamespace>(
185-
State(app_state): State<Arc<AppState<F>>>,
186-
Path(namespace): Path<String>,
187-
Json(req): Json<RestoreReq>,
188-
) -> crate::Result<()> {
189-
let restore_option = match (req.generation, req.timestamp) {
190-
(None, None) => RestoreOption::Latest,
191-
(Some(generation), None) => RestoreOption::Generation(generation),
192-
(None, Some(timestamp)) => RestoreOption::PointInTime(timestamp),
193-
(Some(_), Some(_)) => return Err(crate::Error::ConflictingRestoreParameters),
194-
};
195-
app_state
196-
.namespaces
197-
.reset(NamespaceName::from_string(namespace)?, restore_option)
198-
.await?;
199-
Ok(())
200-
}

0 commit comments

Comments
 (0)