Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions ws_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@ use base64::{Engine as _, engine::general_purpose::STANDARD};
use sha1::{Digest, Sha1};
use std::io::{Read, Result, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;

use ws_core::read::read_header;
use ws_core::write::send_server_message;


pub fn handle_client(mut stream: TcpStream) -> Result<()> {
pub fn handle_client(mut stream: TcpStream, clients: Arc<Mutex<Vec<(usize,TcpStream)>>>) -> Result<()> {
{
let mut list = clients.lock().unwrap();
let id = stream.peer_addr().unwrap().port() as usize;
list.push((id,stream.try_clone()?));
}
// these variable are here to read all the buffer sent from the client
let mut buffer = Vec::new();
let mut temp = [0u8;1024];
let mut temp = [0u8; 1024];

loop{
loop {
let bytes_read = stream.read(&mut temp).expect("failed to read stream");
//break the connection here if no bytes read
if bytes_read == 0{
if bytes_read == 0 {
break;
}

Expand All @@ -28,7 +34,7 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> {
// CR = Carriage Line = \r = ASCII 13
// LF = Line feed = \n = ASCII 10
// in HTTP protocol this is considered to use when we have to end any message.
if buffer.windows(4).any(|w| w == b"\r\n\r\n"){
if buffer.windows(4).any(|w| w == b"\r\n\r\n") {
break;
}
}
Expand All @@ -37,7 +43,10 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> {
let request = String::from_utf8_lossy(&buffer);

//check if the request contains the upgrade websocket
if request.contains("Upgrade: websocket") && request.contains("Sec-WebSocket-Key") && request.contains("Sec-WebSocket-Version") {
if request.contains("Upgrade: websocket")
&& request.contains("Sec-WebSocket-Key")
&& request.contains("Sec-WebSocket-Version")
{
let mut websocket_key = String::new();
//get the key to upgrade the protocol
for line in request.lines() {
Expand Down Expand Up @@ -81,8 +90,8 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> {

//close frame from the client
if cont_opcode == 0b0000_1000 {
let byte1 = fin_code | 0b0000_1000;
send_server_message(&mut stream, byte1, &frame.decoded_data)?;
let id = stream.peer_addr().unwrap().port() as usize;
remove_client(id, &clients);
return Ok(());
}

Expand Down Expand Up @@ -111,22 +120,38 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> {
}
}

let snapshot = {
let list = clients.lock().unwrap();
list.iter()
.map(|(cid,c)| (*cid,c.try_clone().unwrap()))
.collect::<Vec<_>>()
};

//handle the TEXT and BINARY opcode types
//TEXT type opcode = 1 (0b0000_0001)
//BINARY type opcode = 2 (0b0000_0010)
if opcode == 0b0000_0001 {
if let Ok(text) = String::from_utf8(final_message.clone()){
println!("Text recieved : {}",text);
if let Ok(text) = String::from_utf8(final_message.clone()) {
println!("Text recieved : {}", text);

let byte1 = fin_code | opcode;
send_server_message(&mut stream, byte1, text.as_bytes())?;
for (cid,mut c) in snapshot {
if let Err(_) = send_server_message(&mut c, byte1, text.as_bytes()) {
remove_client(cid, &clients);
}
}

continue;
}
} else {
println!("Recieved {} of binary message",final_message.len());
println!("Recieved {} of binary message", final_message.len());

let byte1 = fin_code | opcode;
send_server_message(&mut stream, byte1, &final_message)?;
for (cid,mut c) in snapshot {
if let Err(_) = send_server_message(&mut c, byte1, &final_message) {
remove_client(cid, &clients);
}
}
continue;
}
}
Expand All @@ -136,14 +161,15 @@ pub fn handle_client(mut stream: TcpStream) -> Result<()> {

pub fn main() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080")?;
let clients = Arc::new(Mutex::new(Vec::new()));

for stream in listener.incoming() {
match stream {
Ok(stream) => {
println!("Connection Established");

let clients_clone = Arc::clone(&clients);
thread::spawn(move || {
let _ = handle_client(stream);
let _ = handle_client(stream, clients_clone);
});
}
Err(e) => {
Expand All @@ -153,4 +179,9 @@ pub fn main() -> Result<()> {
}

Ok(())
}

fn remove_client (id:usize,clients: &Arc<Mutex<Vec<(usize,TcpStream)>>>) {
let mut list = clients.lock().unwrap();
list.retain(|(cid, _)| *cid != id);
}