Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit e49eed8

Browse files
Merge #290
290: implement 2 phase commit for logging frames to replication log r=MarinPostma a=MarinPostma This PR implements a 2 phase commit between the actual WAL and the replication log, so no operation end in the replication log if they were not successfully committed to the database before. Co-authored-by: ad hoc <[email protected]>
2 parents ef938e6 + a643229 commit e49eed8

File tree

2 files changed

+115
-67
lines changed

2 files changed

+115
-67
lines changed

sqld/src/replication/logger.rs

Lines changed: 114 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ use std::sync::atomic::{AtomicU64, Ordering};
88
use std::sync::Arc;
99

1010
use anyhow::ensure;
11-
use bytemuck::{cast_ref, try_from_bytes, Pod, Zeroable};
11+
use bytemuck::{bytes_of, cast_ref, try_from_bytes, Pod, Zeroable};
1212
use bytes::{BufMut, Bytes, BytesMut};
1313
use crc::Crc;
1414
use parking_lot::Mutex;
15+
use rusqlite::ffi::SQLITE_ERROR;
1516
use uuid::Uuid;
1617

1718
use crate::libsql::ffi::{
@@ -35,6 +36,14 @@ pub struct ReplicationLoggerHook {
3536
logger: Arc<ReplicationLogger>,
3637
}
3738

39+
/// This implementation of WalHook intercepts calls to `on_frame`, and writes them to a
40+
/// shadow wal. Writing to the shadow wal is done in three steps:
41+
/// i. append the new pages at the offset pointed by header.start_frame_index + header.frame_count
42+
/// ii. call the underlying implementation of on_frames
43+
/// iii. if the call of the underlying method was successfull, update the log header to the new
44+
/// frame count.
45+
///
46+
/// If either writing to the database of to the shadow wal fails, it must be noop.
3847
unsafe impl WalHook for ReplicationLoggerHook {
3948
fn on_frames(
4049
&mut self,
@@ -48,6 +57,22 @@ unsafe impl WalHook for ReplicationLoggerHook {
4857
) -> c_int {
4958
assert_eq!(page_size, 4096);
5059

60+
for (page_no, data) in PageHdrIter::new(page_headers, page_size as _) {
61+
self.write_frame(page_no, data)
62+
}
63+
64+
let commit_info = if is_commit != 0 {
65+
match self.flush(ntruncate) {
66+
Err(e) => {
67+
tracing::error!("error writing to replication log: {e}");
68+
return SQLITE_ERROR;
69+
}
70+
Ok(commit_info) => commit_info,
71+
}
72+
} else {
73+
None
74+
};
75+
5176
let rc = unsafe {
5277
orig(
5378
wal,
@@ -58,16 +83,11 @@ unsafe impl WalHook for ReplicationLoggerHook {
5883
sync_flags,
5984
)
6085
};
61-
if rc != crate::libsql::ffi::SQLITE_OK {
62-
return rc;
63-
}
6486

65-
for (page_no, data) in PageHdrIter::new(page_headers, page_size as _) {
66-
self.write_frame(page_no, data)
67-
}
68-
69-
if is_commit != 0 {
70-
self.commit(ntruncate);
87+
if is_commit != 0 && rc == 0 {
88+
if let Some((count, checksum)) = commit_info {
89+
self.commit(count, checksum);
90+
}
7191
}
7292

7393
rc
@@ -138,10 +158,21 @@ impl ReplicationLoggerHook {
138158
self.buffer.push(entry);
139159
}
140160

141-
fn commit(&mut self, size_after: u32) {
142-
self.buffer.last_mut().unwrap().size_after = size_after;
143-
self.logger.push_page(&self.buffer);
144-
self.buffer.clear();
161+
/// write buffered pages to the logger, without commiting.
162+
/// Returns the attempted count and checksum, that need to be passed to `commit`
163+
fn flush(&mut self, size_after: u32) -> anyhow::Result<Option<(u64, u64)>> {
164+
if !self.buffer.is_empty() {
165+
self.buffer.last_mut().unwrap().size_after = size_after;
166+
let ret = self.logger.write_pages(&self.buffer)?;
167+
self.buffer.clear();
168+
Ok(Some(ret))
169+
} else {
170+
Ok(None)
171+
}
172+
}
173+
174+
fn commit(&self, new_count: u64, new_checksum: u64) {
175+
self.logger.commit(new_count, new_checksum)
145176
}
146177

147178
fn rollback(&mut self) {
@@ -161,10 +192,14 @@ struct LogFileHeader {
161192
db_id: u128,
162193
/// Frame index of the first frame in the log
163194
start_frame_id: u64,
195+
/// entry count in file
196+
frame_count: u64,
164197
/// Wal file version number, currently: 1
165198
version: u32,
166199
/// page size: 4096
167200
page_size: i32,
201+
/// 0 padding for alignment
202+
_pad: u64,
168203
}
169204

170205
impl LogFileHeader {
@@ -179,6 +214,18 @@ impl LogFileHeader {
179214
fn encode<B: BufMut>(&self, mut buf: B) {
180215
buf.put(&cast_ref::<_, [u8; size_of::<Self>()]>(self)[..]);
181216
}
217+
218+
/// Returns the bytes position of the `nth` entry in the log
219+
fn absolute_byte_offset(nth: u64) -> u64 {
220+
std::mem::size_of::<Self>() as u64 + nth * ReplicationLogger::FRAME_SIZE as u64
221+
}
222+
223+
fn byte_offset(&self, id: FrameId) -> Option<u64> {
224+
if id < self.start_frame_id || id > self.start_frame_id + self.frame_count {
225+
return None;
226+
}
227+
Self::absolute_byte_offset(id - self.start_frame_id).into()
228+
}
182229
}
183230

184231
/// The file header for the WAL log. All fields are represented in little-endian ordering.
@@ -222,14 +269,10 @@ impl Generation {
222269
}
223270

224271
pub struct ReplicationLogger {
225-
/// offset id of the next Frame to write into the log
226-
next_frame_id: Mutex<FrameId>,
227-
/// first index present in the file
228-
start_frame_id: FrameId,
229-
log_file: File,
272+
log_header: Mutex<LogFileHeader>,
230273
current_checksum: AtomicU64,
231-
pub database_id: Uuid,
232274
pub generation: Generation,
275+
log_file: File,
233276
}
234277

235278
impl ReplicationLogger {
@@ -244,7 +287,6 @@ impl ReplicationLogger {
244287
.read(true)
245288
.open(path)?;
246289
let file_end = log_file.metadata()?.len();
247-
let end_id;
248290
let current_checksum;
249291

250292
let header = if file_end == 0 {
@@ -256,39 +298,43 @@ impl ReplicationLogger {
256298
page_size: WAL_PAGE_SIZE,
257299
start_checksum: 0,
258300
db_id: db_id.as_u128(),
301+
frame_count: 0,
302+
_pad: 0,
259303
};
260304

261305
let mut header_buf = BytesMut::new();
262306
header.encode(&mut header_buf);
307+
current_checksum = AtomicU64::new(0);
263308

264309
assert_eq!(header_buf.len(), std::mem::size_of::<LogFileHeader>());
265310

266311
log_file.write_all(&header_buf)?;
267-
end_id = 0;
268-
current_checksum = AtomicU64::new(0);
269312
header
270313
} else {
271314
let mut header_buf = BytesMut::zeroed(size_of::<LogFileHeader>());
272315
log_file.read_exact(&mut header_buf)?;
273316
let header = LogFileHeader::decode(&header_buf)?;
274-
end_id = (file_end - size_of::<FrameHeader>() as u64) / Self::FRAME_SIZE as u64;
275317
current_checksum = AtomicU64::new(Self::compute_checksum(&header, &log_file)?);
276318
header
277319
};
278320

279321
Ok(Self {
280-
next_frame_id: Mutex::new(end_id),
281-
start_frame_id: header.start_frame_id,
282-
log_file,
283322
current_checksum,
284-
database_id: Uuid::from_u128(header.db_id),
285-
generation: Generation::new(end_id),
323+
generation: Generation::new(header.start_frame_id),
324+
log_header: Mutex::new(header),
325+
log_file,
286326
})
287327
}
288328

289-
fn push_page(&self, pages: &[WalPage]) {
290-
let mut lock = self.next_frame_id.lock();
291-
let mut current_offset = *lock;
329+
pub fn database_id(&self) -> Uuid {
330+
Uuid::from_u128(self.log_header.lock().db_id)
331+
}
332+
333+
/// Write pages to the log, without updating the file header.
334+
/// Returns the new frame count and checksum to commit
335+
fn write_pages(&self, pages: &[WalPage]) -> anyhow::Result<(u64, u64)> {
336+
let log_header = { *self.log_header.lock() };
337+
let mut current_frame = log_header.frame_count;
292338
let mut buffer = BytesMut::with_capacity(Self::FRAME_SIZE);
293339
let mut current_checksum = self.current_checksum.load(Ordering::Relaxed);
294340
for page in pages.iter() {
@@ -298,7 +344,7 @@ impl ReplicationLogger {
298344
let checksum = digest.finalize();
299345

300346
let header = FrameHeader {
301-
frame_id: current_offset,
347+
frame_id: log_header.start_frame_id + current_frame,
302348
checksum,
303349
page_no: page.page_no,
304350
size_after: page.size_after,
@@ -311,58 +357,45 @@ impl ReplicationLogger {
311357

312358
frame.encode(&mut buffer);
313359

314-
self.log_file
315-
.write_all_at(
316-
&buffer,
317-
self.byte_offset(current_offset)
318-
.expect("attempt to write entry before first entry in the log"),
319-
)
320-
.unwrap();
360+
let byte_offset = LogFileHeader::absolute_byte_offset(current_frame);
361+
tracing::trace!("writing frame {current_frame} at offset {byte_offset}");
362+
self.log_file.write_all_at(&buffer, byte_offset)?;
321363

322-
current_offset += 1;
364+
current_frame += 1;
323365
current_checksum = checksum;
324366

325367
buffer.clear();
326368
}
327369

328-
self.current_checksum
329-
.store(current_checksum, Ordering::Relaxed);
330-
331-
*lock = current_offset;
370+
Ok((
371+
log_header.frame_count + pages.len() as u64,
372+
current_checksum,
373+
))
332374
}
333375

334376
/// Returns bytes represening a WalFrame for frame `id`
335377
///
336378
/// If the requested frame is before the first frame in the log, or after the last frame,
337379
/// Ok(None) is returned.
338380
pub fn frame_bytes(&self, id: FrameId) -> anyhow::Result<Option<Bytes>> {
339-
if id < self.start_frame_id {
381+
let header = { *self.log_header.lock() };
382+
if id < header.start_frame_id {
340383
return Ok(None);
341384
}
342385

343-
if id >= *self.next_frame_id.lock() {
386+
if id >= header.start_frame_id + header.frame_count {
344387
return Ok(None);
345388
}
346389

347390
let mut buffer = BytesMut::zeroed(Self::FRAME_SIZE);
348391
self.log_file
349-
.read_exact_at(&mut buffer, self.byte_offset(id).unwrap())?;
392+
.read_exact_at(&mut buffer, header.byte_offset(id).unwrap())?; // unwrap: we checked
393+
// that the frame index
394+
// in in the file before
350395

351396
Ok(Some(buffer.freeze()))
352397
}
353398

354-
/// Returns the bytes position of the `nth` entry in the log
355-
fn absolute_byte_offset(nth: u64) -> u64 {
356-
std::mem::size_of::<LogFileHeader>() as u64 + nth * ReplicationLogger::FRAME_SIZE as u64
357-
}
358-
359-
fn byte_offset(&self, id: FrameId) -> Option<u64> {
360-
if id < self.start_frame_id {
361-
return None;
362-
}
363-
Self::absolute_byte_offset(id - self.start_frame_id).into()
364-
}
365-
366399
/// Returns an iterator over the WAL frame headers
367400
fn frames_iter(
368401
file: &File,
@@ -383,7 +416,7 @@ impl ReplicationLogger {
383416
let mut current_offset = 0;
384417

385418
Ok(std::iter::from_fn(move || {
386-
let read_offset = Self::absolute_byte_offset(current_offset);
419+
let read_offset = LogFileHeader::absolute_byte_offset(current_offset);
387420
if read_offset >= file_len {
388421
return None;
389422
}
@@ -407,6 +440,19 @@ impl ReplicationLogger {
407440
Ok(cs)
408441
})
409442
}
443+
444+
fn commit(&self, new_frame_count: u64, new_current_checksum: u64) {
445+
let mut header = { *self.log_header.lock() };
446+
header.frame_count = new_frame_count;
447+
448+
self.log_file
449+
.write_all_at(bytes_of(&header), 0)
450+
.expect("fatal error, failed to commit to log");
451+
452+
self.current_checksum
453+
.store(new_current_checksum, Ordering::Relaxed);
454+
*self.log_header.lock() = header;
455+
}
410456
}
411457

412458
#[cfg(test)]
@@ -418,24 +464,24 @@ mod test {
418464
let dir = tempfile::tempdir().unwrap();
419465
let logger = ReplicationLogger::open(dir.path()).unwrap();
420466

421-
assert_eq!(*logger.next_frame_id.lock(), 0);
422-
423467
let frames = (0..10)
424468
.map(|i| WalPage {
425469
page_no: i,
426470
size_after: 0,
427471
data: Bytes::from(vec![i as _; 4096]),
428472
})
429473
.collect::<Vec<_>>();
430-
logger.push_page(&frames);
474+
let (count, chk) = logger.write_pages(&frames).unwrap();
475+
logger.commit(count, chk);
431476

432477
for i in 0..10 {
433478
let frame = WalFrame::decode(logger.frame_bytes(i).unwrap().unwrap()).unwrap();
434479
assert_eq!(frame.header.page_no, i as u32);
435480
assert!(frame.data.iter().all(|x| i as u8 == *x));
436481
}
437482

438-
assert_eq!(*logger.next_frame_id.lock(), 10);
483+
let header = { *logger.log_header.lock() };
484+
assert_eq!(header.start_frame_id + header.frame_count, 10);
439485
}
440486

441487
#[test]
@@ -455,6 +501,8 @@ mod test {
455501
size_after: 0,
456502
data: vec![0; 3].into(),
457503
};
458-
logger.push_page(&[entry]);
504+
505+
let (count, chk) = logger.write_pages(&[entry]).unwrap();
506+
logger.commit(count, chk);
459507
}
460508
}

sqld/src/rpc/replication_log.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl ReplicationLog for ReplicationLogService {
8989
guard.insert(replica_addr);
9090
}
9191
let response = HelloResponse {
92-
database_id: self.logger.database_id.to_string(),
92+
database_id: self.logger.database_id().to_string(),
9393
generation_start_index: self.logger.generation.start_index,
9494
generation_id: self.logger.generation.id.to_string(),
9595
};

0 commit comments

Comments
 (0)