
add file snapshot #37

Merged
merged 1 commit into spacewalkhq:main on Sep 9, 2024

Conversation

Collaborator

@dierbei dierbei commented Sep 5, 2024

This was done in a bit of a rush in a single write-up, sorry; please review it for me, thanks.

I decided to remove the original log field. My idea was just to make sure the state_machine data is complete, which has the advantage of saving memory (storing both the log and the state_machine would keep two copies of the same data). I haven't yet thought through whether this causes any problems.

I added timed snapshots, with a configurable interval (e.g. 1 minute, 5 minutes, ...).

After the snapshot is created, the in-memory data is cleared in order to save memory.

I haven't figured out where need_create_snapshot and create_snapshot should live yet; give me some time, it's all still rather rushed.
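The rough idea behind the timed check is sketched below (illustrative only; the struct name, interval value, and fields are assumptions, not the final FileStateMachine layout):

use std::time::{Duration, Instant};

// Illustrative sketch of the timed-snapshot check; the real FileStateMachine
// fields may differ.
struct SnapshotTimer {
    interval: Duration,     // e.g. Duration::from_secs(60) or Duration::from_secs(300)
    last_snapshot: Instant, // when the previous snapshot was written
}

impl SnapshotTimer {
    fn need_create_snapshot(&self) -> bool {
        // A new snapshot is due once the configured interval has elapsed.
        self.last_snapshot.elapsed() >= self.interval
    }

    fn mark_snapshot_created(&mut self) {
        // After the snapshot is persisted, the in-memory data can be cleared
        // and the timer reset.
        self.last_snapshot = Instant::now();
    }
}

An example run with the timed snapshots enabled: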

/cloudide/workspace/raft-rs$ cargo run --example simulate_add_node
   Compiling raft_rs v0.1.0 (/cloudide/workspace/raft-rs)
    Finished dev [unoptimized + debuginfo] target(s) in 1.72s
     Running `target/debug/examples/simulate_add_node`
2024-09-06 05:45:04 INFO Log after reading from disk: Ok([LogEntry { leader_id: 1, server_id: 3, term: 2, command: Set, data: 42 }]), current term: 2, id: 3, port: 5003, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Log after reading from disk: Ok([LogEntry { leader_id: 1, server_id: 4, term: 2, command: Set, data: 42 }]), current term: 2, id: 4, port: 5004, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Server 3 is a follower, id: 3, port: 5003, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Server 4 is a follower, id: 4, port: 5004, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Log after reading from disk: Ok([LogEntry { leader_id: 1, server_id: 1, term: 2, command: Set, data: 42 }]), current term: 2, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Log after reading from disk: Ok([LogEntry { leader_id: 1, server_id: 5, term: 2, command: Set, data: 42 }]), current term: 2, id: 5, port: 5005, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Server 1 is a follower, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Server 5 is a follower, id: 5, port: 5005, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Log after reading from disk: Ok([LogEntry { leader_id: 1, server_id: 2, term: 2, command: Set, data: 42 }]), current term: 2, id: 2, port: 5002, ip: 127.0.0.1
2024-09-06 05:45:04 INFO Server 2 is a follower, id: 2, port: 5002, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Server 1 is a candidate, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Starting election, id: 1, term: 3, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Votes received: {1: true, 2: true}, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Votes received: {4: true, 1: true, 2: true}, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Quorum reached, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO I am the leader 1, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:05 INFO Server 1 is the leader, term: 4, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:14 INFO Log after reading from disk: Ok([]), current term: 0, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:14 INFO Server 6 is a follower, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:15 INFO Server 6 is a candidate, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:15 INFO Starting election, id: 6, term: 2, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:16 INFO Election timeout, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:16 INFO Log after reading from disk: Ok([]), current term: 2, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:16 INFO Server 6 is a follower, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:17 INFO Received join request: "\0\0\0\u{6}\0\0\0\0\0\0\0\n127.0.0.1:5006", id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:17 INFO Current cluster nodes: ["127.0.0.1:5001", "127.0.0.1:5002", "127.0.0.1:5003", "127.0.0.1:5004", "127.0.0.1:5005"], want join node: 127.0.0.1:5006, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:17 INFO Sending log entrys to new node: [LogEntry { leader_id: 1, server_id: 1, term: 2, command: Set, data: 42 }], id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:45:17 INFO Joined the cluster with leader: 1, own id: 6, id: 6, port: 5006, ip: 127.0.0.1
2024-09-06 05:45:17 INFO Received batch append entries response from peer: 6, current peer last_included_term: 2, id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:46:04 INFO Node: 5, snapshot created successfully., id: 5, port: 5005, ip: 127.0.0.1
2024-09-06 05:46:04 INFO Node: 4, snapshot created successfully., id: 4, port: 5004, ip: 127.0.0.1
2024-09-06 05:46:04 INFO Node: 3, snapshot created successfully., id: 3, port: 5003, ip: 127.0.0.1
2024-09-06 05:46:04 INFO Node: 1, snapshot created successfully., id: 1, port: 5001, ip: 127.0.0.1
2024-09-06 05:46:04 INFO Node: 2, snapshot created successfully., id: 2, port: 5002, ip: 127.0.0.1
2024-09-06 05:46:14 INFO Node: 6, snapshot created successfully., id: 6, port: 5006, ip: 127.0.0.1
^C

@dierbei dierbei force-pushed the add-log-snapshot branch 2 times, most recently from 934dcc0 to f5ee779 on September 6, 2024 05:55
@vaibhawvipul vaibhawvipul requested review from vaibhawvipul and dharanad and removed the request for vaibhawvipul on September 6, 2024 06:01
Signed-off-by: dierbei <[email protected]>
@dierbei
Collaborator Author

dierbei commented Sep 6, 2024

I implemented a timed way to create snapshots (the snapshot file lives in the same directory as the log):

raft-rs/src/server.rs

Lines 150 to 170 in 841fab0

// if storage location is provided, use it else set empty string to use default location
let storage_location = match config.storage_location.clone() {
    Some(location) => location + &format!("server_{}.log", id),
    None => format!("server_{}.log", id),
};
let storage = LocalStorage::new(storage_location.clone()).await;
let parent_path = PathBuf::from(storage_location)
    .parent() // This returns Option<&Path>
    .map(|p| p.to_path_buf()) // Convert &Path to PathBuf
    .unwrap_or_else(|| PathBuf::from("logs")); // Provide default path
// Use the provided state_machine or default to FileStateMachine if none is provided
let state_machine = state_machine.unwrap_or_else(|| {
    // Default FileStateMachine initialization
    let snapshot_path = parent_path.join(format!("server_{}_snapshot.log", id));
    Box::new(state_mechine::FileStateMachine::new(
        &snapshot_path,
        Duration::from_secs(60 * 60),
    ))
});
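As a standalone illustration of the path derivation above (the values here are hypothetical: id = 1 and a configured storage location of "logs/"):

use std::path::PathBuf;

fn main() {
    let id = 1u32;
    // Hypothetical configured storage location "logs/"; the real value comes
    // from config.storage_location.
    let storage_location = "logs/".to_string() + &format!("server_{}.log", id);
    let parent_path = PathBuf::from(&storage_location)
        .parent()
        .map(|p| p.to_path_buf())
        .unwrap_or_else(|| PathBuf::from("logs"));
    let snapshot_path = parent_path.join(format!("server_{}_snapshot.log", id));
    println!("log file:      {}", storage_location);        // logs/server_1.log
    println!("snapshot file: {}", snapshot_path.display()); // logs/server_1_snapshot.log
}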


I've modified the constructor parameters so that a custom implementation can be passed in (if none is provided, the built-in one is used):

raft-rs/src/server.rs

Lines 139 to 144 in 841fab0

pub async fn new(
    id: u32,
    config: ServerConfig,
    cluster_config: ClusterConfig,
    state_machine: Option<Box<dyn StateMachine>>,
) -> Server {
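For example (a hypothetical usage sketch; the config construction is elided and MyStateMachine stands in for a custom implementation):

// Use the built-in FileStateMachine by passing None:
let server = Server::new(1, server_config, cluster_config, None).await;

// Or inject a custom state machine:
let custom: Box<dyn StateMachine> = Box::new(MyStateMachine::default());
let server = Server::new(2, other_server_config, other_cluster_config, Some(custom)).await;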


Whether to create a snapshot is decided here (for both follower and leader):

raft-rs/src/server.rs

Lines 318 to 338 in 841fab0

let state_machine = Arc::clone(&self.state.state_machine);
loop {
    if state_machine.lock().await.need_create_snapshot().await {
        let state_machine_clone = Arc::clone(&self.state.state_machine);
        let log_clone = self.log.clone();
        let node_id_clone = self.id;
        tokio::spawn(async move {
            let mut state_machine_lock = state_machine_clone.lock().await;
            if let Err(e) = state_machine_lock.create_snapshot().await {
                error!(
                    log_clone,
                    "Node: {}, failed to create snapshot: {:?}", node_id_clone, e
                );
            } else {
                info!(
                    log_clone,
                    "Node: {}, snapshot created successfully.", node_id_clone
                );
            }
        });
    }
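The loop above only relies on the two methods discussed earlier; roughly, the trait shape it assumes looks like this (a sketch only, the real StateMachine trait in state_mechine.rs has more methods such as get_term/get_index and may use a different error type):

use async_trait::async_trait;

// Assumed shape; illustrative, not the actual trait definition.
#[async_trait]
pub trait StateMachine: Send + Sync {
    /// True once the configured snapshot interval has elapsed.
    async fn need_create_snapshot(&mut self) -> bool;
    /// Persist the current state to the snapshot file (and free in-memory data).
    async fn create_snapshot(&mut self) -> Result<(), std::io::Error>;
}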


Here the full data set is sent to the newly joined node:

raft-rs/src/server.rs

Lines 1032 to 1049 in 841fab0

let mut batch_append_entry_request: Vec<u8> = Vec::new();
batch_append_entry_request.extend_from_slice(&self.id.to_be_bytes());
batch_append_entry_request
    .extend_from_slice(&state_machine.lock().await.get_term().await.to_be_bytes());
batch_append_entry_request.extend_from_slice(&12u32.to_be_bytes());
batch_append_entry_request
    .extend_from_slice(&state_machine.lock().await.get_index().await.to_be_bytes());
batch_append_entry_request.extend_from_slice(&log_entry_bytes);
if let Err(e) = self
    .network_manager
    .send(&peer_address, &batch_append_entry_request)
    .await
{
    error!(
        self.log,
        "Failed send batch append entry request to {}, err: {}", peer_address, e
    );
}
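The request is a simple big-endian framed header followed by the serialized entries. A receiver-side decode could look roughly like this (a hedged sketch; it assumes get_term() and get_index() also produce u32 values, which the excerpt does not confirm):

// Header layout assumed from the sender code above:
//   [0..4)   sender id (u32, big-endian)
//   [4..8)   term
//   [8..12)  message type (12 = batch append entries request)
//   [12..16) index
//   [16..]   serialized LogEntry bytes
fn parse_batch_append_header(buf: &[u8]) -> Option<(u32, u32, u32, u32, &[u8])> {
    if buf.len() < 16 {
        return None;
    }
    let read_u32 =
        |offset: usize| u32::from_be_bytes(buf[offset..offset + 4].try_into().unwrap());
    Some((read_u32(0), read_u32(4), read_u32(8), read_u32(12), &buf[16..]))
}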

@vaibhawvipul
Contributor

This is looking very cool.

Contributor

@vaibhawvipul vaibhawvipul left a comment

This was great work!

@vaibhawvipul vaibhawvipul merged commit 159a18e into spacewalkhq:main Sep 9, 2024
1 check passed