
Commit 8a01508

Merge #293
293: log compaction r=MarinPostma a=MarinPostma

# Log compaction and snapshotting

This PR enables log compaction and snapshotting.

## Motivation

The replication log follows the SQLite WAL and grows indefinitely. Fortunately, it contains a lot of duplicate data, so we can compress it by keeping only the most recent version of each page. That is what this PR does: whenever the replication log grows above some threshold, a new log is created and the old log is compacted. The switch is performed atomically, so compaction can happen in the background while we keep writing to the new log.

## Log compaction

Log compaction is very straightforward (a sketch follows below). We iterate backwards through the replication log and write the frames to the snapshot file, keeping track of the pages we have already seen and ignoring older versions of them. When the snapshot is finished, we remove the old log file. Note that the frames end up in reverse order in the snapshot: starting with the most recent and ending with the oldest.

## Snapshotting

Whenever a replica asks for a frame that is not present in the current log (i.e., a frame index less than the log's starting frame id), the primary sends the replica an error telling it to request a snapshot instead. The replica receives this message and immediately asks for a snapshot, sending over the frame id that got it rejected in the first place. The primary looks for a snapshot containing the requested frame and iterates through it until it reaches a frame older than the requested one (also sketched below). This mechanism lets us send partial snapshots: the replica receives only the minimal set of frames it needs to get up to speed. The replica writes the snapshot frames to a file, then `mmap`s that file to build a linked list of pages to append to the WAL.

## Future work

This PR was getting a bit too long, so I left some work for follow-up PRs:

- Even though a snapshot significantly compresses the log, a new log is created that will eventually produce a new snapshot. Those snapshots will pile up and we're back at square one. The next step is to merge snapshots into bigger snapshots and get rid of the older ones.
- Every query for a frame causes a read to the snapshot/log. To speed things up, let's add an MRU cache.
- Explore compression.

Co-authored-by: ad hoc <[email protected]>
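Below is a minimal sketch of the compaction pass described above, not the actual sqld implementation: the `Frame` struct, its `page_no` field, and the in-memory representation of the log are assumptions made for illustration.

```rust
use std::collections::HashSet;

/// Hypothetical WAL frame: a page number plus that page's contents.
struct Frame {
    page_no: u32,
    data: Vec<u8>,
}

/// Compact a log by iterating backwards (newest frame first) and keeping
/// only the most recent version of each page.
fn compact(log: &[Frame]) -> Vec<Frame> {
    let mut seen = HashSet::new();
    let mut snapshot = Vec::new();
    for frame in log.iter().rev() {
        // `insert` returns false if the page was already seen, i.e. a
        // newer version of it has already been written to the snapshot.
        if seen.insert(frame.page_no) {
            snapshot.push(Frame {
                page_no: frame.page_no,
                data: frame.data.clone(),
            });
        }
    }
    // The snapshot is now in reverse order: most recent frame first.
    snapshot
}
```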
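Similarly, a sketch of how a partial snapshot could be served, reusing the hypothetical `Frame` type above; pairing each frame with a `u64` frame number is likewise an assumption. Because the snapshot is ordered newest-first, the primary can stop streaming as soon as it reaches a frame older than the one the replica asked for.

```rust
/// Stream only the frames the replica is missing: everything from the
/// newest frame down to (and including) `requested_frame_no`.
fn partial_snapshot<'a>(
    snapshot: &'a [(u64, Frame)], // (frame_no, frame), newest first
    requested_frame_no: u64,
) -> impl Iterator<Item = &'a (u64, Frame)> {
    snapshot
        .iter()
        .take_while(move |(frame_no, _)| *frame_no >= requested_frame_no)
}
```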
2 parents 54cbc98 + 329680a commit 8a01508

File tree: 12 files changed (+982, -242 lines)

Cargo.lock

Lines changed: 34 additions & 2 deletions
Some generated files are not rendered by default.

sqld/Cargo.toml

Lines changed: 3 additions & 0 deletions
```diff
@@ -54,6 +54,9 @@ tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
 uuid = { version = "1.3", features = ["v4", "serde"] }
+nix = { version = "0.26.2", features = ["fs"] }
+tempfile = "3.3.0"
+memmap = "0.7.0"

 [dev-dependencies]
 proptest = "1.0.0"
```

sqld/proto/replication_log.proto

Lines changed: 1 addition & 0 deletions
```diff
@@ -24,4 +24,5 @@ message Frame {
 service ReplicationLog {
   rpc Hello(HelloRequest) returns (HelloResponse) {}
   rpc LogEntries(LogOffset) returns (stream Frame) {}
+  rpc Snapshot(LogOffset) returns (stream Frame) {}
 }
```
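The new `Snapshot` RPC mirrors `LogEntries`: the replica sends the offset of the frame that got it rejected and receives a stream of snapshot frames. A sketch of the client side is below, assuming tonic-generated bindings in a `proto` module; the `next_offset` field on `LogOffset` is a hypothetical name, since the message definition is not shown in this diff.

```rust
use proto::replication_log_client::ReplicationLogClient;
use proto::LogOffset;

/// Ask the primary for a snapshot starting at the frame that was
/// rejected from the regular `LogEntries` stream.
async fn fetch_snapshot(
    client: &mut ReplicationLogClient<tonic::transport::Channel>,
    rejected_frame_no: u64,
) -> Result<(), tonic::Status> {
    // `next_offset` is a hypothetical field name for LogOffset.
    let mut frames = client
        .snapshot(LogOffset { next_offset: rejected_frame_no })
        .await?
        .into_inner();

    // Frames arrive newest-first; persist them to a temporary file so
    // the whole snapshot can be mmap'ed once the stream ends.
    while let Some(frame) = frames.message().await? {
        let _ = frame; // ... write frame to the snapshot file ...
    }
    Ok(())
}
```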

sqld/src/lib.rs

Lines changed: 6 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@ use database::write_proxy::WriteProxyDbFactory;
 use once_cell::sync::Lazy;
 #[cfg(feature = "mwal_backend")]
 use once_cell::sync::OnceCell;
-use replication::logger::{ReplicationLogger, ReplicationLoggerHook};
+use replication::{ReplicationLogger, ReplicationLoggerHook};
 use rpc::run_rpc_server;
 use tokio::sync::{mpsc, Notify};
 use tokio::task::JoinSet;
@@ -89,6 +89,7 @@ pub struct Config {
     pub create_local_http_tunnel: bool,
     pub idle_shutdown_timeout: Option<Duration>,
     pub load_from_dump: Option<PathBuf>,
+    pub max_log_size: u64,
 }

 async fn run_service(
@@ -214,7 +215,10 @@ async fn start_primary(
     idle_shutdown_layer: Option<IdleShutdownLayer>,
 ) -> anyhow::Result<()> {
     let is_fresh_db = check_fresh_db(&config.db_path);
-    let logger = Arc::new(ReplicationLogger::open(&config.db_path)?);
+    let logger = Arc::new(ReplicationLogger::open(
+        &config.db_path,
+        config.max_log_size,
+    )?);
     let logger_clone = logger.clone();
     let path_clone = config.db_path.clone();
     #[cfg(feature = "bottomless")]
```
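The `max_log_size` parameter threaded through here is what arms the compaction trigger described in the PR body. The real `ReplicationLogger` internals are not part of this hunk, so the following is only a sketch of what the parameter implies; every name other than `max_log_size` is assumed for illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical sketch of the role `max_log_size` plays inside the logger.
struct ReplicationLogger {
    /// Threshold in bytes (converted from the MB value passed to `open`).
    max_log_size: u64,
    /// Size of the current log in bytes.
    current_log_size: AtomicU64,
}

impl ReplicationLogger {
    /// Called after frames are appended: once the log outgrows the
    /// threshold, a fresh log is swapped in atomically and the old one
    /// is compacted into a snapshot in the background.
    fn maybe_compact(&self) {
        if self.current_log_size.load(Ordering::Acquire) > self.max_log_size {
            // 1. atomically swap in a new, empty log (writers move over)
            // 2. spawn a background task that compacts the old log and
            //    deletes it once the snapshot is written
        }
    }
}
```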

sqld/src/main.rs

Lines changed: 6 additions & 0 deletions
```diff
@@ -110,6 +110,11 @@ struct Cli {
     /// Requires that the node is not in replica mode
     #[clap(long, env = "SQLD_LOAD_DUMP_PATH", conflicts_with = "primary_grpc_url")]
     load_from_dump: Option<PathBuf>,
+
+    /// Maximum size the replication log is allowed to grow (in MB).
+    /// defaults to 200MB.
+    #[clap(long, env = "SQLD_MAX_LOG_SIZE", default_value = "200")]
+    max_log_size: u64,
 }

 impl Cli {
@@ -198,6 +203,7 @@ fn config_from_args(args: Cli) -> Result<Config> {
         create_local_http_tunnel: args.create_local_http_tunnel,
         idle_shutdown_timeout: args.idle_shutdown_timeout_s.map(Duration::from_secs),
         load_from_dump: args.load_from_dump,
+        max_log_size: args.max_log_size,
     })
 }
```
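With the clap attributes above, the threshold can be set either on the command line or through the environment, e.g. `sqld --max-log-size 500` or `SQLD_MAX_LOG_SIZE=500` (clap derives the kebab-case flag name from the `max_log_size` field); left unset, it defaults to 200 MB.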
