Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
flush frames and confirm snapshot completion before calling SQLite checkpoint (#704)
Browse files Browse the repository at this point in the history

* flush frames and confirm snapshot completion before calling SQLite checkpoint

* make blocking rt multithreaded

---------

Co-authored-by: ad hoc <[email protected]>
  • Loading branch information
Horusiath and MarinPostma authored Sep 26, 2023
1 parent 8fa0244 commit 0a42e6e
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 71 deletions.
2 changes: 1 addition & 1 deletion bottomless/src/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl WalCopier {
if let Some(wal) = self.wal.as_mut() {
wal
} else {
return Err(anyhow!("WAL file not found: \"{:?}\"", self.wal_path));
return Err(anyhow!("WAL file not found: `{}`", self.wal_path));
}
};
let generation = if let Some(generation) = self.generation.load_full() {
Expand Down
63 changes: 38 additions & 25 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,31 @@ pub extern "C" fn xCheckpoint(
tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE");
return ffi::SQLITE_OK;
}

let ctx = get_replicator_context(wal);
let last_known_frame = ctx.replicator.last_known_frame();
ctx.replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No committed changes in this generation, not snapshotting");
ctx.replicator.skip_snapshot_for_current_generation();
return ffi::SQLITE_OK;
}
if let Err(e) = block_on!(
ctx.runtime,
ctx.replicator.wait_until_committed(last_known_frame)
) {
tracing::error!(
"Failed to finalize frame {} replication: {}",
last_known_frame,
e
);
return ffi::SQLITE_IOERR_WRITE;
}
if let Err(e) = block_on!(ctx.runtime, ctx.replicator.wait_until_snapshotted()) {
tracing::error!("Failed to finalize snapshot replication: {}", e);
return ffi::SQLITE_IOERR_WRITE;
}

/* If there's no busy handler, let's provide a default one,
** since we auto-upgrade the passive checkpoint
*/
Expand Down Expand Up @@ -342,31 +367,19 @@ pub extern "C" fn xCheckpoint(
return rc;
}

let ctx = get_replicator_context(wal);
let last_known_frame = ctx.replicator.last_known_frame();
ctx.replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No committed changes in this generation, not snapshotting");
ctx.replicator.skip_snapshot_for_current_generation();
return ffi::SQLITE_OK;
}
if let Err(e) = block_on!(
ctx.runtime,
ctx.replicator.wait_until_committed(last_known_frame)
) {
tracing::error!("Failed to finalize replication: {}", e);
return ffi::SQLITE_IOERR_WRITE;
}

