Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 8d2d8b3

Browse files
committed
Snapshot merging
1 parent 4f059f5 commit 8d2d8b3

File tree

6 files changed

+530
-309
lines changed

6 files changed

+530
-309
lines changed

sqld/src/replication/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ impl ReadReplicationHook {
529529
}
530530
Err(e) => return Err(e.into()),
531531
};
532-
let frame = WalFrame::decode(raw_frame.data)?;
532+
let frame = WalFrame::try_from_bytes(raw_frame.data)?;
533533
debug_assert_eq!(
534534
Some(frame.header.frame_id),
535535
current_offset

sqld/src/replication/log_compaction.rs

Lines changed: 0 additions & 228 deletions
This file was deleted.

sqld/src/replication/logger.rs

Lines changed: 28 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ use anyhow::{bail, ensure};
1010
use bytemuck::{bytes_of, pod_read_unaligned, try_from_bytes, Pod, Zeroable};
1111
use bytes::{Bytes, BytesMut};
1212
use crc::Crc;
13-
use once_cell::sync::Lazy;
1413
use parking_lot::RwLock;
15-
use regex::Regex;
1614
use rusqlite::ffi::SQLITE_ERROR;
1715
use uuid::Uuid;
1816

@@ -22,8 +20,7 @@ use crate::libsql::ffi::{
2220
};
2321
use crate::libsql::{ffi::PageHdrIter, wal_hook::WalHook};
2422

25-
use super::log_compaction::LogCompactor;
26-
use super::snapshot::SnapshotFile;
23+
use super::snapshot::{find_snapshot_file, LogCompactor, SnapshotFile};
2724

2825
pub const WAL_PAGE_SIZE: i32 = 4096;
2926
pub const WAL_MAGIC: u64 = u64::from_le_bytes(*b"SQLDWAL\0");
@@ -99,7 +96,7 @@ unsafe impl WalHook for ReplicationLoggerHook {
9996
.maybe_compact(
10097
self.logger.compactor.clone(),
10198
ntruncate,
102-
&self.logger.path,
99+
&self.logger.db_path,
103100
self.logger.current_checksum.load(Ordering::Relaxed),
104101
)
105102
.unwrap();
@@ -136,6 +133,24 @@ pub struct WalFrame {
136133
pub data: Bytes,
137134
}
138135

136+
impl WalFrame {
137+
/// size of a single frame
138+
pub const SIZE: usize = size_of::<FrameHeader>() + WAL_PAGE_SIZE as usize;
139+
140+
pub fn try_from_bytes(mut data: Bytes) -> anyhow::Result<Self> {
141+
let header_bytes = data.split_to(size_of::<FrameHeader>());
142+
ensure!(
143+
data.len() == WAL_PAGE_SIZE as usize,
144+
"invalid frame size, expected: {}, found: {}",
145+
WAL_PAGE_SIZE,
146+
data.len()
147+
);
148+
let header = FrameHeader::decode(&header_bytes)?;
149+
150+
Ok(Self { header, data })
151+
}
152+
}
153+
139154
pub struct WalFrameBorrowed<'a> {
140155
pub header: FrameHeader,
141156
pub data: &'a [u8],
@@ -150,20 +165,6 @@ impl<'a> From<&'a WalFrame> for WalFrameBorrowed<'a> {
150165
}
151166
}
152167

