@@ -10,9 +10,7 @@ use anyhow::{bail, ensure};
10
10
use bytemuck:: { bytes_of, pod_read_unaligned, try_from_bytes, Pod , Zeroable } ;
11
11
use bytes:: { Bytes , BytesMut } ;
12
12
use crc:: Crc ;
13
- use once_cell:: sync:: Lazy ;
14
13
use parking_lot:: RwLock ;
15
- use regex:: Regex ;
16
14
use rusqlite:: ffi:: SQLITE_ERROR ;
17
15
use uuid:: Uuid ;
18
16
@@ -22,8 +20,7 @@ use crate::libsql::ffi::{
22
20
} ;
23
21
use crate :: libsql:: { ffi:: PageHdrIter , wal_hook:: WalHook } ;
24
22
25
- use super :: log_compaction:: LogCompactor ;
26
- use super :: snapshot:: SnapshotFile ;
23
+ use super :: snapshot:: { find_snapshot_file, LogCompactor , SnapshotFile } ;
27
24
28
25
pub const WAL_PAGE_SIZE : i32 = 4096 ;
29
26
pub const WAL_MAGIC : u64 = u64:: from_le_bytes ( * b"SQLDWAL\0 " ) ;
@@ -99,7 +96,7 @@ unsafe impl WalHook for ReplicationLoggerHook {
99
96
. maybe_compact (
100
97
self . logger . compactor . clone ( ) ,
101
98
ntruncate,
102
- & self . logger . path ,
99
+ & self . logger . db_path ,
103
100
self . logger . current_checksum . load ( Ordering :: Relaxed ) ,
104
101
)
105
102
. unwrap ( ) ;
@@ -136,6 +133,24 @@ pub struct WalFrame {
136
133
pub data : Bytes ,
137
134
}
138
135
136
+ impl WalFrame {
137
+ /// size of a single frame
138
+ pub const SIZE : usize = size_of :: < FrameHeader > ( ) + WAL_PAGE_SIZE as usize ;
139
+
140
+ pub fn try_from_bytes ( mut data : Bytes ) -> anyhow:: Result < Self > {
141
+ let header_bytes = data. split_to ( size_of :: < FrameHeader > ( ) ) ;
142
+ ensure ! (
143
+ data. len( ) == WAL_PAGE_SIZE as usize ,
144
+ "invalid frame size, expected: {}, found: {}" ,
145
+ WAL_PAGE_SIZE ,
146
+ data. len( )
147
+ ) ;
148
+ let header = FrameHeader :: decode ( & header_bytes) ?;
149
+
150
+ Ok ( Self { header, data } )
151
+ }
152
+ }
153
+
139
154
pub struct WalFrameBorrowed < ' a > {
140
155
pub header : FrameHeader ,
141
156
pub data : & ' a [ u8 ] ,
@@ -150,20 +165,6 @@ impl<'a> From<&'a WalFrame> for WalFrameBorrowed<'a> {
150
165
}
151
166
}
152
167
153
- impl WalFrame {
154
- pub fn decode ( mut data : Bytes ) -> anyhow:: Result < Self > {
155
- let header_bytes = data. split_to ( size_of :: < FrameHeader > ( ) ) ;
156
- ensure ! (
157
- data. len( ) == WAL_PAGE_SIZE as usize ,
158
- "invalid frame size, expected: {}, found: {}" ,
159
- WAL_PAGE_SIZE ,
160
- data. len( )
161
- ) ;
162
- let header = FrameHeader :: decode ( & header_bytes) ?;
163
- Ok ( Self { header, data } )
164
- }
165
- }
166
-
167
168
impl ReplicationLoggerHook {
168
169
pub fn new ( logger : Arc < ReplicationLogger > ) -> Self {
169
170
Self {
@@ -519,12 +520,12 @@ pub struct ReplicationLogger {
519
520
pub generation : Generation ,
520
521
pub log_file : RwLock < LogFile > ,
521
522
compactor : LogCompactor ,
522
- path : PathBuf ,
523
+ db_path : PathBuf ,
523
524
}
524
525
525
526
impl ReplicationLogger {
526
- pub fn open ( path : impl AsRef < Path > , max_log_size : u64 ) -> anyhow:: Result < Self > {
527
- let log_path = path . as_ref ( ) . join ( "wallog" ) ;
527
+ pub fn open ( db_path : & Path , max_log_size : u64 ) -> anyhow:: Result < Self > {
528
+ let log_path = db_path . join ( "wallog" ) ;
528
529
let file = OpenOptions :: new ( )
529
530
. create ( true )
530
531
. write ( true )
@@ -544,9 +545,9 @@ impl ReplicationLogger {
544
545
Ok ( Self {
545
546
current_checksum,
546
547
generation : Generation :: new ( generation_start_frame_id) ,
548
+ compactor : LogCompactor :: new ( db_path, log_file. header . db_id ) ?,
547
549
log_file : RwLock :: new ( log_file) ,
548
- compactor : LogCompactor :: new ( path. as_ref ( ) . to_owned ( ) ) ,
549
- path : path. as_ref ( ) . to_owned ( ) ,
550
+ db_path : db_path. to_owned ( ) ,
550
551
} )
551
552
}
552
553
@@ -616,43 +617,8 @@ impl ReplicationLogger {
616
617
. store ( new_current_checksum, Ordering :: Relaxed ) ;
617
618
}
618
619
619
- pub async fn locate_snapshot (
620
- & self ,
621
- start_idx : FrameId ,
622
- ) -> anyhow:: Result < Option < SnapshotFile > > {
623
- static SNAPSHOT_FILE_MATCHER : Lazy < Regex > = Lazy :: new ( || {
624
- Regex :: new (
625
- r#"(?x)
626
- # match database id
627
- (\w{8}-\w{4}-\w{4}-\w{4}-\w{12})-
628
- # match start frame id
629
- (\d*)-
630
- # match end frame-id
631
- (\d*).snap"# ,
632
- )
633
- . unwrap ( )
634
- } ) ;
635
-
636
- let snapshots_dir = self . path . join ( "snapshots" ) ;
637
- let mut entries = tokio:: fs:: read_dir ( snapshots_dir) . await ?;
638
-
639
- while let Some ( entry) = entries. next_entry ( ) . await ? {
640
- let path = entry. path ( ) ;
641
- let Some ( name) = path. file_name ( ) else { continue ; } ;
642
- let Some ( name_str) = name. to_str ( ) else { continue ; } ;
643
- let Some ( captures) = SNAPSHOT_FILE_MATCHER . captures ( name_str) else { continue ; } ;
644
- let _db_id = captures. get ( 1 ) . unwrap ( ) ;
645
- let start_index: u64 = captures. get ( 2 ) . unwrap ( ) . as_str ( ) . parse ( ) . unwrap ( ) ;
646
- let end_index: u64 = captures. get ( 3 ) . unwrap ( ) . as_str ( ) . parse ( ) . unwrap ( ) ;
647
- // we're looking for the frame right after the last applied frame on the replica
648
- if ( start_index..=end_index) . contains ( & ( start_idx + 1 ) ) {
649
- tracing:: debug!( "found snapshot for frame {start_idx} at {path:?}" ) ;
650
- let snapshot_file = SnapshotFile :: open ( & path) ?;
651
- return Ok ( Some ( snapshot_file) ) ;
652
- }
653
- }
654
-
655
- Ok ( None )
620
+ pub fn get_snapshot_file ( & self , from : FrameId ) -> anyhow:: Result < Option < SnapshotFile > > {
621
+ find_snapshot_file ( & self . db_path , from)
656
622
}
657
623
}
658
624
@@ -677,7 +643,7 @@ mod test {
677
643
678
644
let log_file = logger. log_file . write ( ) ;
679
645
for i in 0 ..10 {
680
- let frame = WalFrame :: decode ( log_file. frame_bytes ( i) . unwrap ( ) ) . unwrap ( ) ;
646
+ let frame = WalFrame :: try_from_bytes ( log_file. frame_bytes ( i) . unwrap ( ) ) . unwrap ( ) ;
681
647
assert_eq ! ( frame. header. page_no, i as u32 ) ;
682
648
assert ! ( frame. data. iter( ) . all( |x| i as u8 == * x) ) ;
683
649
}
0 commit comments