Skip to content

Commit 38af63f

Browse files
authored
Merge pull request #3 from hindenbug/project-4-threadpool
Project 4 - Threadpool
2 parents 91b9f12 + 17679be commit 38af63f

File tree

14 files changed

+436
-152
lines changed

14 files changed

+436
-152
lines changed

kvs-2/src/kv.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ impl KvStore {
4141
let log = Self::new_log_file(&path)?;
4242

4343
let mut store = KvStore {
44-
path: path,
45-
log: log,
44+
path,
45+
log,
4646
map: BTreeMap::new(),
4747
};
4848

kvs-client-server/Cargo.lock

+103-88
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

kvs-client-server/Cargo.toml

+10-5
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,28 @@ edition = "2018"
77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

99
[dependencies]
10-
clap = "2.33.0"
10+
clap = "2.33.1"
1111
failure = "0.1.5"
12-
serde = { version = "1.0.89", features = ["derive"] }
13-
serde_json = "1.0.39"
12+
serde = { version = "1.0.111", features = ["derive", "rc"] }
13+
serde_json = "1.0.53"
1414
bincode = "1.2.1"
1515
structopt = "0.3"
1616
log = "0.4"
1717
env_logger = "0.7"
1818
sled = "0.31"
19+
num_cpus = "1.13.0"
20+
crossbeam = "0.7.3"
21+
1922

2023
[dev-dependencies]
21-
assert_cmd = "0.11.0"
24+
assert_cmd = "1.0.1"
2225
predicates = "1.0.0"
23-
tempfile = "3.0.7"
26+
tempfile = "3.1.0"
2427
walkdir = "2.2.7"
2528
rand = "0.6.5"
2629
criterion = "0.3.0"
30+
crossbeam-utils = "0.7.2"
31+
panic-control = "0.1.4"
2732

2833
[lib]
2934
test = false

kvs-client-server/src/bin/kvs-server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ fn main() -> Result<()> {
7979
}
8080

8181
fn start_server_with<E: KvsEngine>(addr: &str, engine: E) -> Result<()> {
82-
let mut server = Server::new(addr, engine);
82+
let server = Server::new(addr, engine);
8383
server.serve()?;
8484
Ok(())
8585
}

kvs-client-server/src/engines/kvs.rs

+50-30
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ use std::collections::BTreeMap;
55
use std::fs::{self, File};
66
use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
77
use std::path::PathBuf;
8+
use std::sync::{Arc, Mutex};
89

910
const COMPACTION_THRESHOLD: u64 = 1024;
1011
const LOG_FILE_NAME: &str = "current.db";
1112

12-
#[derive(Debug)]
13+
#[derive(Debug, Clone)]
1314
pub struct KvStore {
15+
inner: Arc<Mutex<InnerKvStore>>,
16+
}
17+
18+
#[derive(Debug)]
19+
struct InnerKvStore {
1420
path: PathBuf,
1521
log: File,
1622
map: BTreeMap<String, LogPointer>,
@@ -36,44 +42,53 @@ struct LogPointer {
3642
}
3743

3844
impl KvsEngine for KvStore {
39-
fn set(&mut self, key: String, value: String) -> Result<()> {
40-
let offset = self.log.seek(SeekFrom::End(0))?;
45+
fn set(&self, key: String, value: String) -> Result<()> {
46+
let mut inner = self.inner.lock().unwrap();
47+
48+
let offset = inner.log.seek(SeekFrom::End(0))?;
4149
let command = Command {
4250
cmd: CommandType::Set,
4351
key: key.clone(),
4452
value,
4553
};
4654
// encoding before writing to log
47-
serde_json::to_writer(&mut self.log, &command)?;
48-
self.log.flush()?;
49-
let current_offset = self.log.seek(SeekFrom::End(0))?;
50-
self.map.insert(
55+
serde_json::to_writer(&mut inner.log, &command)?;
56+
inner.log.flush()?;
57+
let current_offset = inner.log.seek(SeekFrom::End(0))?;
58+
inner.map.insert(
5159
key,
5260
LogPointer {
5361
offset,
5462
len: current_offset - offset,
5563
},
5664
);
5765
if current_offset > COMPACTION_THRESHOLD {
58-
self.compact()?;
66+
inner.compact()?;
5967
}
6068
Ok(())
6169
}
6270

63-
fn get(&mut self, key: String) -> Result<Option<String>> {
64-
if let Some(pointer) = self.map.get(&key) {
65-
&self.log.seek(SeekFrom::Start(pointer.offset));
66-
let mut de = serde_json::Deserializer::from_reader(&self.log);
67-
let cmd: Command = serde::de::Deserialize::deserialize(&mut de)?;
68-
Ok(Some(cmd.value))
69-
} else {
70-
Ok(None)
71+
fn get(&self, key: String) -> Result<Option<String>> {
72+
let mut inner = self.inner.lock().unwrap();
73+
74+
if !inner.map.contains_key(&key) {
75+
return Ok(None);
7176
}
77+
78+
let pointer = inner.map.get(&key).unwrap();
79+
let pos = SeekFrom::Start(pointer.offset);
80+
81+
inner.log.seek(pos)?;
82+
let mut de = serde_json::Deserializer::from_reader(&inner.log);
83+
let cmd: Command = serde::de::Deserialize::deserialize(&mut de)?;
84+
Ok(Some(cmd.value))
7285
}
7386

74-
fn remove(&mut self, key: String) -> Result<()> {
75-
self.log.seek(SeekFrom::End(0))?;
76-
if self.map.get(&key).is_none() {
87+
fn remove(&self, key: String) -> Result<()> {
88+
let mut inner = self.inner.lock().unwrap();
89+
90+
inner.log.seek(SeekFrom::End(0))?;
91+
if inner.map.get(&key).is_none() {
7792
return Err(KvsError::KeyNotFound);
7893
}
7994
let command = Command {
@@ -82,30 +97,35 @@ impl KvsEngine for KvStore {
8297
value: String::new(),
8398
};
8499
// encoding before writing to log
85-
serde_json::to_writer(&mut self.log, &command)?;
86-
self.log.flush()?;
87-
self.map.remove(&key);
100+
serde_json::to_writer(&mut inner.log, &command)?;
101+
inner.log.flush()?;
102+
inner.map.remove(&key);
88103
Ok(())
89104
}
90105
}
91106

92107
impl KvStore {
93108
pub fn open(path: impl Into<PathBuf>) -> Result<KvStore> {
94109
let path = path.into();
95-
fs::create_dir_all(&path)?;
96-
let log = Self::new_log_file(&path)?;
110+
fs::create_dir_all(&*path)?;
111+
let log = InnerKvStore::new_log_file(&path)?;
97112

98-
let mut store = KvStore {
99-
path: path,
100-
log: log,
113+
let mut inner = InnerKvStore {
114+
path,
115+
log,
101116
map: BTreeMap::new(),
102117
};
103118

104119
// Load from log files
105-
store.load_from_log()?;
106-
Ok(store)
120+
inner.load_from_log()?;
121+
122+
Ok(KvStore {
123+
inner: Arc::new(Mutex::new(inner)),
124+
})
107125
}
126+
}
108127

128+
impl InnerKvStore {
109129
fn load_from_log(&mut self) -> Result<()> {
110130
let mut reader = BufReader::new(File::open(&self.path.join(LOG_FILE_NAME))?);
111131
let mut offset = reader.seek(SeekFrom::Start(0))?;
@@ -141,7 +161,7 @@ impl KvStore {
141161
Ok(())
142162
}
143163

144-
fn compact(&mut self) -> Result<()> {
164+
fn compact(&self) -> Result<()> {
145165
let tmp_path: PathBuf = self.path.join("tmp.db");
146166
let file_path: PathBuf = self.path.join(LOG_FILE_NAME);
147167
let mut work_file = self.log.try_clone()?;

kvs-client-server/src/engines/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
use crate::Result;
22

33
/// Trait for a key value storage engine.
4-
pub trait KvsEngine {
4+
pub trait KvsEngine: Clone + Send + 'static {
55
/// Sets the value of a string key to a string.
66
///
77
/// If the key already exists, the previous value will be overwritten.
8-
fn set(&mut self, key: String, value: String) -> Result<()>;
8+
fn set(&self, key: String, value: String) -> Result<()>;
99

1010
/// Gets the string value of a given string key.
1111
///
1212
/// Returns `None` if the given key does not exist.
13-
fn get(&mut self, key: String) -> Result<Option<String>>;
13+
fn get(&self, key: String) -> Result<Option<String>>;
1414

1515
/// Removes a given key.
1616
///
1717
/// # Errors
1818
///
1919
/// It returns `KvsError::KeyNotFound` if the given key is not found.
20-
fn remove(&mut self, key: String) -> Result<()>;
20+
fn remove(&self, key: String) -> Result<()>;
2121
}
2222

2323
mod kvs;

kvs-client-server/src/engines/sled.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::{KvsEngine, KvsError, Result};
22
use sled::Db;
33
use std::path::PathBuf;
4+
5+
#[derive(Clone)]
46
pub struct SledKvsEngine {
57
tree: Db,
68
}
@@ -13,13 +15,13 @@ impl SledKvsEngine {
1315
}
1416

1517
impl KvsEngine for SledKvsEngine {
16-
fn set(&mut self, key: String, value: String) -> Result<()> {
18+
fn set(&self, key: String, value: String) -> Result<()> {
1719
self.tree.insert(key, value.as_bytes())?;
1820
self.tree.flush()?;
1921
Ok(())
2022
}
2123

22-
fn get(&mut self, key: String) -> Result<Option<String>> {
24+
fn get(&self, key: String) -> Result<Option<String>> {
2325
Ok(self
2426
.tree
2527
.get(key)?
@@ -28,7 +30,7 @@ impl KvsEngine for SledKvsEngine {
2830
.transpose()?)
2931
}
3032

31-
fn remove(&mut self, key: String) -> Result<()> {
33+
fn remove(&self, key: String) -> Result<()> {
3234
self.tree.remove(key)?.ok_or(KvsError::KeyNotFound)?;
3335
self.tree.flush()?;
3436
Ok(())

kvs-client-server/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ pub use engines::{KvStore, KvsEngine, SledKvsEngine};
33
pub use error::{KvsError, Result};
44
pub use network::Request;
55
pub use server::Server;
6+
pub use thread_pool::{NaiveThreadPool, SharedQueueThreadPool, ThreadPool};
67

78
mod client;
89
mod engines;
910
mod error;
1011
mod network;
1112
mod server;
13+
mod thread_pool;

kvs-client-server/src/server.rs

+11-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::network::{GetResponse, RemoveResponse, Request, SetResponse};
2-
use crate::{KvsEngine, Result};
2+
use crate::{KvsEngine, Result, SharedQueueThreadPool, ThreadPool};
33
use log::{debug, error, info};
44
use serde_json::Deserializer;
55
use std::io::{BufReader, BufWriter, Write};
@@ -22,23 +22,25 @@ impl<E: KvsEngine> Server<E> {
2222
Server { listener, engine }
2323
}
2424

25-
pub fn serve(&mut self) -> Result<()> {
25+
pub fn serve(&self) -> Result<()> {
2626
debug!("Waiting for connections...");
27+
let thread_pool = SharedQueueThreadPool::new(num_cpus::get() as u32)?;
2728
let listnr = self.listener.try_clone().unwrap();
2829
for stream in listnr.incoming() {
29-
match stream {
30+
let engine = self.engine.clone();
31+
thread_pool.spawn(move || match stream {
3032
Ok(stream) => {
31-
if let Err(e) = self.handle_client(stream) {
33+
if let Err(e) = Self::handle_client(engine, stream) {
3234
error!("Error on serving client: {}", e);
3335
}
3436
}
3537
Err(e) => error!("Connection failed, reason: {:?}", e),
36-
}
38+
})
3739
}
3840
Ok(())
3941
}
4042

41-
fn handle_client(&mut self, stream: TcpStream) -> Result<()> {
43+
fn handle_client(engine: E, stream: TcpStream) -> Result<()> {
4244
debug!(
4345
"Connection established from {}, waiting for data...",
4446
stream.peer_addr()?
@@ -64,21 +66,21 @@ impl<E: KvsEngine> Server<E> {
6466
debug!("Received request from {}: {:?}", peer_addr, req);
6567
match req {
6668
Request::Get { key } => {
67-
let engine_response = match self.engine.get(key) {
69+
let engine_response = match engine.get(key) {
6870
Ok(value) => GetResponse::Ok(value),
6971
Err(err) => GetResponse::Err(format!("{}", err)),
7072
};
7173
send_response!(engine_response);
7274
}
7375
Request::Set { key, value } => {
74-
let engine_response = match self.engine.set(key, value) {
76+
let engine_response = match engine.set(key, value) {
7577
Ok(_) => SetResponse::Ok(()),
7678
Err(err) => SetResponse::Err(format!("{}", err)),
7779
};
7880
send_response!(engine_response);
7981
}
8082
Request::Remove { key } => {
81-
let engine_response = match self.engine.remove(key) {
83+
let engine_response = match engine.remove(key) {
8284
Ok(_) => RemoveResponse::Ok(()),
8385
Err(err) => RemoveResponse::Err(format!("{}", err)),
8486
};
+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use crate::Result;
2+
3+
mod naive;
4+
mod shared_queue;
5+
6+
pub use self::naive::NaiveThreadPool;
7+
pub use self::shared_queue::SharedQueueThreadPool;
8+
9+
pub trait ThreadPool {
10+
/// Creates a thread pool
11+
///
12+
/// return error if failed to create any thread
13+
fn new(threads: u32) -> Result<Self>
14+
where
15+
Self: Sized;
16+
17+
fn spawn<F>(&self, job: F)
18+
where
19+
F: FnOnce() + Send + 'static;
20+
}
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use crate::{Result, ThreadPool};
2+
3+
use std::thread;
4+
5+
pub struct NaiveThreadPool;
6+
7+
impl ThreadPool for NaiveThreadPool {
8+
fn new(_threads: u32) -> Result<Self> {
9+
Ok(NaiveThreadPool)
10+
}
11+
12+
fn spawn<F>(&self, job: F)
13+
where
14+
F: FnOnce() + Send + 'static,
15+
{
16+
thread::spawn(job);
17+
}
18+
}

0 commit comments

Comments
 (0)