From db738d4313e23fbdc0ec89a1351bf5d407d8a4e6 Mon Sep 17 00:00:00 2001 From: Shivendra Mishra Date: Tue, 27 Jan 2026 10:15:05 +0530 Subject: [PATCH] feat(server): multi client broadcast --- ws_server/src/main.rs | 61 ++++++++++++++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 15 deletions(-) diff --git a/ws_server/src/main.rs b/ws_server/src/main.rs index 4c48686..7e359ef 100644 --- a/ws_server/src/main.rs +++ b/ws_server/src/main.rs @@ -2,21 +2,27 @@ use base64::{Engine as _, engine::general_purpose::STANDARD}; use sha1::{Digest, Sha1}; use std::io::{Read, Result, Write}; use std::net::{TcpListener, TcpStream}; +use std::sync::{Arc, Mutex}; use std::thread; use ws_core::read::read_header; use ws_core::write::send_server_message; -pub fn handle_client(mut stream: TcpStream) -> Result<()> { +pub fn handle_client(mut stream: TcpStream, clients: Arc>>) -> Result<()> { + { + let mut list = clients.lock().unwrap(); + let id = stream.peer_addr().unwrap().port() as usize; + list.push((id,stream.try_clone()?)); + } // these variable are here to read all the buffer sent from the client let mut buffer = Vec::new(); - let mut temp = [0u8;1024]; + let mut temp = [0u8; 1024]; - loop{ + loop { let bytes_read = stream.read(&mut temp).expect("failed to read stream"); //break the connection here if no bytes read - if bytes_read == 0{ + if bytes_read == 0 { break; } @@ -28,7 +34,7 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> { // CR = Carriage Line = \r = ASCII 13 // LF = Line feed = \n = ASCII 10 // in HTTP protocol this is considered to use when we have to end any message. - if buffer.windows(4).any(|w| w == b"\r\n\r\n"){ + if buffer.windows(4).any(|w| w == b"\r\n\r\n") { break; } } @@ -37,7 +43,10 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> { let request = String::from_utf8_lossy(&buffer); //check if the request contains the upgrade websocket - if request.contains("Upgrade: websocket") && request.contains("Sec-WebSocket-Key") && request.contains("Sec-WebSocket-Version") { + if request.contains("Upgrade: websocket") + && request.contains("Sec-WebSocket-Key") + && request.contains("Sec-WebSocket-Version") + { let mut websocket_key = String::new(); //get the key to upgrade the protocol for line in request.lines() { @@ -81,8 +90,8 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> { //close frame from the client if cont_opcode == 0b0000_1000 { - let byte1 = fin_code | 0b0000_1000; - send_server_message(&mut stream, byte1, &frame.decoded_data)?; + let id = stream.peer_addr().unwrap().port() as usize; + remove_client(id, &clients); return Ok(()); } @@ -111,22 +120,38 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> { } } + let snapshot = { + let list = clients.lock().unwrap(); + list.iter() + .map(|(cid,c)| (*cid,c.try_clone().unwrap())) + .collect::>() + }; + //handle the TEXT and BINARY opcode types //TEXT type opcode = 1 (0b0000_0001) //BINARY type opcode = 2 (0b0000_0010) if opcode == 0b0000_0001 { - if let Ok(text) = String::from_utf8(final_message.clone()){ - println!("Text recieved : {}",text); + if let Ok(text) = String::from_utf8(final_message.clone()) { + println!("Text recieved : {}", text); let byte1 = fin_code | opcode; - send_server_message(&mut stream, byte1, text.as_bytes())?; + for (cid,mut c) in snapshot { + if let Err(_) = send_server_message(&mut c, byte1, text.as_bytes()) { + remove_client(cid, &clients); + } + } + continue; } } else { - println!("Recieved {} of binary message",final_message.len()); + println!("Recieved {} of binary message", final_message.len()); let byte1 = fin_code | opcode; - send_server_message(&mut stream, byte1, &final_message)?; + for (cid,mut c) in snapshot { + if let Err(_) = send_server_message(&mut c, byte1, &final_message) { + remove_client(cid, &clients); + } + } continue; } } @@ -136,14 +161,15 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> { pub fn main() -> Result<()> { let listener = TcpListener::bind("127.0.0.1:8080")?; + let clients = Arc::new(Mutex::new(Vec::new())); for stream in listener.incoming() { match stream { Ok(stream) => { println!("Connection Established"); - + let clients_clone = Arc::clone(&clients); thread::spawn(move || { - let _ = handle_client(stream); + let _ = handle_client(stream, clients_clone); }); } Err(e) => { @@ -153,4 +179,9 @@ pub fn main() -> Result<()> { } Ok(()) +} + +fn remove_client (id:usize,clients: &Arc>>) { + let mut list = clients.lock().unwrap(); + list.retain(|(cid, _)| *cid != id); } \ No newline at end of file