diff --git a/sqld/src/replication/frame.rs b/sqld/src/replication/frame.rs index 17deb60f..4a990eb3 100644 --- a/sqld/src/replication/frame.rs +++ b/sqld/src/replication/frame.rs @@ -34,6 +34,66 @@ pub struct Frame { data: Bytes, } +#[repr(transparent)] +#[derive(Clone, Copy, Debug, Zeroable, Pod)] +// NOTICE: frame number 0 indicates that the frame is in the main db file. +// Any other number indicates that it's in the WAL file. +// We do not use an enum here in order to make this struct transparently +// serializable for C code and on-disk representation. +pub struct FrameLocation { + pub frame_no: u32, +} + +impl FrameLocation { + pub const IN_MAIN_DB_FILE: u32 = 0; + + pub fn new(frame_no: u32) -> Self { + Self { frame_no } + } + + pub fn in_wal_file(frame_no: u32) -> Self { + assert_ne!(frame_no, FrameLocation::IN_MAIN_DB_FILE); + Self { frame_no } + } + + pub fn in_main_db_file() -> Self { + Self { + frame_no: Self::IN_MAIN_DB_FILE, + } + } +} + +#[repr(C)] +#[derive(Clone, Copy, Debug, Zeroable, Pod)] +pub struct FrameRef { + pub header: FrameHeader, + pub location: FrameLocation, + _pad: u32, +} + +impl FrameRef { + pub const SIZE: usize = size_of::(); + + pub fn new(header: FrameHeader, location: FrameLocation) -> Self { + Self { + header, + location, + _pad: 0, + } + } + + pub fn as_bytes(&self) -> Bytes { + Bytes::copy_from_slice(bytes_of(self)) + } + + pub fn try_from_bytes(data: Bytes) -> anyhow::Result { + anyhow::ensure!(data.len() == Self::SIZE, "invalid frame size"); + try_from_bytes(&data) + .copied() + .map_err(|e| anyhow::anyhow!(e)) + } +} + impl fmt::Debug for Frame { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Frame") diff --git a/sqld/src/replication/primary/logger.rs b/sqld/src/replication/primary/logger.rs index 0a4a44d6..3eed1532 100644 --- a/sqld/src/replication/primary/logger.rs +++ b/sqld/src/replication/primary/logger.rs @@ -1,3 +1,4 @@ +use bytes::BytesMut; use std::ffi::{c_int, c_void, CStr}; use std::fs::{remove_dir_all, File, OpenOptions}; use std::io::Write; @@ -8,21 +9,18 @@ use std::sync::Arc; use anyhow::{bail, ensure}; use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable}; -use bytes::{Bytes, BytesMut}; use parking_lot::RwLock; use sqld_libsql_bindings::init_static_wal_method; use tokio::sync::watch; use uuid::Uuid; -#[cfg(feature = "bottomless")] -use crate::libsql::ffi::SQLITE_IOERR_WRITE; use crate::libsql::ffi::{ sqlite3, types::{XWalCheckpointFn, XWalFrameFn, XWalSavePointUndoFn, XWalUndoFn}, - PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR, SQLITE_OK, + PageHdrIter, PgHdr, Wal, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK, }; use crate::libsql::wal_hook::WalHook; -use crate::replication::frame::{Frame, FrameHeader}; +use crate::replication::frame::{Frame, FrameHeader, FrameLocation, FrameRef}; use crate::replication::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile}; use crate::replication::{FrameNo, CRC_64_GO_ISO, WAL_MAGIC, WAL_PAGE_SIZE}; @@ -75,6 +73,7 @@ unsafe impl WalHook for ReplicationLoggerHook { orig: XWalFrameFn, ) -> c_int { assert_eq!(page_size, 4096); + let mut frame_no = wal.hdr.mxFrame + 1; let wal_ptr = wal as *mut _; #[cfg(feature = "bottomless")] let last_valid_frame = wal.hdr.mxFrame; @@ -82,13 +81,14 @@ unsafe impl WalHook for ReplicationLoggerHook { let frame_checksum = wal.hdr.aFrameCksum; let ctx = Self::wal_extract_ctx(wal); - for (page_no, data) in PageHdrIter::new(page_headers, page_size as _) { - ctx.write_frame(page_no, data) + for (page_no, _data) in PageHdrIter::new(page_headers, page_size as _) { + ctx.write_frame(page_no, FrameLocation::in_wal_file(frame_no)); + frame_no += 1; } if let Err(e) = ctx.flush(ntruncate) { tracing::error!("error writing to replication log: {e}"); // returning IO_ERR ensure that xUndo will be called by sqlite. - return SQLITE_IOERR; + return SQLITE_IOERR_WRITE; } // FIXME: instead of block_on, we should consider replicating asynchronously in the background, @@ -294,7 +294,7 @@ pub struct WalPage { pub page_no: u32, /// 0 for non-commit frames pub size_after: u32, - pub data: Bytes, + pub data: FrameLocation, } impl ReplicationLoggerHookCtx { @@ -314,11 +314,11 @@ impl ReplicationLoggerHookCtx { } } - fn write_frame(&mut self, page_no: u32, data: &[u8]) { + fn write_frame(&mut self, page_no: u32, data: FrameLocation) { let entry = WalPage { page_no, size_after: 0, - data: Bytes::copy_from_slice(data), + data, }; self.buffer.push(entry); } @@ -352,6 +352,7 @@ impl ReplicationLoggerHookCtx { #[derive(Debug)] pub struct LogFile { file: File, + db_path: PathBuf, // actual data of the logged frames resides either in the main db file or the wal file pub header: LogFileHeader, /// the maximum number of frames this log is allowed to contain before it should be compacted. max_log_frame_count: u64, @@ -379,9 +380,12 @@ pub enum LogReadError { impl LogFile { /// size of a single frame + /// FIXME: LogFile should only ever use references -> what to do with snapshots? pub const FRAME_SIZE: usize = size_of::() + WAL_PAGE_SIZE as usize; + /// size of a single frame reference + pub const FRAME_REF_SIZE: usize = size_of::(); - pub fn new(file: File, max_log_frame_count: u64) -> anyhow::Result { + pub fn new(file: File, db_path: PathBuf, max_log_frame_count: u64) -> anyhow::Result { // FIXME: we should probably take a lock on this file, to prevent anybody else to write to // it. let file_end = file.metadata()?.len(); @@ -401,6 +405,7 @@ impl LogFile { let mut this = Self { file, + db_path, header, max_log_frame_count, uncommitted_frame_count: 0, @@ -415,6 +420,7 @@ impl LogFile { let header = Self::read_header(&file)?; let mut this = Self { file, + db_path, header, max_log_frame_count, uncommitted_frame_count: 0, @@ -504,30 +510,33 @@ impl LogFile { })) } - fn compute_checksum(&self, page: &WalPage) -> u64 { - let mut digest = CRC_64_GO_ISO.digest_with_initial(self.uncommitted_checksum); - digest.update(&page.data); + fn compute_checksum(&self, _page: &WalPage) -> u64 { + let digest = CRC_64_GO_ISO.digest_with_initial(self.uncommitted_checksum); + // FIXME: we should either read the page from its location and compute checksum, + // or just rely on the fact that the page is already checksummed by WAL or the main db file. + //digest.update(&page.data); digest.finalize() } pub fn push_page(&mut self, page: &WalPage) -> anyhow::Result<()> { let checksum = self.compute_checksum(page); - let frame = Frame::from_parts( - &FrameHeader { - frame_no: self.next_frame_no(), - checksum, - page_no: page.page_no, - size_after: page.size_after, - }, - &page.data, - ); + let header = FrameHeader { + frame_no: self.next_frame_no(), + checksum, + page_no: page.page_no, + size_after: page.size_after, + }; + + let frame_ref = FrameRef::new(header, page.data); let byte_offset = self.next_byte_offset(); - tracing::trace!( - "writing frame {} at offset {byte_offset}", - frame.header().frame_no + let data = frame_ref.as_bytes(); + tracing::warn!( + "writing frame {} at offset {byte_offset}, size {}", + frame_ref.header.frame_no, + data.len() ); - self.file.write_all_at(frame.as_slice(), byte_offset)?; + self.file.write_all_at(&data, byte_offset)?; self.uncommitted_frame_count += 1; self.uncommitted_checksum = checksum; @@ -546,7 +555,7 @@ impl LogFile { /// Returns the bytes position of the `nth` entry in the log fn absolute_byte_offset(nth: u64) -> u64 { - std::mem::size_of::() as u64 + nth * Self::FRAME_SIZE as u64 + std::mem::size_of::() as u64 + nth * Self::FRAME_REF_SIZE as u64 } fn byte_offset(&self, id: FrameNo) -> anyhow::Result> { @@ -602,7 +611,7 @@ impl LogFile { .write(true) .create(true) .open(&temp_log_path)?; - let mut new_log_file = LogFile::new(file, self.max_log_frame_count)?; + let mut new_log_file = LogFile::new(file, self.db_path.clone(), self.max_log_frame_count)?; let new_header = LogFileHeader { start_frame_no: self.header.start_frame_no + self.header.frame_count, frame_count: 0, @@ -620,11 +629,39 @@ impl LogFile { } fn read_frame_byte_offset(&self, offset: u64) -> anyhow::Result { - let mut buffer = BytesMut::zeroed(LogFile::FRAME_SIZE); + let mut buffer = BytesMut::zeroed(LogFile::FRAME_REF_SIZE); self.file.read_exact_at(&mut buffer, offset)?; - let buffer = buffer.freeze(); + tracing::trace!("Buffer size {}", buffer.len()); + let frame_ref = FrameRef::try_from_bytes(buffer.freeze())?; + + tracing::trace!("Frame reference: {frame_ref:?}"); + + let mut frame_data = [0; WAL_PAGE_SIZE as usize]; + match frame_ref.location { + FrameLocation { + frame_no: FrameLocation::IN_MAIN_DB_FILE, + } => { + let main_db_file = std::fs::File::open(self.db_path.join("data"))?; + main_db_file.read_exact_at( + &mut frame_data, + frame_ref.header.page_no as u64 * WAL_PAGE_SIZE as u64, + )?; + } + FrameLocation { frame_no } => { + let wal_file = std::fs::File::open(self.db_path.join("data-wal"))?; + // FIXME: this is *not* the correct way to read a frame from the wal file. + // It needs to take into account the wal file header, and the wal page headers. + wal_file.read_exact_at(&mut frame_data, Self::offset_in_wal(frame_no))?; + } + } + + // FIXME: memory copy, easy enough to avoid + Ok(Frame::from_parts(&frame_ref.header, &frame_data)) + } - Frame::try_from_bytes(buffer) + // The offset of frame `frame_no` in the libSQL WAL file + fn offset_in_wal(frame_no: u32) -> u64 { + 32 + ((frame_no - 1) as u64) * (WAL_PAGE_SIZE as u64 + 24) } fn last_commited_frame_no(&self) -> Option { @@ -639,7 +676,7 @@ impl LogFile { let max_log_frame_count = self.max_log_frame_count; // truncate file self.file.set_len(0)?; - Self::new(self.file, max_log_frame_count) + Self::new(self.file, self.db_path, max_log_frame_count) } } @@ -754,7 +791,7 @@ impl ReplicationLogger { .open(log_path)?; let max_log_frame_count = max_log_size * 1_000_000 / LogFile::FRAME_SIZE as u64; - let log_file = LogFile::new(file, max_log_frame_count)?; + let log_file = LogFile::new(file, db_path.to_owned(), max_log_frame_count)?; let header = log_file.header(); let should_recover = if header.version < 2 || header.sqld_version() != Version::current() { @@ -798,21 +835,18 @@ impl ReplicationLogger { // best effort, there may be no snapshots let _ = remove_dir_all(snapshot_path); - let data_file = File::open(&data_path)?; let size = data_path.metadata()?.len(); assert!( size % WAL_PAGE_SIZE as u64 == 0, "database file size is not a multiple of page size" ); let num_page = size / WAL_PAGE_SIZE as u64; - let mut buf = [0; WAL_PAGE_SIZE as usize]; let mut page_no = 1; // page numbering starts at 1 for i in 0..num_page { - data_file.read_exact_at(&mut buf, i * WAL_PAGE_SIZE as u64)?; log_file.push_page(&WalPage { page_no, size_after: if i == num_page - 1 { num_page as _ } else { 0 }, - data: Bytes::copy_from_slice(&buf), + data: FrameLocation::in_main_db_file(), // log recovery is performed from the main db file })?; log_file.commit()?; @@ -918,10 +952,14 @@ mod test { .map(|i| WalPage { page_no: i, size_after: 0, - data: Bytes::from(vec![i as _; 4096]), + data: FrameLocation::in_main_db_file(), }) .collect::>(); logger.write_pages(&frames).unwrap(); + let main_db_file = std::fs::File::create(dir.path().join("data")).unwrap(); + for i in 0..10 { + main_db_file.write_at(&[i as u8; 4096], i * 4096).unwrap(); + } logger.commit().unwrap(); let log_file = logger.log_file.write(); @@ -945,30 +983,16 @@ mod test { assert!(matches!(log_file.frame(1), Err(LogReadError::Ahead))); } - #[test] - #[should_panic] - fn incorrect_frame_size() { - let dir = tempfile::tempdir().unwrap(); - let logger = ReplicationLogger::open(dir.path(), 0).unwrap(); - let entry = WalPage { - page_no: 0, - size_after: 0, - data: vec![0; 3].into(), - }; - - logger.write_pages(&[entry]).unwrap(); - logger.commit().unwrap(); - } - #[test] fn log_file_test_rollback() { + let db = tempfile::tempdir().unwrap(); let f = tempfile::tempfile().unwrap(); - let mut log_file = LogFile::new(f, 100).unwrap(); + let mut log_file = LogFile::new(f, db.path().to_owned(), 100).unwrap(); (0..5) .map(|i| WalPage { page_no: i, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), // FIXME: actually fill the fake main db file with data }) .for_each(|p| { log_file.push_page(&p).unwrap(); @@ -982,7 +1006,7 @@ mod test { .map(|i| WalPage { page_no: i, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), }) .for_each(|p| { log_file.push_page(&p).unwrap(); @@ -995,7 +1019,7 @@ mod test { .push_page(&WalPage { page_no: 42, size_after: 5, - data: Bytes::from_static(&[1; 4096]), + data: FrameLocation::in_main_db_file(), }) .unwrap(); diff --git a/sqld/src/replication/snapshot.rs b/sqld/src/replication/snapshot.rs index 01da387d..9b7ef4c0 100644 --- a/sqld/src/replication/snapshot.rs +++ b/sqld/src/replication/snapshot.rs @@ -455,7 +455,7 @@ mod test { use bytes::Bytes; use tempfile::tempdir; - use crate::replication::frame::FrameHeader; + use crate::replication::frame::{FrameHeader, FrameLocation}; use crate::replication::primary::logger::WalPage; use crate::replication::snapshot::SnapshotFile; @@ -464,7 +464,13 @@ mod test { #[test] fn compact_file_create_snapshot() { let temp = tempfile::NamedTempFile::new().unwrap(); - let mut log_file = LogFile::new(temp.as_file().try_clone().unwrap(), 0).unwrap(); + let temp_db = tempfile::tempdir().unwrap(); + let mut log_file = LogFile::new( + temp.as_file().try_clone().unwrap(), + temp_db.path().to_owned(), + 0, + ) + .unwrap(); let db_id = Uuid::new_v4(); log_file.header.db_id = db_id.as_u128(); log_file.write_header().unwrap(); @@ -472,16 +478,22 @@ mod test { // add 50 pages, each one in two versions for _ in 0..2 { for i in 0..25 { - let data = std::iter::repeat(0).take(4096).collect::(); + let _data = std::iter::repeat(0).take(4096).collect::(); let page = WalPage { page_no: i, size_after: i + 1, - data, + data: FrameLocation::in_main_db_file(), }; log_file.push_page(&page).unwrap(); } } + // Fill the fake main db file with the aforementioned 50 pages + std::fs::File::create(temp_db.path().join("data")) + .unwrap() + .write_all(&[0; 4096 * 50]) + .unwrap(); + log_file.commit().unwrap(); let dump_dir = tempdir().unwrap();