1
+ use bytes:: BytesMut ;
1
2
use std:: ffi:: { c_int, c_void, CStr } ;
2
3
use std:: fs:: { remove_dir_all, File , OpenOptions } ;
3
4
use std:: io:: Write ;
@@ -8,21 +9,18 @@ use std::sync::Arc;
8
9
9
10
use anyhow:: { bail, ensure} ;
10
11
use bytemuck:: { bytes_of, pod_read_unaligned, Pod , Zeroable } ;
11
- use bytes:: { Bytes , BytesMut } ;
12
12
use parking_lot:: RwLock ;
13
13
use sqld_libsql_bindings:: init_static_wal_method;
14
14
use tokio:: sync:: watch;
15
15
use uuid:: Uuid ;
16
16
17
- #[ cfg( feature = "bottomless" ) ]
18
- use crate :: libsql:: ffi:: SQLITE_IOERR_WRITE ;
19
17
use crate :: libsql:: ffi:: {
20
18
sqlite3,
21
19
types:: { XWalCheckpointFn , XWalFrameFn , XWalSavePointUndoFn , XWalUndoFn } ,
22
- PageHdrIter , PgHdr , Wal , SQLITE_CHECKPOINT_TRUNCATE , SQLITE_IOERR , SQLITE_OK ,
20
+ PageHdrIter , PgHdr , Wal , SQLITE_CHECKPOINT_TRUNCATE , SQLITE_IOERR_WRITE , SQLITE_OK ,
23
21
} ;
24
22
use crate :: libsql:: wal_hook:: WalHook ;
25
- use crate :: replication:: frame:: { Frame , FrameHeader } ;
23
+ use crate :: replication:: frame:: { Frame , FrameHeader , FrameLocation , FrameRef } ;
26
24
use crate :: replication:: snapshot:: { find_snapshot_file, LogCompactor , SnapshotFile } ;
27
25
use crate :: replication:: { FrameNo , CRC_64_GO_ISO , WAL_MAGIC , WAL_PAGE_SIZE } ;
28
26
@@ -75,20 +73,22 @@ unsafe impl WalHook for ReplicationLoggerHook {
75
73
orig : XWalFrameFn ,
76
74
) -> c_int {
77
75
assert_eq ! ( page_size, 4096 ) ;
76
+ let mut frame_no = wal. hdr . mxFrame + 1 ;
78
77
let wal_ptr = wal as * mut _ ;
79
78
#[ cfg( feature = "bottomless" ) ]
80
79
let last_valid_frame = wal. hdr . mxFrame ;
81
80
#[ cfg( feature = "bottomless" ) ]
82
81
let frame_checksum = wal. hdr . aFrameCksum ;
83
82
let ctx = Self :: wal_extract_ctx ( wal) ;
84
83
85
- for ( page_no, data) in PageHdrIter :: new ( page_headers, page_size as _ ) {
86
- ctx. write_frame ( page_no, data)
84
+ for ( page_no, _data) in PageHdrIter :: new ( page_headers, page_size as _ ) {
85
+ ctx. write_frame ( page_no, FrameLocation :: in_wal_file ( frame_no) ) ;
86
+ frame_no += 1 ;
87
87
}
88
88
if let Err ( e) = ctx. flush ( ntruncate) {
89
89
tracing:: error!( "error writing to replication log: {e}" ) ;
90
90
// returning IO_ERR ensure that xUndo will be called by sqlite.
91
- return SQLITE_IOERR ;
91
+ return SQLITE_IOERR_WRITE ;
92
92
}
93
93
94
94
// FIXME: instead of block_on, we should consider replicating asynchronously in the background,
@@ -294,7 +294,7 @@ pub struct WalPage {
294
294
pub page_no : u32 ,
295
295
/// 0 for non-commit frames
296
296
pub size_after : u32 ,
297
- pub data : Bytes ,
297
+ pub data : FrameLocation ,
298
298
}
299
299
300
300
impl ReplicationLoggerHookCtx {
@@ -314,11 +314,11 @@ impl ReplicationLoggerHookCtx {
314
314
}
315
315
}
316
316
317
- fn write_frame ( & mut self , page_no : u32 , data : & [ u8 ] ) {
317
+ fn write_frame ( & mut self , page_no : u32 , data : FrameLocation ) {
318
318
let entry = WalPage {
319
319
page_no,
320
320
size_after : 0 ,
321
- data : Bytes :: copy_from_slice ( data ) ,
321
+ data,
322
322
} ;
323
323
self . buffer . push ( entry) ;
324
324
}
@@ -352,6 +352,7 @@ impl ReplicationLoggerHookCtx {
352
352
#[ derive( Debug ) ]
353
353
pub struct LogFile {
354
354
file : File ,
355
+ main_db_path : PathBuf , // actual data of the logged frames resides either in the main db file or the wal file
355
356
pub header : LogFileHeader ,
356
357
/// the maximum number of frames this log is allowed to contain before it should be compacted.
357
358
max_log_frame_count : u64 ,
@@ -379,9 +380,16 @@ pub enum LogReadError {
379
380
380
381
impl LogFile {
381
382
/// size of a single frame
383
+ /// FIXME: LogFile should only ever use references -> what to do with snapshots?
382
384
pub const FRAME_SIZE : usize = size_of :: < FrameHeader > ( ) + WAL_PAGE_SIZE as usize ;
385
+ /// size of a single frame reference
386
+ pub const FRAME_REF_SIZE : usize = size_of :: < FrameRef > ( ) ;
383
387
384
- pub fn new ( file : File , max_log_frame_count : u64 ) -> anyhow:: Result < Self > {
388
+ pub fn new (
389
+ file : File ,
390
+ main_db_path : PathBuf ,
391
+ max_log_frame_count : u64 ,
392
+ ) -> anyhow:: Result < Self > {
385
393
// FIXME: we should probably take a lock on this file, to prevent anybody else to write to
386
394
// it.
387
395
let file_end = file. metadata ( ) ?. len ( ) ;
@@ -401,6 +409,7 @@ impl LogFile {
401
409
402
410
let mut this = Self {
403
411
file,
412
+ main_db_path,
404
413
header,
405
414
max_log_frame_count,
406
415
uncommitted_frame_count : 0 ,
@@ -415,6 +424,7 @@ impl LogFile {
415
424
let header = Self :: read_header ( & file) ?;
416
425
let mut this = Self {
417
426
file,
427
+ main_db_path,
418
428
header,
419
429
max_log_frame_count,
420
430
uncommitted_frame_count : 0 ,
@@ -504,30 +514,33 @@ impl LogFile {
504
514
} ) )
505
515
}
506
516
507
- fn compute_checksum ( & self , page : & WalPage ) -> u64 {
508
- let mut digest = CRC_64_GO_ISO . digest_with_initial ( self . uncommitted_checksum ) ;
509
- digest. update ( & page. data ) ;
517
+ fn compute_checksum ( & self , _page : & WalPage ) -> u64 {
518
+ let digest = CRC_64_GO_ISO . digest_with_initial ( self . uncommitted_checksum ) ;
519
+ // FIXME: we should either read the page from its location and compute checksum,
520
+ // or just rely on the fact that the page is already checksummed by WAL or the main db file.
521
+ //digest.update(&page.data);
510
522
digest. finalize ( )
511
523
}
512
524
513
525
pub fn push_page ( & mut self , page : & WalPage ) -> anyhow:: Result < ( ) > {
514
526
let checksum = self . compute_checksum ( page) ;
515
- let frame = Frame :: from_parts (
516
- & FrameHeader {
517
- frame_no : self . next_frame_no ( ) ,
518
- checksum,
519
- page_no : page. page_no ,
520
- size_after : page. size_after ,
521
- } ,
522
- & page. data ,
523
- ) ;
527
+ let header = FrameHeader {
528
+ frame_no : self . next_frame_no ( ) ,
529
+ checksum,
530
+ page_no : page. page_no ,
531
+ size_after : page. size_after ,
532
+ } ;
533
+
534
+ let frame_ref = FrameRef :: new ( header, page. data ) ;
524
535
525
536
let byte_offset = self . next_byte_offset ( ) ;
526
- tracing:: trace!(
527
- "writing frame {} at offset {byte_offset}" ,
528
- frame. header( ) . frame_no
537
+ let data = frame_ref. as_bytes ( ) ;
538
+ tracing:: warn!(
539
+ "writing frame {} at offset {byte_offset}, size {}" ,
540
+ frame_ref. header. frame_no,
541
+ data. len( )
529
542
) ;
530
- self . file . write_all_at ( frame . as_slice ( ) , byte_offset) ?;
543
+ self . file . write_all_at ( & data , byte_offset) ?;
531
544
532
545
self . uncommitted_frame_count += 1 ;
533
546
self . uncommitted_checksum = checksum;
@@ -546,7 +559,7 @@ impl LogFile {
546
559
547
560
/// Returns the bytes position of the `nth` entry in the log
548
561
fn absolute_byte_offset ( nth : u64 ) -> u64 {
549
- std:: mem:: size_of :: < LogFileHeader > ( ) as u64 + nth * Self :: FRAME_SIZE as u64
562
+ std:: mem:: size_of :: < LogFileHeader > ( ) as u64 + nth * Self :: FRAME_REF_SIZE as u64
550
563
}
551
564
552
565
fn byte_offset ( & self , id : FrameNo ) -> anyhow:: Result < Option < u64 > > {
@@ -602,7 +615,8 @@ impl LogFile {
602
615
. write ( true )
603
616
. create ( true )
604
617
. open ( & temp_log_path) ?;
605
- let mut new_log_file = LogFile :: new ( file, self . max_log_frame_count ) ?;
618
+ let mut new_log_file =
619
+ LogFile :: new ( file, self . main_db_path . clone ( ) , self . max_log_frame_count ) ?;
606
620
let new_header = LogFileHeader {
607
621
start_frame_no : self . header . start_frame_no + self . header . frame_count ,
608
622
frame_count : 0 ,
@@ -620,11 +634,40 @@ impl LogFile {
620
634
}
621
635
622
636
fn read_frame_byte_offset ( & self , offset : u64 ) -> anyhow:: Result < Frame > {
623
- let mut buffer = BytesMut :: zeroed ( LogFile :: FRAME_SIZE ) ;
637
+ let mut buffer = BytesMut :: zeroed ( LogFile :: FRAME_REF_SIZE ) ;
624
638
self . file . read_exact_at ( & mut buffer, offset) ?;
625
- let buffer = buffer. freeze ( ) ;
639
+ tracing:: trace!( "Buffer size {}" , buffer. len( ) ) ;
640
+ let frame_ref = FrameRef :: try_from_bytes ( buffer. freeze ( ) ) ?;
641
+
642
+ tracing:: trace!( "Frame reference: {frame_ref:?}" ) ;
643
+
644
+ let mut frame_data = [ 0 ; WAL_PAGE_SIZE as usize ] ;
645
+ match frame_ref. location {
646
+ FrameLocation {
647
+ frame_no : FrameLocation :: IN_MAIN_DB_FILE ,
648
+ } => {
649
+ let main_db_file = std:: fs:: File :: open ( & self . main_db_path ) ?;
650
+ main_db_file. read_exact_at (
651
+ & mut frame_data,
652
+ frame_ref. header . page_no as u64 * WAL_PAGE_SIZE as u64 ,
653
+ ) ?;
654
+ }
655
+ FrameLocation { frame_no } => {
656
+ tracing:: trace!( "Reading {}" , self . main_db_path. join( "data-wal" ) . display( ) ) ;
657
+ let wal_file = std:: fs:: File :: open ( self . main_db_path . join ( "data-wal" ) ) ?;
658
+ // FIXME: this is *not* the correct way to read a frame from the wal file.
659
+ // It needs to take into account the wal file header, and the wal page headers.
660
+ wal_file. read_exact_at ( & mut frame_data, Self :: offset_in_wal ( frame_no) ) ?;
661
+ }
662
+ }
663
+
664
+ // FIXME: memory copy, easy enough to avoid
665
+ Ok ( Frame :: from_parts ( & frame_ref. header , & frame_data) )
666
+ }
626
667
627
- Frame :: try_from_bytes ( buffer)
668
+ // The offset of frame `frame_no` in the libSQL WAL file
669
+ fn offset_in_wal ( frame_no : u32 ) -> u64 {
670
+ 32 + ( ( frame_no - 1 ) as u64 ) * ( WAL_PAGE_SIZE as u64 + 24 )
628
671
}
629
672
630
673
fn last_commited_frame_no ( & self ) -> Option < FrameNo > {
@@ -639,7 +682,7 @@ impl LogFile {
639
682
let max_log_frame_count = self . max_log_frame_count ;
640
683
// truncate file
641
684
self . file . set_len ( 0 ) ?;
642
- Self :: new ( self . file , max_log_frame_count)
685
+ Self :: new ( self . file , self . main_db_path , max_log_frame_count)
643
686
}
644
687
}
645
688
@@ -754,7 +797,7 @@ impl ReplicationLogger {
754
797
. open ( log_path) ?;
755
798
756
799
let max_log_frame_count = max_log_size * 1_000_000 / LogFile :: FRAME_SIZE as u64 ;
757
- let log_file = LogFile :: new ( file, max_log_frame_count) ?;
800
+ let log_file = LogFile :: new ( file, db_path . to_owned ( ) , max_log_frame_count) ?;
758
801
let header = log_file. header ( ) ;
759
802
760
803
let should_recover = if header. version < 2 || header. sqld_version ( ) != Version :: current ( ) {
@@ -798,21 +841,18 @@ impl ReplicationLogger {
798
841
// best effort, there may be no snapshots
799
842
let _ = remove_dir_all ( snapshot_path) ;
800
843
801
- let data_file = File :: open ( & data_path) ?;
802
844
let size = data_path. metadata ( ) ?. len ( ) ;
803
845
assert ! (
804
846
size % WAL_PAGE_SIZE as u64 == 0 ,
805
847
"database file size is not a multiple of page size"
806
848
) ;
807
849
let num_page = size / WAL_PAGE_SIZE as u64 ;
808
- let mut buf = [ 0 ; WAL_PAGE_SIZE as usize ] ;
809
850
let mut page_no = 1 ; // page numbering starts at 1
810
851
for i in 0 ..num_page {
811
- data_file. read_exact_at ( & mut buf, i * WAL_PAGE_SIZE as u64 ) ?;
812
852
log_file. push_page ( & WalPage {
813
853
page_no,
814
854
size_after : if i == num_page - 1 { num_page as _ } else { 0 } ,
815
- data : Bytes :: copy_from_slice ( & buf ) ,
855
+ data : FrameLocation :: in_main_db_file ( ) , // log recovery is performed from the main db file
816
856
} ) ?;
817
857
log_file. commit ( ) ?;
818
858
@@ -918,7 +958,7 @@ mod test {
918
958
. map ( |i| WalPage {
919
959
page_no : i,
920
960
size_after : 0 ,
921
- data : Bytes :: from ( vec ! [ i as _ ; 4096 ] ) ,
961
+ data : FrameLocation :: in_main_db_file ( ) ,
922
962
} )
923
963
. collect :: < Vec < _ > > ( ) ;
924
964
logger. write_pages ( & frames) . unwrap ( ) ;
@@ -953,7 +993,7 @@ mod test {
953
993
let entry = WalPage {
954
994
page_no : 0 ,
955
995
size_after : 0 ,
956
- data : vec ! [ 0 ; 3 ] . into ( ) ,
996
+ data : FrameLocation :: in_main_db_file ( ) ,
957
997
} ;
958
998
959
999
logger. write_pages ( & [ entry] ) . unwrap ( ) ;
@@ -962,13 +1002,14 @@ mod test {
962
1002
963
1003
#[ test]
964
1004
fn log_file_test_rollback ( ) {
1005
+ let db = tempfile:: tempdir ( ) . unwrap ( ) ;
965
1006
let f = tempfile:: tempfile ( ) . unwrap ( ) ;
966
- let mut log_file = LogFile :: new ( f, 100 ) . unwrap ( ) ;
1007
+ let mut log_file = LogFile :: new ( f, db . path ( ) . to_owned ( ) , 100 ) . unwrap ( ) ;
967
1008
( 0 ..5 )
968
1009
. map ( |i| WalPage {
969
1010
page_no : i,
970
1011
size_after : 5 ,
971
- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1012
+ data : FrameLocation :: in_main_db_file ( ) , // FIXME: actually fill the fake main db file with data
972
1013
} )
973
1014
. for_each ( |p| {
974
1015
log_file. push_page ( & p) . unwrap ( ) ;
@@ -982,7 +1023,7 @@ mod test {
982
1023
. map ( |i| WalPage {
983
1024
page_no : i,
984
1025
size_after : 5 ,
985
- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1026
+ data : FrameLocation :: in_main_db_file ( ) ,
986
1027
} )
987
1028
. for_each ( |p| {
988
1029
log_file. push_page ( & p) . unwrap ( ) ;
@@ -995,7 +1036,7 @@ mod test {
995
1036
. push_page ( & WalPage {
996
1037
page_no : 42 ,
997
1038
size_after : 5 ,
998
- data : Bytes :: from_static ( & [ 1 ; 4096 ] ) ,
1039
+ data : FrameLocation :: in_main_db_file ( ) ,
999
1040
} )
1000
1041
. unwrap ( ) ;
1001
1042
0 commit comments