1
1
use crate :: network:: { GetResponse , RemoveResponse , Request , SetResponse } ;
2
- use crate :: { KvsEngine , Result } ;
2
+ use crate :: { KvsEngine , NaiveThreadPool , Result , ThreadPool } ;
3
3
use log:: { debug, error, info} ;
4
4
use serde_json:: Deserializer ;
5
5
use std:: io:: { BufReader , BufWriter , Write } ;
@@ -24,21 +24,23 @@ impl<E: KvsEngine> Server<E> {
24
24
25
25
pub fn serve ( & self ) -> Result < ( ) > {
26
26
debug ! ( "Waiting for connections..." ) ;
27
+ let thread_pool = NaiveThreadPool :: new ( 1 ) ?;
27
28
let listnr = self . listener . try_clone ( ) . unwrap ( ) ;
28
29
for stream in listnr. incoming ( ) {
29
- match stream {
30
+ let engine = self . engine . clone ( ) ;
31
+ thread_pool. spawn ( move || match stream {
30
32
Ok ( stream) => {
31
- if let Err ( e) = self . handle_client ( stream) {
33
+ if let Err ( e) = Self :: handle_client ( engine , stream) {
32
34
error ! ( "Error on serving client: {}" , e) ;
33
35
}
34
36
}
35
37
Err ( e) => error ! ( "Connection failed, reason: {:?}" , e) ,
36
- }
38
+ } )
37
39
}
38
40
Ok ( ( ) )
39
41
}
40
42
41
- fn handle_client ( & self , stream : TcpStream ) -> Result < ( ) > {
43
+ fn handle_client ( engine : E , stream : TcpStream ) -> Result < ( ) > {
42
44
debug ! (
43
45
"Connection established from {}, waiting for data..." ,
44
46
stream. peer_addr( ) ?
@@ -64,21 +66,21 @@ impl<E: KvsEngine> Server<E> {
64
66
debug ! ( "Received request from {}: {:?}" , peer_addr, req) ;
65
67
match req {
66
68
Request :: Get { key } => {
67
- let engine_response = match self . engine . get ( key) {
69
+ let engine_response = match engine. get ( key) {
68
70
Ok ( value) => GetResponse :: Ok ( value) ,
69
71
Err ( err) => GetResponse :: Err ( format ! ( "{}" , err) ) ,
70
72
} ;
71
73
send_response ! ( engine_response) ;
72
74
}
73
75
Request :: Set { key, value } => {
74
- let engine_response = match self . engine . set ( key, value) {
76
+ let engine_response = match engine. set ( key, value) {
75
77
Ok ( _) => SetResponse :: Ok ( ( ) ) ,
76
78
Err ( err) => SetResponse :: Err ( format ! ( "{}" , err) ) ,
77
79
} ;
78
80
send_response ! ( engine_response) ;
79
81
}
80
82
Request :: Remove { key } => {
81
- let engine_response = match self . engine . remove ( key) {
83
+ let engine_response = match engine. remove ( key) {
82
84
Ok ( _) => RemoveResponse :: Ok ( ( ) ) ,
83
85
Err ( err) => RemoveResponse :: Err ( format ! ( "{}" , err) ) ,
84
86
} ;
0 commit comments