Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit c63b220

Browse files
authored
wal recovery (#447)
1 parent 109c515 commit c63b220

File tree

3 files changed

+131
-51
lines changed

3 files changed

+131
-51
lines changed

Cargo.lock

Lines changed: 2 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sqld-libsql-bindings/src/ffi/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub use rusqlite::ffi::{
1111

1212
pub use rusqlite::ffi::libsql_pghdr as PgHdr;
1313
pub use rusqlite::ffi::libsql_wal as Wal;
14+
pub use rusqlite::ffi::*;
1415

1516
pub struct PageHdrIter {
1617
current_ptr: *const PgHdr,

sqld/src/replication/primary/logger.rs

Lines changed: 128 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::ffi::{c_int, c_void, CStr};
2-
use std::fs::{File, OpenOptions};
2+
use std::fs::{remove_dir_all, File, OpenOptions};
33
use std::io::Write;
44
use std::mem::size_of;
55
use std::os::unix::prelude::FileExt;
@@ -10,25 +10,36 @@ use anyhow::{bail, ensure};
1010
use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
1111
use bytes::{Bytes, BytesMut};
1212
use parking_lot::RwLock;
13-
use rusqlite::ffi::SQLITE_IOERR;
1413
use sqld_libsql_bindings::init_static_wal_method;
1514
use tokio::sync::watch;
1615
use uuid::Uuid;
1716

17+
#[cfg(feature = "bottomless")]
18+
use crate::libsql::ffi::SQLITE_IOERR_WRITE;
1819
use crate::libsql::ffi::{
1920
sqlite3,
2021
types::{XWalCheckpointFn, XWalFrameFn, XWalSavePointUndoFn, XWalUndoFn},
21-
PgHdr, Wal, SQLITE_OK,
22+
PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK,
2223
};
23-
#[cfg(feature = "bottomless")]
24-
use crate::libsql::ffi::{SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE};
25-
use crate::libsql::{ffi::PageHdrIter, wal_hook::WalHook};
24+
use crate::libsql::wal_hook::WalHook;
2625
use crate::replication::frame::{Frame, FrameHeader};
2726
use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile};
2827
use crate::replication::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE};
2928

3029
init_static_wal_method!(REPLICATION_METHODS, ReplicationLoggerHook);
3130

31+
#[derive(PartialEq, Eq)]
32+
struct Version([u16; 4]);
33+
34+
impl Version {
35+
fn current() -> Self {
36+
let major = env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap();
37+
let minor = env!("CARGO_PKG_VERSION_MINOR").parse().unwrap();
38+
let patch = env!("CARGO_PKG_VERSION_PATCH").parse().unwrap();
39+
Self([0, major, minor, patch])
40+
}
41+
}
42+
3243
pub enum ReplicationLoggerHook {}
3344

3445
#[derive(Clone)]
@@ -378,14 +389,14 @@ impl LogFile {
378389
if file_end == 0 {
379390
let db_id = Uuid::new_v4();
380391
let header = LogFileHeader {
381-
version: 1,
392+
version: 2,
382393
start_frame_no: 0,
383394
magic: WAL_MAGIC,
384395
page_size: WAL_PAGE_SIZE,
385396
start_checksum: 0,
386397
db_id: db_id.as_u128(),
387398
frame_count: 0,
388-
_pad: 0,
399+
sqld_version: Version::current().0,
389400
};
390401

391402
let mut this = Self {
@@ -623,6 +634,13 @@ impl LogFile {
623634
Some(self.header.start_frame_no + self.header.frame_count - 1)
624635
}
625636
}
637+
638+
fn reset(self) -> anyhow::Result<Self> {
639+
let max_log_frame_count = self.max_log_frame_count;
640+
// truncate file
641+
self.file.set_len(0)?;
642+
Self::new(self.file, max_log_frame_count)
643+
}
626644
}
627645

628646
#[cfg(target_os = "macos")]
@@ -680,17 +698,22 @@ pub struct LogFileHeader {
680698
pub start_frame_no: FrameNo,
681699
/// entry count in file
682700
pub frame_count: u64,
683-
/// Wal file version number, currently: 1
701+
/// Wal file version number, currently: 2
684702
pub version: u32,
685703
/// page size: 4096
686704
pub page_size: i32,
687-
pub _pad: u64,
705+
/// sqld version when creating this log
706+
pub sqld_version: [u16; 4],
688707
}
689708

690709
impl LogFileHeader {
691710
pub fn last_frame_no(&self) -> FrameNo {
692711
self.start_frame_no + self.frame_count
693712
}
713+
714+
fn sqld_version(&self) -> Version {
715+
Version(self.sqld_version)
716+
}
694717
}
695718

696719
pub struct Generation {
@@ -720,28 +743,87 @@ pub struct ReplicationLogger {
720743
impl ReplicationLogger {
721744
pub fn open(db_path: &Path, max_log_size: u64) -> anyhow::Result<Self> {
722745
let log_path = db_path.join("wallog");
746+
let data_path = db_path.join("data");
747+
748+
let fresh = !log_path.exists();
749+
723750
let file = OpenOptions::new()
724751
.create(true)
725752
.write(true)
726753
.read(true)
727754
.open(log_path)?;
755+
728756
let max_log_frame_count = max_log_size * 1_000_000 / LogFile::FRAME_SIZE as u64;
729757
let log_file = LogFile::new(file, max_log_frame_count)?;
758+
let header = log_file.header();
759+
760+
let should_recover = if header.version < 2 || header.sqld_version() != Version::current() {
761+
tracing::info!("replication log version not compatible with current sqld version, recovering from database file.");
762+
true
763+
} else if fresh && data_path.exists() {
764+
tracing::info!("replication log not found, recovering from database file.");
765+
true
766+
} else {
767+
false
768+
};
769+
770+
if should_recover {
771+
Self::recover(log_file, data_path)
772+
} else {
773+
Self::from_log_file(db_path.to_path_buf(), log_file)
774+
}
775+
}
730776

731-
let header = log_file.header;
777+
fn from_log_file(db_path: PathBuf, log_file: LogFile) -> anyhow::Result<Self> {
778+
let header = log_file.header();
732779
let generation_start_frame_no = header.start_frame_no + header.frame_count;
733780

734781
let (new_frame_notifier, _) = watch::channel(generation_start_frame_no);
735782

736783
Ok(Self {
737784
generation: Generation::new(generation_start_frame_no),
738-
compactor: LogCompactor::new(db_path, log_file.header.db_id)?,
785+
compactor: LogCompactor::new(&db_path, log_file.header.db_id)?,
739786
log_file: RwLock::new(log_file),
740-
db_path: db_path.to_owned(),
787+
db_path,
741788
new_frame_notifier,
742789
})
743790
}
744791

792+
fn recover(log_file: LogFile, mut data_path: PathBuf) -> anyhow::Result<Self> {
793+
// It is necessary to checkpoint before we restore the replication log, since the WAL may
794+
// contain pages that are not in the database file.
795+
checkpoint_db(&data_path)?;
796+
let mut log_file = log_file.reset()?;
797+
let snapshot_path = data_path.parent().unwrap().join("snapshots");
798+
// best effort, there may be no snapshots
799+
let _ = remove_dir_all(snapshot_path);
800+
801+
let data_file = File::open(&data_path)?;
802+
let size = data_path.metadata()?.len();
803+
assert!(
804+
size % WAL_PAGE_SIZE as u64 == 0,
805+
"database file size is not a multiple of page size"
806+
);
807+
let num_page = size / WAL_PAGE_SIZE as u64;
808+
let mut buf = [0; WAL_PAGE_SIZE as usize];
809+
let mut page_no = 1; // page numbering starts at 1
810+
for i in 0..num_page {
811+
data_file.read_exact_at(&mut buf, i * WAL_PAGE_SIZE as u64)?;
812+
log_file.push_page(&WalPage {
813+
page_no,
814+
size_after: if i == num_page - 1 { num_page as _ } else { 0 },
815+
data: Bytes::copy_from_slice(&buf),
816+
})?;
817+
log_file.commit()?;
818+
819+
page_no += 1;
820+
}
821+
822+
assert!(data_path.pop());
823+
824+
Self::from_log_file(data_path, log_file)
825+
}
826+
745827
pub fn database_id(&self) -> anyhow::Result<Uuid> {
746828
Ok(Uuid::from_u128((self.log_file.read()).header().db_id))
747829
}
@@ -790,6 +872,39 @@ impl ReplicationLogger {
790872
}
791873
}
792874

875+
fn checkpoint_db(data_path: &Path) -> anyhow::Result<()> {
876+
unsafe {
877+
let conn = rusqlite::Connection::open(data_path)?;
878+
conn.pragma_query(None, "page_size", |row| {
879+
let page_size = row.get::<_, i32>(0).unwrap();
880+
assert_eq!(
881+
page_size, WAL_PAGE_SIZE,
882+
"invalid database file, expected page size to be {}, but found {} instead",
883+
WAL_PAGE_SIZE, page_size
884+
);
885+
Ok(())
886+
})?;
887+
let mut num_checkpointed: c_int = 0;
888+
let rc = rusqlite::ffi::sqlite3_wal_checkpoint_v2(
889+
conn.handle(),
890+
std::ptr::null(),
891+
SQLITE_CHECKPOINT_TRUNCATE,
892+
&mut num_checkpointed as *mut _,
893+
std::ptr::null_mut(),
894+
);
895+
896+
// TODO: ensure correct page size
897+
ensure!(
898+
rc == 0 && num_checkpointed >= 0,
899+
"failed to checkpoint database while recovering replication log"
900+
);
901+
902+
conn.execute("VACUUM", ())?;
903+
}
904+
905+
Ok(())
906+
}
907+
793908
#[cfg(test)]
794909
mod test {
795910
use super::*;

0 commit comments

Comments
 (0)