153-
impl WalFrame {
154-
pub fn decode(mut data: Bytes) -> anyhow::Result<Self> {
155-
let header_bytes = data.split_to(size_of::<FrameHeader>());
156-
ensure!(
157-
data.len() == WAL_PAGE_SIZE as usize,
158-
"invalid frame size, expected: {}, found: {}",
159-
WAL_PAGE_SIZE,
160-
data.len()
161-
);
162-
let header = FrameHeader::decode(&header_bytes)?;
163-
Ok(Self { header, data })
164-
}
165-
}
166-
167168
impl ReplicationLoggerHook {
168169
pub fn new(logger: Arc<ReplicationLogger>) -> Self {
169170
Self {
@@ -519,12 +520,12 @@ pub struct ReplicationLogger {
519520
pub generation: Generation,
520521
pub log_file: RwLock<LogFile>,
521522
compactor: LogCompactor,
522-
path: PathBuf,
523+
db_path: PathBuf,
523524
}
524525

525526
impl ReplicationLogger {
526-
pub fn open(path: impl AsRef<Path>, max_log_size: u64) -> anyhow::Result<Self> {
527-
let log_path = path.as_ref().join("wallog");
527+
pub fn open(db_path: &Path, max_log_size: u64) -> anyhow::Result<Self> {
528+
let log_path = db_path.join("wallog");
528529
let file = OpenOptions::new()
529530
.create(true)
530531
.write(true)
@@ -544,9 +545,9 @@ impl ReplicationLogger {
544545
Ok(Self {
545546
current_checksum,
546547
generation: Generation::new(generation_start_frame_id),
548+
compactor: LogCompactor::new(db_path, log_file.header.db_id)?,
547549
log_file: RwLock::new(log_file),
548-
compactor: LogCompactor::new(path.as_ref().to_owned()),
549-
path: path.as_ref().to_owned(),
550+
db_path: db_path.to_owned(),
550551
})
551552
}
552553

@@ -616,43 +617,8 @@ impl ReplicationLogger {
616617
.store(new_current_checksum, Ordering::Relaxed);
617618
}
618619

619-
pub async fn locate_snapshot(
620-
&self,
621-
start_idx: FrameId,
622-
) -> anyhow::Result<Option<SnapshotFile>> {
623-
static SNAPSHOT_FILE_MATCHER: Lazy<Regex> = Lazy::new(|| {
624-
Regex::new(
625-
r#"(?x)
626-
# match database id
627-
(\w{8}-\w{4}-\w{4}-\w{4}-\w{12})-
628-
# match start frame id
629-
(\d*)-
630-
# match end frame-id
631-
(\d*).snap"#,
632-
)
633-
.unwrap()
634-
});
635-
636-
let snapshots_dir = self.path.join("snapshots");
637-
let mut entries = tokio::fs::read_dir(snapshots_dir).await?;
638-
639-
while let Some(entry) = entries.next_entry().await? {
640-
let path = entry.path();
641-
let Some(name) = path.file_name() else {continue;};
642-
let Some(name_str) = name.to_str() else { continue;};
643-
let Some(captures) = SNAPSHOT_FILE_MATCHER.captures(name_str) else {continue;};
644-
let _db_id = captures.get(1).unwrap();
645-
let start_index: u64 = captures.get(2).unwrap().as_str().parse().unwrap();
646-
let end_index: u64 = captures.get(3).unwrap().as_str().parse().unwrap();
647-
// we're looking for the frame right after the last applied frame on the replica
648-
if (start_index..=end_index).contains(&(start_idx + 1)) {
649-
tracing::debug!("found snapshot for frame {start_idx} at {path:?}");
650-
let snapshot_file = SnapshotFile::open(&path)?;
651-
return Ok(Some(snapshot_file));
652-
}
653-
}
654-
655-
Ok(None)
620+
pub fn get_snapshot_file(&self, from: FrameId) -> anyhow::Result<Option<SnapshotFile>> {
621+
find_snapshot_file(&self.db_path, from)
656622
}
657623
}
658624

@@ -677,7 +643,7 @@ mod test {
677643

678644
let log_file = logger.log_file.write();
679645
for i in 0..10 {
680-
let frame = WalFrame::decode(log_file.frame_bytes(i).unwrap()).unwrap();
646+
let frame = WalFrame::try_from_bytes(log_file.frame_bytes(i).unwrap()).unwrap();
681647
assert_eq!(frame.header.page_no, i as u32);
682648
assert!(frame.data.iter().all(|x| i as u8 == *x));
683649
}

0 commit comments

Comments
 (0)