Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Fix for async timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
danielvschoor committed Jul 2, 2022
1 parent 7acc7cf commit 0191145
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ shellexpand = "^2.1.0"
regex = "^1.4.3"
portpicker = { git = "https://github.com/aiarena/portpicker-rs" }
tempfile = "3.1.0"
crossbeam = "^0.8.0"
crossbeam = "^0.8.1"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
bincode = { version = "^1.3.1", optional = true }
Expand Down
112 changes: 60 additions & 52 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use protobuf::Message;
use sc2_proto::{self, sc2api::RequestJoinGame};
use std::collections::HashMap;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use tokio_tungstenite::tungstenite::error::Error;
use tokio_tungstenite::tungstenite::Message as TMessage;
use tokio_tungstenite::WebSocketStream;
Expand All @@ -29,7 +30,7 @@ pub enum SupervisorAction {
NoAction,
Received,
Config(String),
Ping,
Ping(Vec<u8>),
}

enum PlaylistAction {
Expand Down Expand Up @@ -111,11 +112,11 @@ impl Controller {
self.game = None;
self.connected_clients = 0;
}
pub async fn send_pong(&mut self) {
pub async fn send_pong(&mut self, payload: Vec<u8>) {
match &mut self.supervisor {
Some(sender) => {
sender
.send(TMessage::Pong(vec![0_u8]))
.send(TMessage::Pong(payload))
.await
.expect("Could not send message to supervisor");
}
Expand Down Expand Up @@ -495,66 +496,73 @@ pub enum RemoteUpdateStatus {
NoAction,
}

pub async fn create_supervisor_listener(
pub fn create_supervisor_listener(
mut client_recv: SplitStream<WebSocketStream<TcpStream>>,
sender: Sender<SupervisorAction>,
) {
tokio::spawn(async move {
loop {
if let Some(r_msg) = client_recv.next().await {
trace!("Message received from supervisor client");
match r_msg {
Ok(msg) => match msg {
TMessage::Text(data) => {
if data == "Reset" {
sender
.send(SupervisorAction::Quit)
.expect("Could not send SupervisorAction");
break;
} else if data == "Received" {
std::thread::spawn(move || {
let rt = Runtime::new().unwrap();
rt.block_on(async {
loop {
if let Some(r_msg) = client_recv.next().await {
trace!("Message received from supervisor client");
match r_msg {
Ok(msg) => match msg {
TMessage::Text(data) => {
if data == "Reset" {
sender
.send(SupervisorAction::Quit)
.expect("Could not send SupervisorAction");
break;
} else if data == "Received" {
sender
.send(SupervisorAction::Received)
.expect("Could not send SupervisorAction");
} else if data.contains("Map") || data.contains("map") {
sender
.send(SupervisorAction::Config(data))
.expect("Could not send config");
} else if data == "Quit" {
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
}
}
TMessage::Ping(payload) => {
sender
.send(SupervisorAction::Received)
.send(SupervisorAction::Ping(payload))
.expect("Could not send SupervisorAction");
} else if data.contains("Map") || data.contains("map") {
sender
.send(SupervisorAction::Config(data))
.expect("Could not send config");
} else if data == "Quit" {
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
}
_ => { }
},
Err(Error::AlreadyClosed) => {
error!("Supervisor Error::AlreadyClosed");
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
TMessage::Ping(_) => {
Err(Error::Capacity(e)) => {
error!("{:?}", e);
sender
.send(SupervisorAction::Ping)
.expect("Could not send SupervisorAction");
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
Err(e) => {
error!("{:?}", e);
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
_ => {}
},
Err(Error::AlreadyClosed) => {
error!("Supervisor Error::AlreadyClosed");
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
Err(Error::Capacity(e)) => {
error!("{:?}", e);
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
Err(e) => {
error!("{:?}", e);
sender
.send(SupervisorAction::ForceQuit)
.expect("Could not send ForceQuit");
break;
}
}
else{
break;
}
}
}
return;
});
});
}
2 changes: 1 addition & 1 deletion src/handler/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl Player {
let mut debug_response = Response::new();
debug_response.set_id(0);
debug_response.set_status(Status::in_game);
let timeout_secs = Duration::from_secs(40);
let timeout_secs = Duration::from_secs(config.max_frame_time as u64);
let replay_path = config.replay_path();
let mut start_timer = false;
let mut frame_time = 0_f32;
Expand Down
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RustServer {
ClientType::Controller => {
let (ws_sender, ws_receiver) = client.stream.split();
controller.add_supervisor(ws_sender, sup_recv.to_owned());
create_supervisor_listener(ws_receiver, sup_send.to_owned()).await;
create_supervisor_listener(ws_receiver, sup_send.to_owned());
controller.send_message("{\"Status\": \"Connected\"}").await;
}
},
Expand All @@ -76,8 +76,8 @@ impl RustServer {
controller.send_message("{\"Config\": \"Received\"}").await;
}
SupervisorAction::ForceQuit => break,
SupervisorAction::Ping => {
controller.send_pong().await;
SupervisorAction::Ping(payload) => {
controller.send_pong(payload).await;
}
_ => {}
}
Expand Down

0 comments on commit 0191145

Please sign in to comment.