-
-
Notifications
You must be signed in to change notification settings - Fork 808
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
143 additions
and
60 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,78 +1,113 @@ | ||
use actix_ws::AggregatedMessage; | ||
use ractor::{ActorProcessingErr, ActorRef}; | ||
use std::time::{Duration, Instant}; | ||
|
||
use actix::prelude::*; | ||
use actix_web_actors::ws; | ||
|
||
/// How often heartbeat pings are sent | ||
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); | ||
|
||
/// How long before lack of client response causes a timeout | ||
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); | ||
|
||
#[derive(Debug)] | ||
pub(crate) enum AsMessage { | ||
Ws(actix_ws::AggregatedMessage), | ||
Hb, | ||
} | ||
|
||
/// websocket connection is long running connection, it easier | ||
/// to handle with an actor | ||
pub struct MyWebSocket { | ||
/// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), | ||
/// otherwise we drop connection. | ||
hb: Instant, | ||
} | ||
pub(crate) struct MyWebSocket; | ||
|
||
impl MyWebSocket { | ||
pub fn new() -> Self { | ||
Self { hb: Instant::now() } | ||
async fn handle_hb( | ||
&self, | ||
state: &mut (Instant, Option<actix_ws::Session>), | ||
myself: &ActorRef<AsMessage>, | ||
) -> Result<(), ActorProcessingErr> { | ||
if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT { | ||
// heartbeat timed out | ||
println!("Websocket Client heartbeat failed, disconnecting!"); | ||
|
||
let _ = state.1.take().unwrap().close(None).await; | ||
|
||
// stop actor | ||
myself.stop(None); | ||
|
||
// don't try to send a ping | ||
} else { | ||
state.1.as_mut().unwrap().ping(b"").await?; | ||
}; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL). | ||
/// | ||
/// also this method checks heartbeats from client | ||
fn hb(&self, ctx: &mut <Self as Actor>::Context) { | ||
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { | ||
// check client heartbeats | ||
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { | ||
// heartbeat timed out | ||
println!("Websocket Client heartbeat failed, disconnecting!"); | ||
|
||
// stop actor | ||
ctx.stop(); | ||
|
||
// don't try to send a ping | ||
return; | ||
async fn handle_ws_msg( | ||
&self, | ||
msg: AggregatedMessage, | ||
state: &mut (Instant, Option<actix_ws::Session>), | ||
myself: ActorRef<AsMessage>, | ||
) -> Result<(), ActorProcessingErr> { | ||
println!("WS: {msg:?}"); | ||
|
||
match msg { | ||
AggregatedMessage::Ping(msg) => { | ||
state.0 = Instant::now(); | ||
state.1.as_mut().unwrap().pong(&msg).await?; | ||
} | ||
|
||
AggregatedMessage::Pong(_) => { | ||
state.0 = Instant::now(); | ||
} | ||
|
||
ctx.ping(b""); | ||
}); | ||
AggregatedMessage::Text(text) => { | ||
state.1.as_mut().unwrap().text(text).await?; | ||
} | ||
|
||
AggregatedMessage::Binary(bin) => { | ||
state.1.as_mut().unwrap().binary(bin).await?; | ||
} | ||
|
||
AggregatedMessage::Close(reason) => { | ||
let _ = state.1.take().unwrap().close(reason).await; | ||
myself.stop(None); | ||
} | ||
}; | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl Actor for MyWebSocket { | ||
type Context = ws::WebsocketContext<Self>; | ||
impl ractor::Actor for MyWebSocket { | ||
type Msg = AsMessage; | ||
type State = (Instant, Option<actix_ws::Session>); | ||
type Arguments = actix_ws::Session; | ||
|
||
async fn pre_start( | ||
&self, | ||
myself: ActorRef<Self::Msg>, | ||
session: Self::Arguments, | ||
) -> Result<Self::State, ActorProcessingErr> { | ||
myself.send_interval(HEARTBEAT_INTERVAL, || AsMessage::Hb); | ||
|
||
/// Method is called on actor start. We start the heartbeat process here. | ||
fn started(&mut self, ctx: &mut Self::Context) { | ||
self.hb(ctx); | ||
Ok((Instant::now(), Some(session))) | ||
} | ||
} | ||
|
||
/// Handler for `ws::Message` | ||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWebSocket { | ||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) { | ||
// process websocket messages | ||
println!("WS: {msg:?}"); | ||
match msg { | ||
Ok(ws::Message::Ping(msg)) => { | ||
self.hb = Instant::now(); | ||
ctx.pong(&msg); | ||
async fn handle( | ||
&self, | ||
myself: ActorRef<Self::Msg>, | ||
message: Self::Msg, | ||
state: &mut Self::State, | ||
) -> Result<(), ActorProcessingErr> { | ||
match message { | ||
AsMessage::Hb => { | ||
self.handle_hb(state, &myself).await?; | ||
} | ||
Ok(ws::Message::Pong(_)) => { | ||
self.hb = Instant::now(); | ||
} | ||
Ok(ws::Message::Text(text)) => ctx.text(text), | ||
Ok(ws::Message::Binary(bin)) => ctx.binary(bin), | ||
Ok(ws::Message::Close(reason)) => { | ||
ctx.close(reason); | ||
ctx.stop(); | ||
|
||
AsMessage::Ws(msg) => { | ||
self.handle_ws_msg(msg, state, myself).await?; | ||
} | ||
_ => ctx.stop(), | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |