Skip to content

Commit e11bbc2

Browse files
committed
Add shared KvStore with Arc type, thread safe move to heap
1 parent 3c7428e commit e11bbc2

File tree

7 files changed

+159
-51
lines changed

7 files changed

+159
-51
lines changed

kvs-client-server/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ edition = "2018"
99
[dependencies]
1010
clap = "2.33.0"
1111
failure = "0.1.5"
12-
serde = { version = "1.0.89", features = ["derive"] }
12+
serde = { version = "1.0.89", features = ["derive", "rc"] }
1313
serde_json = "1.0.39"
1414
bincode = "1.2.1"
1515
structopt = "0.3"

kvs-client-server/src/bin/kvs-server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ fn main() -> Result<()> {
7979
}
8080

8181
fn start_server_with<E: KvsEngine>(addr: &str, engine: E) -> Result<()> {
82-
let mut server = Server::new(addr, engine);
82+
let server = Server::new(addr, engine);
8383
server.serve()?;
8484
Ok(())
8585
}

kvs-client-server/src/engines/kvs.rs

+50-30
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ use std::collections::BTreeMap;
55
use std::fs::{self, File};
66
use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
77
use std::path::PathBuf;
8+
use std::sync::{Arc, Mutex};
89

910
const COMPACTION_THRESHOLD: u64 = 1024;
1011
const LOG_FILE_NAME: &str = "current.db";
1112

