Skip to content

Commit 17679be

Browse files
committed
Add SharedQueuePool for part 4
1 parent 6a939f8 commit 17679be

File tree

6 files changed

+61
-15
lines changed

6 files changed

+61
-15
lines changed

kvs-client-server/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub use engines::{KvStore, KvsEngine, SledKvsEngine};
33
pub use error::{KvsError, Result};
44
pub use network::Request;
55
pub use server::Server;
6-
pub use thread_pool::{NaiveThreadPool, ThreadPool};
6+
pub use thread_pool::{NaiveThreadPool, SharedQueueThreadPool, ThreadPool};
77

88
mod client;
99
mod engines;

kvs-client-server/src/server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::network::{GetResponse, RemoveResponse, Request, SetResponse};
2-
use crate::{KvsEngine, NaiveThreadPool, Result, ThreadPool};
2+
use crate::{KvsEngine, Result, SharedQueueThreadPool, ThreadPool};
33
use log::{debug, error, info};
44
use serde_json::Deserializer;
55
use std::io::{BufReader, BufWriter, Write};
@@ -24,7 +24,7 @@ impl<E: KvsEngine> Server<E> {
2424

2525
pub fn serve(&self) -> Result<()> {
2626
debug!("Waiting for connections...");
27-
let thread_pool = NaiveThreadPool::new(1)?;
27+
let thread_pool = SharedQueueThreadPool::new(num_cpus::get() as u32)?;
2828
let listnr = self.listener.try_clone().unwrap();
2929
for stream in listnr.incoming() {
3030
let engine = self.engine.clone();

kvs-client-server/src/thread_pool/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use crate::Result;
22

33
mod naive;
4+
mod shared_queue;
45

56
pub use self::naive::NaiveThreadPool;
7+
pub use self::shared_queue::SharedQueueThreadPool;
68

79
pub trait ThreadPool {
810
/// Creates a thread pool

kvs-client-server/src/thread_pool/naive.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use super::ThreadPool;
2-
use crate::Result;
1+
use crate::{Result, ThreadPool};
32

43
use std::thread;
4+
55
pub struct NaiveThreadPool;
66

77
impl ThreadPool for NaiveThreadPool {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use crate::{Result, ThreadPool};
2+
use crossbeam::crossbeam_channel::{unbounded, Receiver, Sender};
3+
4+
use std::thread;
5+
6+
pub struct SharedQueueThreadPool(Sender<Box<dyn FnOnce() + Send + 'static>>);
7+
#[derive(Clone)]
8+
struct TaskReceiver(Receiver<Box<dyn FnOnce() + Send + 'static>>);
9+
10+
impl ThreadPool for SharedQueueThreadPool {
11+
fn new(threads: u32) -> Result<Self> {
12+
let (sender, recv) = unbounded::<Box<dyn FnOnce() + Send + 'static>>();
13+
for _ in 0..threads {
14+
let r_clone = TaskReceiver(recv.clone());
15+
thread::spawn(move || {
16+
while let Ok(job) = r_clone.0.recv() {
17+
job();
18+
}
19+
});
20+
}
21+
22+
Ok(SharedQueueThreadPool(sender))
23+
}
24+
25+
fn spawn<F>(&self, job: F)
26+
where
27+
F: FnOnce() + Send + 'static,
28+
{
29+
self.0.send(Box::new(job)).unwrap();
30+
}
31+
}
32+
33+
impl Drop for TaskReceiver {
34+
fn drop(&mut self) {
35+
if thread::panicking() {
36+
let r_clone = self.clone();
37+
thread::spawn(move || {
38+
while let Ok(job) = r_clone.0.recv() {
39+
job();
40+
}
41+
});
42+
}
43+
}
44+
}

kvs-client-server/tests/thread_pool.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
22
use std::sync::Arc;
33

44
use kvs::Result;
5-
use kvs::{NaiveThreadPool, ThreadPool};
5+
use kvs::{NaiveThreadPool, SharedQueueThreadPool, ThreadPool};
66

77
use crossbeam_utils::sync::WaitGroup;
88

@@ -52,19 +52,19 @@ fn naive_thread_pool_spawn_counter() -> Result<()> {
5252
spawn_counter(pool)
5353
}
5454

55-
// #[test]
56-
// fn shared_queue_thread_pool_spawn_counter() -> Result<()> {
57-
// let pool = SharedQueueThreadPool::new(4)?;
58-
// spawn_counter(pool)
59-
// }
55+
#[test]
56+
fn shared_queue_thread_pool_spawn_counter() -> Result<()> {
57+
let pool = SharedQueueThreadPool::new(4)?;
58+
spawn_counter(pool)
59+
}
6060

6161
// #[test]
6262
// fn rayon_thread_pool_spawn_counter() -> Result<()> {
6363
// let pool = RayonThreadPool::new(4)?;
6464
// spawn_counter(pool)
6565
// }
6666

67-
// #[test]
68-
// fn shared_queue_thread_pool_panic_task() -> Result<()> {
69-
// spawn_panic_task::<SharedQueueThreadPool>()
70-
// }
67+
#[test]
68+
fn shared_queue_thread_pool_panic_task() -> Result<()> {
69+
spawn_panic_task::<SharedQueueThreadPool>()
70+
}

0 commit comments

Comments
 (0)