diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..d33b42b --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,44 @@ +#!/bin/sh + +echo "Running pre-commit checks..." +# Format all Rust files +echo "\nChecking formatting..." +cargo fmt -- --check +if [ $? -ne 0 ]; then + echo "Formatting check failed. Please run 'cargo fmt' and commit again." + exit 1 +fi + +# Check compilation +echo "\nChecking compilation..." +cargo check +if [ $? -ne 0 ]; then + echo "Compilation check failed. Please fix the errors and commit again." + exit 1 +fi + +echo "\nChecking release compilation..." +cargo check --release +if [ $? -ne 0 ]; then + echo "Release compilation check failed. Please fix the errors and commit again." + exit 1 +fi + +# Run clippy +echo "\nRunning clippy..." +cargo clippy -- -D warnings +if [ $? -ne 0 ]; then + echo "Clippy check failed. Please fix the warnings and commit again." + exit 1 +fi + +# Run all tests +echo "\nRunning tests..." +cargo test +if [ $? -ne 0 ]; then + echo "Tests failed. Please fix the failing tests and commit again." + exit 1 +fi + +echo "\nAll checks completed successfully." +exit 0 \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..a0ee823 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,59 @@ +name: Chat Server Test + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Set up Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + + - name: Build server and client + run: cargo build + + - name: run tests + run: cargo test + + - name: Set log level + run: export RUST_LOG=info + + - name: Run server + run: cargo run --bin server 0.0.0.0:12345 & + + - name: Wait for server to start + run: sleep 5 + + - name: Run client and send test message + run: | + cargo run --bin client 127.0.0.1:12345 TestUser > client_log.txt 2>&1 & + sleep 5 # Give some time for the message to be sent and processed + + - name: Check client logs + run: | + if grep -q "Welcome to the chat, TestUser!" client_log.txt; then + echo "client connected to server" + else + echo "client did not connect to server" + cat client_log.txt + exit 1 + fi + + - name: Check for failures + run: | + if [ $? -ne 0 ]; then + echo "Client or server exited with a failure" + exit 1 + else + echo "Client and server ran successfully" + fi diff --git a/.gitignore b/.gitignore index 6985cf1..204b7bb 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +*.vscode diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..162eb6b --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,8 @@ +[workspace] +members = [ + "server", + "client", + "common", +] + +resolver = "2" diff --git a/README.md b/README.md index 8c4d4e1..fd17874 100644 --- a/README.md +++ b/README.md @@ -69,3 +69,27 @@ without error, and is free of clippy errors. send a message to the server from the client. Make sure that niether the server or client exit with a failure. This action should be run anytime new code is pushed to a branch or landed on the main branch. + +## Run pre-commit hook: +``` +sh .githooks/pre-commit +``` + +## Run tests: +``` +cargo test +``` + +## Launch server: +``` +cargo run --bin server : +``` + +## Launch client: +``` +cargo run --bin client : +``` + +## GitHub Actions + +The GitHub Actions workflow is defined in `.github/workflows/ci.yml`. It will run the server and client and check if the client can send a message to the server. diff --git a/client.sh b/client.sh new file mode 100755 index 0000000..0fdcc64 --- /dev/null +++ b/client.sh @@ -0,0 +1,2 @@ +#!/bin/bash +RUST_LOG=info cargo run --bin client $1:$2 $3 diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..de82b91 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2021" + +[dependencies] +env_logger = "0.11.5" +futures-util = "0.3.31" +log = "0.4.22" +tokio = { version = "1.40.0", features = ["full"] } +tokio-tungstenite = "0.24.0" +common = { path = "../common" } +[dev-dependencies] +tokio-test = "0.4.4" +futures = "0.3.31" \ No newline at end of file diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..850b267 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,311 @@ +//! This module implements a WebSocket-based chat client. +//! +//! The client connects to a chat server, allows users to join with a username, +//! send messages, and leave the chat. It handles incoming and outgoing messages, +//! parses user input, and manages the WebSocket connection. +//! +//! Key features: +//! - Asynchronous I/O using tokio +//! - WebSocket communication with tokio-tungstenite +//! - Message serialization and deserialization with bincode +//! - Handling of stdin for user input and stdout for displaying messages +//! - Graceful shutdown on Ctrl+C + +use futures_util::{SinkExt, StreamExt}; +use log::{error, info}; +use std::env; +use std::io::{self, Write}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::signal; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; + +use common::utils::{ClientMessage, ServerMessage}; + +#[tokio::main] +async fn main() { + // Initialize the logger (you'll need to add this) + env_logger::init(); + + let url = env::args().nth(1).unwrap_or_else(|| { + print!("Please enter the client URL (e.g., 127.0.0.1:12345): "); + io::stdout().flush().unwrap(); + let mut input = String::new(); + io::stdin() + .read_line(&mut input) + .expect("Failed to read line"); + input.trim().to_string() + }); + + let username = env::args().nth(2).unwrap_or_else(|| { + print!("Please enter your username (e.g., John): "); + io::stdout().flush().unwrap(); + let mut input = String::new(); + io::stdin() + .read_line(&mut input) + .expect("Failed to read line"); + input.trim().to_string() + }); + let url = format!("ws://{}", url); + let (ws_stream, _) = connect_async(&url).await.expect("Failed to connect"); + info!("WebSocket handshake has been successfully completed"); + let (stdin_tx, mut stdin_rx) = tokio::sync::mpsc::unbounded_channel::(); + tokio::spawn(read_stdin(stdin_tx.clone())); + let (stdout_tx, mut stdout_rx) = tokio::sync::mpsc::unbounded_channel::(); + tokio::spawn(async move { + while let Some(message) = stdout_rx.recv().await { + let mut stdout = tokio::io::stdout(); + if let Err(e) = stdout.write_all(message.as_bytes()).await { + error!("Failed to write to stdout: {}", e); + } + if let Err(e) = stdout.write_all(b"\n").await { + error!("Failed to write to stdout: {}", e); + } + } + }); + + let (mut write, read) = ws_stream.split(); + + // Send Join message + let join_msg = ClientMessage::Join { + username: username.to_string(), + }; + let join_msg = join_msg.to_json().unwrap(); + { + if let Err(e) = write + .send(tokio_tungstenite::tungstenite::Message::Binary(join_msg)) + .await + { + error!("Failed to send join message: {}", e); + return; + } + } + let stdin_to_ws = async { + while let Some(message) = stdin_rx.recv().await { + if write.send(message.clone()).await.is_err() { + error!("Failed to send message to server"); + return; + } + if let Ok(msg) = ClientMessage::from_json(&message.into_data()) { + if msg == ClientMessage::Leave { + if let Err(e) = write.close().await { + error!("Failed to close WebSocket connection: {}", e); + } + return; + } + } + } + }; + + let ws_to_stdout = { + read.for_each(|message| async { + if let Ok(message) = message { + if message.is_close() { + info!("Server closed the connection"); + std::process::exit(0); + } + let server_message = String::from_utf8(message.clone().into_data()); + if let Ok(msg) = ServerMessage::from_json(&message.into_data()) { + if let Err(e) = stdout_tx.send(format!("{}: {}", msg.from, msg.message)) { + error!("Failed to send message to stdout: {}", e); + } + } else if let Err(e) = stdout_tx.send(server_message.unwrap()) { + error!("Failed to send message to stdout: {}", e); + } + } else { + error!( + "Error receiving message from server: {}", + message.unwrap_err().to_string() + ); + } + }) + }; + + tokio::spawn(handle_ctrl_c(stdin_tx.clone())); + tokio::select! { + _ = stdin_to_ws => (), + _ = ws_to_stdout => (), + } + info!("Exiting"); + std::process::exit(0); +} + +async fn read_stdin(tx: tokio::sync::mpsc::UnboundedSender) { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); + let mut buffer = String::new(); + loop { + buffer.clear(); + match reader.read_line(&mut buffer).await { + Ok(0) => continue, // EOF + Ok(_) => { + if buffer.trim().is_empty() { + continue; + } + let input = buffer.trim(); + ClientMessage::parse(input).map_or_else( + |error| error!("Failed to parse message: {}", error.to_string()), + |message| { + message.to_json().map_or_else( + |error| error!("Failed to serialize message: {}", error.to_string()), + |serialized| { + if let Err(e) = tx.send(Message::binary(serialized)) { + error!("Failed to send message: {}", e); + } + }, + ); + }, + ); + } + Err(e) => { + error!("Failed to read from stdin: {}", e); + continue; + } + } + } +} + +async fn handle_ctrl_c(tx: tokio::sync::mpsc::UnboundedSender) { + signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); + send_leave_message(tx).await; +} + +async fn send_leave_message(tx: tokio::sync::mpsc::UnboundedSender) { + info!("Sending leave message."); + let leave_msg = ClientMessage::Leave; + if let Err(e) = tx.send(Message::binary(leave_msg.to_json().unwrap())) { + error!("Failed to send leave message: {}", e); + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::channel::mpsc; + use futures_util::SinkExt; + + #[test] + fn test_client_message_parse_join() { + let input = "join testuser"; + let expected = ClientMessage::Join { + username: "testuser".to_string(), + }; + assert_eq!(ClientMessage::parse(input).unwrap(), expected); + } + + #[test] + fn test_client_message_parse_send() { + let input = "send Hello, world!"; + let expected = ClientMessage::Send { + message: "Hello, world!".to_string(), + }; + assert_eq!(ClientMessage::parse(input).unwrap(), expected); + } + + #[test] + fn test_client_message_parse_leave() { + let input = "leave"; + let expected = ClientMessage::Leave; + assert_eq!(ClientMessage::parse(input).unwrap(), expected); + } + + #[test] + fn test_client_message_parse_implicit_send() { + let input = "Hello, everyone!"; + let expected = ClientMessage::Send { + message: "Hello, everyone!".to_string(), + }; + assert_eq!(ClientMessage::parse(input).unwrap(), expected); + } + + #[test] + fn test_client_message_serialization() { + let message = ClientMessage::Join { + username: "testuser".to_string(), + }; + let serialized = message.to_json().unwrap(); + let deserialized = ClientMessage::from_json(&serialized).unwrap(); + assert_eq!(message, deserialized); + } + + #[test] + #[should_panic(expected = "Invalid join message format")] + fn test_client_message_parse_invalid_join() { + ClientMessage::parse("join").unwrap(); + } + + #[tokio::test] + async fn test_client_message_handling() { + let (mut tx, mut rx) = mpsc::unbounded(); + + // Test Join message + let join_msg = "join testuser"; + read_and_send_message(join_msg, &mut tx).await; + + if let Some(message) = rx.next().await { + assert_eq!( + message, + Message::Binary( + ClientMessage::Join { + username: "testuser".to_string() + } + .to_json() + .unwrap() + ) + ); + } else { + panic!("No message received for join"); + } + + // Test Send message + let send_msg = "send Hello, world!"; + read_and_send_message(send_msg, &mut tx).await; + + if let Some(message) = rx.next().await { + assert_eq!( + message, + Message::Binary( + ClientMessage::Send { + message: "Hello, world!".to_string() + } + .to_json() + .unwrap() + ) + ); + } else { + panic!("No message received for send"); + } + + // Test Leave message + let leave_msg = "leave"; + read_and_send_message(leave_msg, &mut tx).await; + + if let Some(message) = rx.next().await { + assert_eq!( + message, + Message::Binary(ClientMessage::Leave.to_json().unwrap()) + ); + } else { + panic!("No message received for leave"); + } + } + + async fn read_and_send_message(input: &str, tx: &mut mpsc::UnboundedSender) { + let message = ClientMessage::parse(input).unwrap(); + tx.send(Message::Binary(message.to_json().unwrap())) + .await + .unwrap(); + } + + #[test] + fn test_server_message_deserialization() { + let server_message = ServerMessage { + from: "Server".to_string(), + message: "Welcome to the chat!".to_string(), + }; + let serialized = server_message.to_json().unwrap(); + let deserialized = ServerMessage::from_json(&serialized).unwrap(); + assert_eq!(server_message.from, deserialized.from); + assert_eq!(server_message.message, deserialized.message); + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml new file mode 100644 index 0000000..a19a94e --- /dev/null +++ b/common/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "common" +version = "0.1.0" +edition = "2021" + +[lib] +path = "lib.rs" +[dependencies] +bincode = "1.3.3" +serde = {version = "1.0.210",features = ["derive"]} \ No newline at end of file diff --git a/common/lib.rs b/common/lib.rs new file mode 100644 index 0000000..b5614dd --- /dev/null +++ b/common/lib.rs @@ -0,0 +1 @@ +pub mod utils; diff --git a/common/utils.rs b/common/utils.rs new file mode 100644 index 0000000..b8bc0a2 --- /dev/null +++ b/common/utils.rs @@ -0,0 +1,94 @@ +use bincode::Result; +use serde::de::Error as SerdeError; +use serde::{Deserialize, Serialize}; +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +pub enum ClientMessage { + Join { username: String }, + Send { message: String }, + Leave, +} + +impl ClientMessage { + pub fn from_json(json: &[u8]) -> Result { + bincode::deserialize(json) + } + + pub fn to_json(&self) -> Result> { + bincode::serialize(self) + } + + pub fn parse(json: &str) -> Result { + let parts: Vec<&str> = json.split_whitespace().collect(); + match parts[0].to_lowercase().as_str() { + "join" => { + if parts.len() == 2 { + Ok(ClientMessage::Join { + username: parts[1].to_string(), + }) + } else { + Err(bincode::Error::custom("Invalid join message format")) + } + } + "send" => { + if parts.len() > 1 { + Ok(ClientMessage::Send { + message: parts[1..].join(" "), + }) + } else { + Err(bincode::Error::custom("Invalid send message format")) + } + } + "leave" => { + if parts.len() == 1 { + Ok(ClientMessage::Leave) + } else { + Err(bincode::Error::custom("Invalid leave message format")) + } + } + _ => { + if parts[0] == "send" { + Ok(ClientMessage::Send { + message: parts[1..].join(" "), + }) + } else { + Ok(ClientMessage::Send { + message: parts[0..].join(" "), + }) + } + } + } + } + + pub fn parse_to_server_message(&self, user: &String) -> ServerMessage { + match self { + ClientMessage::Join { username } => ServerMessage { + from: username.clone(), + message: "joined the chat".to_string(), + }, + ClientMessage::Send { message } => ServerMessage { + from: user.to_string(), + message: message.clone(), + }, + ClientMessage::Leave => ServerMessage { + from: user.to_string(), + message: "left the chat".to_string(), + }, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] +pub struct ServerMessage { + pub from: String, + pub message: String, +} + +impl ServerMessage { + pub fn to_json(&self) -> Result> { + bincode::serialize(self) + } + + pub fn from_json(json: &[u8]) -> Result { + bincode::deserialize(json) + } +} diff --git a/pre-commit-config.yaml b/pre-commit-config.yaml new file mode 100644 index 0000000..fde5a9c --- /dev/null +++ b/pre-commit-config.yaml @@ -0,0 +1,32 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-added-large-files + + - repo: local + hooks: + - id: cargo-fmt + name: Cargo fmt + entry: cargo fmt --all -- --check + language: system + types: [rust] + pass_filenames: false + + - id: cargo-check + name: Cargo check + entry: cargo check + language: system + types: [rust] + pass_filenames: false + + - id: cargo-clippy + name: Cargo clippy + entry: cargo clippy -- -D warnings + language: system + types: [rust] + pass_filenames: false + diff --git a/server.sh b/server.sh new file mode 100755 index 0000000..ed704a0 --- /dev/null +++ b/server.sh @@ -0,0 +1,2 @@ +#!/bin/bash +RUST_LOG=info cargo run --bin server $1:$2 diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..1c55838 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2021" + +[dependencies] +dashmap = "6.1.0" +env_logger = "0.11.5" +futures-util = "0.3.31" +log = "0.4.22" +tokio = { version = "1.40.0", features = ["full"] } +tokio-tungstenite = "0.24.0" +common = { path = "../common" } + + +[dev-dependencies] +tokio-util = "0.7.8" \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..0d000f1 --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,408 @@ +//! A simple WebSocket chat server implementation. +//! +//! This module provides a WebSocket-based chat server that allows multiple clients to connect, +//! join the chat with a username, send messages, and leave the chat. +//! +//! # Features +//! +//! - WebSocket-based communication +//! - Username registration +//! - Broadcasting messages to all connected clients +//! - Graceful handling of client disconnections +//! +//! # Examples +//! +//! To run the server: +//! +//! ```bash +//! cargo run --example server 127.0.0.1:12345 +//! ``` +//! +//! To run a client (in a separate terminal): +//! +//! ```bash +//! cargo run --example client ws://127.0.0.1:12345/ +//! ``` +//! + +use common::utils::{ClientMessage, ServerMessage}; +use dashmap::DashMap; +use futures_util::{SinkExt, StreamExt}; +use log::info; +use std::io::Write; +use std::{env, net::SocketAddr, sync::Arc}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::signal; +use tokio::sync::broadcast; +use tokio_tungstenite::tungstenite::protocol::Message; + +type PeerMap = Arc>; + +async fn handle_connection( + peer_map: PeerMap, + raw_stream: TcpStream, + addr: SocketAddr, + tx: broadcast::Sender, + tx_close: broadcast::Sender, +) { + info!("Incoming TCP connection from: {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(raw_stream) + .await + .expect("Error during the websocket handshake occurred"); + info!("WebSocket connection established: {}", addr); + + let mut rx = tx.subscribe(); // Each client gets a subscription + let mut rx_close = tx_close.subscribe(); + let (mut outgoing, mut incoming) = ws_stream.split(); + loop { + tokio::select! { + Some(Ok(msg)) = incoming.next() => { + if msg.is_close() { + info!("{} disconnected", addr); + if let Some(username) = peer_map.get(&addr) { + let _ = tx.send(ClientMessage::Leave.parse_to_server_message(&username)); + } + peer_map.remove(&addr); + let _ = outgoing.close().await; + break; + } + + if let Ok(msg) = ClientMessage::from_json(&msg.into_data()) + { + if msg == ClientMessage::Leave { + if let Some(username) = peer_map.get(&addr) { + let _ = tx.send(msg.parse_to_server_message(&username)); + } + peer_map.remove(&addr); + } + if let ClientMessage::Join { username } = msg.clone() { + if peer_map.iter().any(|entry| entry.value() == &username) { + let _ = outgoing + .send(Message::text(format!("Sorry, username {} already taken.", username))) + .await; + } else if peer_map.contains_key(&addr) { + let joined_user = peer_map.get(&addr).unwrap().clone(); + let _ = outgoing + .send(Message::text(format!("{} already joined in chat ,you can leave and join as {}", joined_user,username))) + .await; + }else{ + peer_map.insert(addr, username.clone()); + + // Notify current client of successful join + // Notify current client of successful join with additional instructions + let welcome_message = format!( + "Welcome to the chat, {}!\n\nYou can interact with Server as follows:\n1. leave - to leave from room.\n2. join - to join to room.\n3. send or - to send message in the room", + username + ); + let _ = outgoing + .send(Message::text(welcome_message)) + .await; + if let Some(username) = peer_map.get(&addr) { + let _ = tx.send(msg.parse_to_server_message(&username)); + } + } + } else if let Some(username) = peer_map.get(&addr) { + let _ = tx.send(msg.parse_to_server_message(&username)); + } + } + }, + + Ok(message) = rx.recv() => { + + if let Some(info) = peer_map.get(&addr) { + + if info.clone() != message.from { + let _ = outgoing + .send(Message::binary(message.to_json().unwrap())) + .await; + } + } + } + Ok(message) = rx_close.recv() => { //this is for ctrl+c + if peer_map.get(&addr).is_some() { + let _ = outgoing + .send(message) + .await; + peer_map.remove(&addr); + let _ = outgoing.close().await; + break; + } + } + + else => { + if let Some(username) = peer_map.get(&addr) { + let _ = tx.send(ClientMessage::Leave.parse_to_server_message(&username)); + info!("{:?} disconnected", username); + } + peer_map.remove(&addr); + let _ = outgoing.close().await; + break; + } + } + } +} + +#[tokio::main] +async fn main() { + env_logger::init(); + + let addr = env::args().nth(1).unwrap_or_else(|| { + print!("Please enter the server URL (e.g., 0.0.0.0:12345): "); + std::io::stdout().flush().unwrap(); + let mut input = String::new(); + std::io::stdin() + .read_line(&mut input) + .expect("Failed to read line"); + input.trim().to_string() + }); + + let state = Arc::new(DashMap::new()); + // Create the event loop and TCP listener we'll accept connections on. + + // Setup TCP listener for websocket connections + + let try_socket = TcpListener::bind(&addr).await; + let listener = try_socket.expect("Failed to bind"); + let (tx, _) = broadcast::channel::(100); // Broadcast channel for messages + let (tx_close, _) = broadcast::channel::(100); + println!("****************************************************************"); + println!( + "* Listening on: {} *", + addr + ); + println!("* To connect to chat use below command in separate terminal: *"); + println!("* cargo run --bin client : *"); + println!("****************************************************************"); + loop { + tokio::select! { + Ok((stream, addr)) = listener.accept() => { + tokio::spawn(handle_connection( + state.clone(), + stream, + addr, + tx.clone(), + tx_close.clone(), + )); + } + _ = handle_ctrl_c(tx_close.clone()) => { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + break; + } + } + } +} + +async fn handle_ctrl_c(tx_close: broadcast::Sender) { + signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); + info!("Received Ctrl+C, sending leave message."); + let _ = tx_close.send(Message::Close(None)); +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn test_client_message_serialization() { + let join_msg = ClientMessage::Join { + username: "Alice".to_string(), + }; + let serialized = join_msg.to_json().unwrap(); + let deserialized: ClientMessage = ClientMessage::from_json(&serialized).unwrap(); + assert_eq!(join_msg, deserialized); + + let send_msg = ClientMessage::Send { + message: "Hello, world!".to_string(), + }; + let serialized = send_msg.to_json().unwrap(); + let deserialized: ClientMessage = ClientMessage::from_json(&serialized).unwrap(); + assert_eq!(send_msg, deserialized); + + let leave_msg = ClientMessage::Leave; + let serialized = leave_msg.to_json().unwrap(); + let deserialized: ClientMessage = ClientMessage::from_json(&serialized).unwrap(); + assert_eq!(leave_msg, deserialized); + } + + #[test] + fn test_client_message_parsing() { + let join_msg = ClientMessage::Join { + username: "Alice".to_string(), + }; + let parsed = join_msg.parse_to_server_message(&"Alice".to_string()); + assert_eq!( + parsed, + ServerMessage { + from: "Alice".to_string(), + message: "joined the chat".to_string(), + } + ); + + let send_msg = ClientMessage::Send { + message: "Hello, world!".to_string(), + }; + let parsed = send_msg.parse_to_server_message(&"Bob".to_string()); + assert_eq!( + parsed, + ServerMessage { + from: "Bob".to_string(), + message: "Hello, world!".to_string(), + } + ); + + let leave_msg = ClientMessage::Leave; + let parsed = leave_msg.parse_to_server_message(&"Charlie".to_string()); + assert_eq!( + parsed, + ServerMessage { + from: "Charlie".to_string(), + message: "left the chat".to_string(), + } + ); + } + + #[test] + fn test_server_message_serialization() { + let msg = ServerMessage { + from: "Alice".to_string(), + message: "Hello, everyone!".to_string(), + }; + let serialized = msg.to_json().unwrap(); + let deserialized: ServerMessage = ServerMessage::from_json(&serialized).unwrap(); + assert_eq!(msg, deserialized); + } + + #[tokio::test] + async fn test_handle_connection() { + use futures_util::StreamExt; + use tokio::net::TcpListener; + use tokio_tungstenite::connect_async; + + // Setup a mock server + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let peer_map = Arc::new(DashMap::new()); + let (tx, _) = broadcast::channel::(100); + let (tx_close, _) = broadcast::channel::(100); + // Spawn the server handler + let handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + handle_connection(peer_map, stream, addr, tx, tx_close).await; + }); + + // Connect a mock client + let (ws_stream, _) = connect_async(format!("ws://{}", addr)).await.unwrap(); + + let (mut write, mut read) = ws_stream.split(); + // Test joining + let join_msg = ClientMessage::Join { + username: "TestUser".to_string(), + }; + let serialized = join_msg.to_json().unwrap(); + write.send(Message::Binary(serialized)).await.unwrap(); + let welcome_message = "Welcome to the chat, TestUser!\n\nYou can interact with Server as follows:\n1. leave - to leave from room.\n2. join - to join to room.\n3. send or - to send message in the room"; + // Receive the welcome message + if let Some(Ok(msg)) = read.next().await { + assert_eq!(msg, Message::Text(welcome_message.to_string())); + } else { + panic!("Did not receive welcome message"); + } + + // Test sending a message + let send_msg = ClientMessage::Send { + message: "Hello, chat!".to_string(), + }; + let serialized = send_msg.to_json().unwrap(); + write.send(Message::Binary(serialized)).await.unwrap(); + // Test leaving the chat + let leave_msg = ClientMessage::Leave; + let serialized = leave_msg.to_json().unwrap(); + write.send(Message::Binary(serialized)).await.unwrap(); + + // Close the connection + write.close().await.unwrap(); + handle.abort(); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + #[tokio::test] + async fn test_multiple_clients() { + use futures_util::StreamExt; + use tokio::net::TcpListener; + use tokio_tungstenite::connect_async; + // Setup a mock server + let listener = TcpListener::bind("127.0.0.1:12345").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let peer_map = Arc::new(DashMap::new()); + let (tx, _) = broadcast::channel::(100); + let (tx_close, _) = broadcast::channel::(100); + + // Spawn the server handler + let server_handle = tokio::spawn(async move { + while let Ok((stream, client_addr)) = listener.accept().await { + let peer_map = peer_map.clone(); + let tx = tx.clone(); + let tx_close = tx_close.clone(); + tokio::spawn(async move { + handle_connection(peer_map, stream, client_addr, tx, tx_close).await; + }); + } + }); + + // Connect two mock clients + let (alice_stream, _) = connect_async(format!("ws://{}", addr)).await.unwrap(); + let (bob_stream, _) = connect_async(format!("ws://{}", addr)).await.unwrap(); + + let (mut alice_write, mut alice_read) = alice_stream.split(); + let (mut bob_write, mut bob_read) = bob_stream.split(); + // Alice joins + let join_msg = ClientMessage::Join { + username: "Alice".to_string(), + }; + let serialized = join_msg.to_json().unwrap(); + alice_write.send(Message::Binary(serialized)).await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + // Bob joins + let join_msg = ClientMessage::Join { + username: "Bob".to_string(), + }; + let serialized = join_msg.to_json().unwrap(); + bob_write.send(Message::Binary(serialized)).await.unwrap(); + + // Receive welcome messages + alice_read.next().await; + bob_read.next().await; + + // Alice should receive Bob's message about joining + if let Some(Ok(Message::Binary(msg))) = alice_read.next().await { + let server_msg: ServerMessage = ServerMessage::from_json(&msg).unwrap(); + assert_eq!(server_msg.from, "Bob"); + assert_eq!(server_msg.message, "joined the chat"); + } else { + panic!("Alice did not receive Bob's message"); + } + + // Alice sends a message + let send_msg = ClientMessage::Send { + message: "Hello, Bob!".to_string(), + }; + let serialized = send_msg.to_json().unwrap(); + alice_write.send(Message::Binary(serialized)).await.unwrap(); + // Bob should receive Alice's message + if let Some(Ok(Message::Binary(msg))) = bob_read.next().await { + let server_msg: ServerMessage = ServerMessage::from_json(&msg).unwrap(); + assert_eq!(server_msg.from, "Alice"); + assert_eq!(server_msg.message, "Hello, Bob!"); + } else { + panic!("Bob did not receive Alice's message"); + } + // Close connections + alice_write.close().await.unwrap(); + bob_write.close().await.unwrap(); + server_handle.abort(); + } +} diff --git a/simple_chat.mp4 b/simple_chat.mp4 new file mode 100644 index 0000000..d38d908 Binary files /dev/null and b/simple_chat.mp4 differ