let prev = ctx.replicator.new_generation();
let _prev = ctx.replicator.new_generation();
tracing::debug!("Snapshotting after checkpoint");
let result = block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file(prev));
if let Err(e) = result {
tracing::error!(
"Failed to snapshot the main db file during checkpoint: {}",
e
);
return ffi::SQLITE_IOERR_WRITE;
match block_on!(ctx.runtime, ctx.replicator.snapshot_main_db_file()) {
Ok(_handle) => {
tracing::trace!("got snapshot handle");
}
Err(e) => {
tracing::error!(
"Failed to snapshot the main db file during checkpoint: {}",
e
);
return ffi::SQLITE_IOERR_WRITE;
}
}
tracing::debug!("Checkpoint completed in {:?}", Instant::now() - start);

Expand Down Expand Up @@ -417,7 +430,7 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
match replicator.restore(None, None).await {
Ok((replicator::RestoreAction::SnapshotMainDbFile, _)) => {
replicator.new_generation();
match replicator.snapshot_main_db_file(None).await {
match replicator.snapshot_main_db_file().await {
Ok(Some(h)) => {
if let Err(e) = h.await {
tracing::error!("Failed to join snapshot main db file task: {}", e);
Expand Down
56 changes: 31 additions & 25 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,18 +373,28 @@ impl Replicator {
self.last_sent_frame_no.load(Ordering::Acquire)
}

pub async fn wait_until_snapshotted(&mut self, generation: Uuid) -> Result<()> {
let res = self
.snapshot_waiter
.wait_for(|result| match result {
Ok(Some(gen)) => *gen == generation,
Ok(None) => false,
Err(_) => true,
})
.await?;
match res.deref() {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
/// Blocks until the snapshot for the current generation has been confirmed.
///
/// Returns `Ok(true)` once the watch channel reports that the current
/// generation's snapshot completed, `Ok(false)` when there is nothing to
/// wait for (no current generation, or the main db file is absent/empty),
/// and `Err` when the snapshot task itself reported a failure.
pub async fn wait_until_snapshotted(&mut self) -> Result<bool> {
if let Ok(generation) = self.generation() {
if !self.main_db_exists_and_not_empty().await {
tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
// No snapshot will be produced for this generation: notify any
// waiters so they don't block forever. Send errors (no receivers)
// are deliberately ignored.
let _ = self.snapshot_notifier.send(Ok(Some(generation)));
return Ok(false);
}
tracing::debug!("waiting for generation snapshot {} to complete", generation);
// Wait on the watch channel until either our generation's snapshot
// is reported complete, or any snapshot error arrives (errors match
// unconditionally so failures propagate instead of hanging).
let res = self
.snapshot_waiter
.wait_for(|result| match result {
Ok(Some(gen)) => *gen == generation,
Ok(None) => false,
Err(_) => true,
})
.await?;
match res.deref() {
Ok(_) => Ok(true),
Err(e) => Err(anyhow!("Failed snapshot generation {}: {}", generation, e)),
}
} else {
// No current generation established — nothing to snapshot or await.
Ok(false)
}
}

Expand Down Expand Up @@ -706,23 +716,18 @@ impl Replicator {
// Sends the main database file to S3 - if -wal file is present, it's replicated
// too - it means that the local file was detected to be newer than its remote
// counterpart.
pub async fn snapshot_main_db_file(
&mut self,
prev_generation: Option<Uuid>,
) -> Result<Option<JoinHandle<()>>> {
pub async fn snapshot_main_db_file(&mut self) -> Result<Option<JoinHandle<()>>> {
if !self.main_db_exists_and_not_empty().await {
tracing::debug!("Not snapshotting, the main db file does not exist or is empty");
let _ = self.snapshot_notifier.send(Ok(prev_generation));
let generation = self.generation()?;
tracing::debug!(
"Not snapshotting {}, the main db file does not exist or is empty",
generation
);
let _ = self.snapshot_notifier.send(Ok(Some(generation)));
return Ok(None);
}
let generation = self.generation()?;
tracing::debug!("Snapshotting generation {}", generation);
let start_ts = Instant::now();
if let Some(prev) = prev_generation {
tracing::debug!("waiting for previous generation {} to complete", prev);
self.wait_until_snapshotted(prev).await?;
}

let client = self.client.clone();
let mut db_file = File::open(&self.db_path).await?;
let change_counter = Self::read_change_counter(&mut db_file).await?;
Expand All @@ -749,6 +754,7 @@ impl Replicator {
let snapshot_notifier = self.snapshot_notifier.clone();
let compression = self.use_compression;
let handle = tokio::spawn(async move {
tracing::trace!("Start snapshotting generation {}", generation);
let start = Instant::now();
let body = match Self::maybe_compress_main_db_file(db_file, compression).await {
Ok(file) => file,
Expand Down Expand Up @@ -788,7 +794,7 @@ impl Replicator {
let _ = tokio::fs::remove_file(format!("db.{}", compression)).await;
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot (took {:?})", elapsed);
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);

Ok(Some(handle))
}
Expand Down
3 changes: 2 additions & 1 deletion sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;

pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_current_thread()
tokio::runtime::Builder::new_multi_thread()
.max_blocking_threads(50_000)
.enable_all()
.build()
.unwrap()
});
Expand Down
4 changes: 3 additions & 1 deletion sqld/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,9 @@ pub async fn init_bottomless_replicator(
match action {
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
replicator.new_generation();
replicator.snapshot_main_db_file(None).await?;
if let Some(_handle) = replicator.snapshot_main_db_file().await? {
tracing::trace!("got snapshot handle after restore with generation upgrade");
}
// Restoration process only leaves the local WAL file if it was
// detected to be newer than its remote counterpart.
replicator.maybe_replicate_wal().await?
Expand Down
53 changes: 35 additions & 18 deletions sqld/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,39 @@ unsafe impl WalHook for ReplicationLoggerHook {
return SQLITE_BUSY;
}
}

#[allow(clippy::await_holding_lock)]
// uncontended -> only gets called under a libSQL write lock
{
let ctx = Self::wal_extract_ctx(wal);
let runtime = tokio::runtime::Handle::current();
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
let mut replicator = replicator.lock().unwrap();
let last_known_frame = replicator.last_known_frame();
replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No comitted changes in this generation, not snapshotting");
replicator.skip_snapshot_for_current_generation();
return SQLITE_OK;
}
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
{
tracing::error!(
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
last_known_frame,
e
);
return SQLITE_IOERR_WRITE;
}
if let Err(e) = runtime.block_on(replicator.wait_until_snapshotted()) {
tracing::error!(
"Failed to wait for S3 replicator to confirm database snapshot backup: {}",
e
);
return SQLITE_IOERR_WRITE;
}
}
}
let rc = unsafe {
orig(
wal,
Expand All @@ -229,25 +262,9 @@ unsafe impl WalHook for ReplicationLoggerHook {
let runtime = tokio::runtime::Handle::current();
if let Some(replicator) = ctx.bottomless_replicator.as_mut() {
let mut replicator = replicator.lock().unwrap();
let last_known_frame = replicator.last_known_frame();
replicator.request_flush();
if last_known_frame == 0 {
tracing::debug!("No comitted changes in this generation, not snapshotting");
replicator.skip_snapshot_for_current_generation();
return SQLITE_OK;
}
if let Err(e) = runtime.block_on(replicator.wait_until_committed(last_known_frame))
{
tracing::error!(
"Failed to wait for S3 replicator to confirm {} frames backup: {}",
last_known_frame,
e
);
return SQLITE_IOERR_WRITE;
}
let prev = replicator.new_generation();
let _prev = replicator.new_generation();
if let Err(e) =
runtime.block_on(async move { replicator.snapshot_main_db_file(prev).await })
runtime.block_on(async move { replicator.snapshot_main_db_file().await })
{
tracing::error!("Failed to snapshot the main db file during checkpoint: {e}");
return SQLITE_IOERR_WRITE;
Expand Down

0 comments on commit 0a42e6e

Please sign in to comment.