12-
#[derive(Debug)]
13+
#[derive(Debug, Clone)]
1314
pub struct KvStore {
15+
inner: Arc<Mutex<InnerKvStore>>,
16+
}
17+
18+
#[derive(Debug)]
19+
struct InnerKvStore {
1420
path: PathBuf,
1521
log: File,
1622
map: BTreeMap<String, LogPointer>,
@@ -36,44 +42,53 @@ struct LogPointer {
3642
}
3743

3844
impl KvsEngine for KvStore {
39-
fn set(&mut self, key: String, value: String) -> Result<()> {
40-
let offset = self.log.seek(SeekFrom::End(0))?;
45+
fn set(&self, key: String, value: String) -> Result<()> {
46+
let mut inner = self.inner.lock().unwrap();
47+
48+
let offset = inner.log.seek(SeekFrom::End(0))?;
4149
let command = Command {
4250
cmd: CommandType::Set,
4351
key: key.clone(),
4452
value,
4553
};
4654
// encoding before writing to log
47-
serde_json::to_writer(&mut self.log, &command)?;
48-
self.log.flush()?;
49-
let current_offset = self.log.seek(SeekFrom::End(0))?;
50-
self.map.insert(
55+
serde_json::to_writer(&mut inner.log, &command)?;
56+
inner.log.flush()?;
57+
let current_offset = inner.log.seek(SeekFrom::End(0))?;
58+
inner.map.insert(
5159
key,
5260
LogPointer {
5361
offset,
5462
len: current_offset - offset,
5563
},
5664
);
5765
if current_offset > COMPACTION_THRESHOLD {
58-
self.compact()?;
66+
inner.compact()?;
5967
}
6068
Ok(())
6169
}
6270

63-
fn get(&mut self, key: String) -> Result<Option<String>> {
64-
if let Some(pointer) = self.map.get(&key) {
65-
&self.log.seek(SeekFrom::Start(pointer.offset));
66-
let mut de = serde_json::Deserializer::from_reader(&self.log);
67-
let cmd: Command = serde::de::Deserialize::deserialize(&mut de)?;
68-
Ok(Some(cmd.value))
69-
} else {
70-
Ok(None)
71+
fn get(&self, key: String) -> Result<Option<String>> {
72+
let mut inner = self.inner.lock().unwrap();
73+
74+
if !inner.map.contains_key(&key) {
75+
return Ok(None);
7176
}
77+
78+
let pointer = inner.map.get(&key).unwrap();
79+
let pos = SeekFrom::Start(pointer.offset);
80+
81+
inner.log.seek(pos)?;
82+
let mut de = serde_json::Deserializer::from_reader(&inner.log);
83+
let cmd: Command = serde::de::Deserialize::deserialize(&mut de)?;
84+
Ok(Some(cmd.value))
7285
}
7386

74-
fn remove(&mut self, key: String) -> Result<()> {
75-
self.log.seek(SeekFrom::End(0))?;
76-
if self.map.get(&key).is_none() {
87+
fn remove(&self, key: String) -> Result<()> {
88+
let mut inner = self.inner.lock().unwrap();
89+
90+
inner.log.seek(SeekFrom::End(0))?;
91+
if inner.map.get(&key).is_none() {
7792
return Err(KvsError::KeyNotFound);
7893
}
7994
let command = Command {
@@ -82,30 +97,35 @@ impl KvsEngine for KvStore {
8297
value: String::new(),
8398
};
8499
// encoding before writing to log
85-
serde_json::to_writer(&mut self.log, &command)?;
86-
self.log.flush()?;
87-
self.map.remove(&key);
100+
serde_json::to_writer(&mut inner.log, &command)?;
101+
inner.log.flush()?;
102+
inner.map.remove(&key);
88103
Ok(())
89104
}
90105
}
91106

92107
impl KvStore {
93108
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
94109
let path = path.into();
95-
fs::create_dir_all(&path)?;
96-
let log = Self::new_log_file(&path)?;
110+
fs::create_dir_all(&*path)?;
111+
let log = InnerKvStore::new_log_file(&path)?;
97112

98-
let mut store = KvStore {
99-
path: path,
100-
log: log,
113+
let mut inner = InnerKvStore {
114+
path,
115+
log,
101116
map: BTreeMap::new(),
102117
};
103118

104119
// Load from log files
105-
store.load_from_log()?;
106-
Ok(store)
120+
inner.load_from_log()?;
121+
122+
Ok(KvStore {
123+
inner: Arc::new(Mutex::new(inner)),
124+
})
107125
}
126+
}
108127

128+
impl InnerKvStore {
109129
fn load_from_log(&mut self) -> Result<()> {
110130
let mut reader = BufReader::new(File::open(&self.path.join(LOG_FILE_NAME))?);
111131
let mut offset = reader.seek(SeekFrom::Start(0))?;
@@ -141,7 +161,7 @@ impl KvStore {
141161
Ok(())
142162
}
143163

144-
fn compact(&mut self) -> Result<()> {
164+
fn compact(&self) -> Result<()> {
145165
let tmp_path: PathBuf = self.path.join("tmp.db");
146166
let file_path: PathBuf = self.path.join(LOG_FILE_NAME);
147167
let mut work_file = self.log.try_clone()?;

kvs-client-server/src/engines/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
use crate::Result;
22

33
/// Trait for a key value storage engine.
4-
pub trait KvsEngine {
4+
pub trait KvsEngine: Clone + Send + 'static {
55
/// Sets the value of a string key to a string.
66
///
77
/// If the key already exists, the previous value will be overwritten.
8-
fn set(&mut self, key: String, value: String) -> Result<()>;
8+
fn set(&self, key: String, value: String) -> Result<()>;
99

1010
/// Gets the string value of a given string key.
1111
///
1212
/// Returns `None` if the given key does not exist.
13-
fn get(&mut self, key: String) -> Result<Option<String>>;
13+
fn get(&self, key: String) -> Result<Option<String>>;
1414

1515
/// Removes a given key.
1616
///
1717
/// # Errors
1818
///
1919
/// It returns `KvsError::KeyNotFound` if the given key is not found.
20-
fn remove(&mut self, key: String) -> Result<()>;
20+
fn remove(&self, key: String) -> Result<()>;
2121
}
2222

2323
mod kvs;

kvs-client-server/src/engines/sled.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{KvsEngine, KvsError, Result};
22
use sled::Db;
33
use std::path::PathBuf;
4+
5+
#[derive(Clone)]
46
pub struct SledKvsEngine {
57
tree: Db,
68
}
@@ -13,13 +15,13 @@ impl SledKvsEngine {
1315
}
1416

1517
impl KvsEngine for SledKvsEngine {
16-
fn set(&mut self, key: String, value: String) -> Result<()> {
18+
fn set(&self, key: String, value: String) -> Result<()> {
1719
self.tree.insert(key, value.as_bytes())?;
1820
self.tree.flush()?;
1921
Ok(())
2022
}
2123

22-
fn get(&mut self, key: String) -> Result<Option<String>> {
24+
fn get(&self, key: String) -> Result<Option<String>> {
2325
Ok(self
2426
.tree
2527
.get(key)?
@@ -28,7 +30,7 @@ impl KvsEngine for SledKvsEngine {
2830
.transpose()?)
2931
}
3032

31-
fn remove(&mut self, key: String) -> Result<()> {
33+
fn remove(&self, key: String) -> Result<()> {
3234
self.tree.remove(key)?.ok_or(KvsError::KeyNotFound)?;
3335
self.tree.flush()?;
3436
Ok(())

kvs-client-server/src/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ impl<E: KvsEngine> Server<E> {
2222
Server { listener, engine }
2323
}
2424

25-
pub fn serve(&mut self) -> Result<()> {
25+
pub fn serve(&self) -> Result<()> {
2626
debug!("Waiting for connections...");
2727
let listnr = self.listener.try_clone().unwrap();
2828
for stream in listnr.incoming() {
@@ -38,7 +38,7 @@ impl<E: KvsEngine> Server<E> {
3838
Ok(())
3939
}
4040

41-
fn handle_client(&mut self, stream: TcpStream) -> Result<()> {
41+
fn handle_client(&self, stream: TcpStream) -> Result<()> {
4242
debug!(
4343
"Connection established from {}, waiting for data...",
4444
stream.peer_addr()?

0 commit comments

Comments
 (0)