diff --git a/src/main.rs b/src/main.rs index 10a55d4..623cebf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,8 @@ mod cache; mod sieve; use bytes::Bytes; -use std::net::SocketAddr; +use cache::CacheWithTTLEntry; +use std::{cell::RefCell, net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpSocket, TcpStream}, @@ -18,7 +19,9 @@ use std::io; use crate::{cache::CacheWithTTL, sieve::ESieve}; -async fn service(mut socket: TcpStream, addr: SocketAddr) { +type SharedCache = Arc>>>>; + +async fn service(mut socket: TcpStream, addr: SocketAddr, cache: SharedCache) { println!("new client: {}", addr); let mut buf = [0; 1024]; @@ -36,9 +39,19 @@ async fn service(mut socket: TcpStream, addr: SocketAddr) { }; // Write the data back - if let Err(e) = socket.write_all(&buf[0..n]).await { - eprintln!("failed to write to socket; err = {:?}", e); - return; + // FIXME: this is not cancel safe, should move the select! on the reading part only + eprintln!( + "Receieved this buffer from {} => {:?} - but returning a value from cache", + addr, + &buf[0..n] + ); + if let Some(bytes) = cache.borrow_mut().get("user-id-1") { + if let Err(e) = socket.write_all(&bytes).await { + eprintln!("failed to write to socket; err = {:?}", e); + return; + } + } else { + eprintln!("no cache value found!") } }; @@ -60,6 +73,7 @@ async fn main() -> io::Result<()> { // TODO: 1 shared cache, should be multiple // TODO: capacity should be total_capacity / cache_count let mut cache = CacheWithTTL::>::new(1000); + let cache = Arc::new(RefCell::new(cache)); // cache.set( // "key-1", // Bytes::from_static(b"value-1"), @@ -87,7 +101,9 @@ async fn main() -> io::Result<()> { loop { let listen = async { let (socket, addr) = listener.accept().await.unwrap(); - tasks.push(tokio::spawn(async move { service(socket, addr).await })); + tasks.push(tokio::spawn(async move { + service(socket, addr, cache.clone()).await + })); }; select! { diff --git a/src/sieve.rs b/src/sieve.rs index 0c4bb11..73c3bd2 100644 --- a/src/sieve.rs +++ b/src/sieve.rs @@ -4,8 +4,7 @@ use crate::cache::Cache; use std::{ cell::RefCell, collections::{HashMap, VecDeque}, - rc::Rc, - sync::{Mutex, RwLock}, + sync::{Arc, Mutex, RwLock}, }; struct SieveNode { @@ -15,8 +14,8 @@ struct SieveNode { struct Sieve { capacity: usize, - cache: HashMap<&'static str, Rc>>, - log: VecDeque>>, + cache: HashMap<&'static str, Arc>>, + log: VecDeque>>, hand: Option, } @@ -30,7 +29,7 @@ impl Sieve { } } - fn get_mut_node(&mut self, key: &'static str) -> Option>> { + fn get_mut_node(&mut self, key: &'static str) -> Option>> { self.cache.get_mut(key).cloned() } @@ -74,7 +73,7 @@ impl Sieve { // if we do not hit full capacity we have this shortcut if self.log.len() < self.capacity { - let node = Rc::new(RefCell::new(node)); + let node = Arc::new(RefCell::new(node)); self.log.push_front(node.clone()); self.cache.insert(key, node); return; @@ -82,7 +81,7 @@ impl Sieve { // otherwise, we use sieve algorithm and then push self.evict(); - let node = Rc::new(RefCell::new(node)); + let node = Arc::new(RefCell::new(node)); self.log.push_front(node.clone()); self.cache.insert(key, node); } @@ -130,7 +129,10 @@ pub struct ESieve { sieve: Mutex, } -// TODO: miss ttl support, maybe have a wrapper to handle that since it'll be the same impl for every cache +// FIXME: do we need this? +// unsafe impl Send for ESieve {} +// unsafe impl Sync for ESieve {} + impl Cache for ESieve { fn new(capacity: usize) -> Self { Self {