Skip to content

Commit

Permalink
More websocket progress
Browse files Browse the repository at this point in the history
  • Loading branch information
connorslade committed May 8, 2023
1 parent 40bcb1d commit c5307ed
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 15 deletions.
9 changes: 6 additions & 3 deletions examples/tmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@ fn main() {
server.route(Method::GET, "/ws", |req| {
let stream = req.ws().unwrap();

stream.send("ello world");
thread::park();
loop {
println!("Sending...");
stream.send("hello world");
thread::sleep(Duration::from_secs(5));
}

Response::end()
// Response::end()
});

server.route(Method::GET, "/sse", |req| {
Expand Down
17 changes: 9 additions & 8 deletions lib/http/web_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,16 @@ impl WebSocketStream {
let (s2c, rx) = mpsc::sync_channel::<TxType>(10);
let (_tx, c2s) = mpsc::sync_channel::<TxType>(10);
let (s2c, c2s) = (Arc::new(s2c), Arc::new(c2s));

let socket = req.socket.clone();
let this_s2c = s2c.clone();

let socket = req.socket.force_lock();
let mut read_socket = socket.try_clone().unwrap();
let mut write_socket = socket.try_clone().unwrap();
drop(socket);
thread::spawn(move || {
let mut socket = socket.force_lock();
let mut buf = [0u8; 1024];
loop {
let len = socket.read(&mut buf).unwrap();
let len = read_socket.read(&mut buf).unwrap();
if len == 0 {
break;
}
Expand Down Expand Up @@ -115,7 +117,6 @@ impl WebSocketStream {
}
});

let socket = req.socket.clone();
thread::spawn(move || {
//todo
for i in rx {
Expand All @@ -125,7 +126,7 @@ impl WebSocketStream {
TxType::Text(s) => Frame::text(s),
TxType::Binary(b) => Frame::binary(b),
}
.write(socket.clone())
.write(&mut write_socket)
.unwrap();
trace!(Level::Debug, "WS: Sent :p");
}
Expand Down Expand Up @@ -256,11 +257,11 @@ impl Frame {
buf
}

fn write(&self, socket: Arc<Mutex<TcpStream>>) -> io::Result<()> {
fn write(&self, socket: &mut TcpStream) -> io::Result<()> {
let buf = self.to_bytes();
trace!(Level::Debug, "WS: Writing: {:?}", buf);

socket.force_lock().write_all(&buf)?;
socket.write_all(&buf)?;
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions lib/internal/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use crate::{
error::{HandleError, ParseError, Result, StreamError},
internal::common::{any_string, ForceLock},
internal::common::any_string,
middleware::MiddleResult,
response::ResponseFlag,
route::RouteType,
Expand Down Expand Up @@ -71,7 +71,7 @@ where

if !keep_alive || res.flag == ResponseFlag::Close || !this.keep_alive {
trace!(Level::Debug, "Closing socket");
if let Err(e) = stream.force_lock().shutdown(Shutdown::Both) {
if let Err(e) = stream.lock().unwrap().shutdown(Shutdown::Both) {
trace!(Level::Debug, "Error closing socket: {:?}", e);
}
break;
Expand Down
3 changes: 1 addition & 2 deletions lib/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::sync::{Arc, Mutex};
use crate::consts;
use crate::header::{HeaderType, Headers};
use crate::http::status::Status;
use crate::internal::common::ForceLock;
use crate::{
error::Result, header::headers_to_string, internal::handle::Writeable, Content, Header,
SetCookie,
Expand Down Expand Up @@ -319,7 +318,7 @@ impl Response {
headers_to_string(&self.headers)
);

let mut stream = stream.force_lock();
let mut stream = stream.lock().unwrap();
stream.write_all(response.as_bytes())?;
self.data.write(&mut stream)?;

Expand Down

0 comments on commit c5307ed

Please sign in to comment.