From 6a4ec7bdbc969cbb6f4fcfe23681a20188f32e86 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 12 Sep 2024 14:24:41 -0400 Subject: [PATCH] refactor: use ractor --- Cargo.lock | 44 ++++++++++-- websockets/echo/Cargo.toml | 4 +- websockets/echo/src/main.rs | 18 ++++- websockets/echo/src/server.rs | 124 +++++++++++++++++++--------------- 4 files changed, 126 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f569eb8a..e0cd6377 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,7 +1008,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "strum", + "strum 0.25.0", "syn 2.0.77", "thiserror", ] @@ -6070,6 +6070,20 @@ dependencies = [ "r2d2", ] +[[package]] +name = "ractor" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11e9a53fcabb680bb70a3ce495e744676ee7e1800a188e01a68d18916a1b48e1" +dependencies = [ + "dashmap", + "futures", + "once_cell", + "strum 0.26.3", + "tokio", + "tracing", +] + [[package]] name = "radium" version = "0.7.0" @@ -7484,7 +7498,16 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros", + "strum_macros 0.25.3", +] + +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros 0.26.4", ] [[package]] @@ -7500,6 +7523,19 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.77", +] + [[package]] name = "subprocess" version = "0.2.9" @@ -8884,14 +8920,14 @@ dependencies = [ name = "websocket" version = "1.0.0" dependencies = [ - "actix", "actix-files", "actix-web", - "actix-web-actors", + "actix-ws", "awc", "env_logger", "futures-util", "log", + "ractor", "tokio", "tokio-stream", ] diff --git a/websockets/echo/Cargo.toml b/websockets/echo/Cargo.toml index e6491456..a8aa22cb 100644 --- a/websockets/echo/Cargo.toml +++ b/websockets/echo/Cargo.toml @@ -12,14 +12,14 @@ name = "websocket-client" path = "src/client.rs" [dependencies] -actix.workspace = true actix-files.workspace = true actix-web.workspace = true -actix-web-actors.workspace = true +actix-ws.workspace = true awc.workspace = true env_logger.workspace = true futures-util = { workspace = true, features = ["sink"] } log.workspace = true +ractor = { version = "0.10", default-features = false } tokio = { workspace = true, features = ["full"] } tokio-stream.workspace = true diff --git a/websockets/echo/src/main.rs b/websockets/echo/src/main.rs index 7a6d656a..a4817b09 100644 --- a/websockets/echo/src/main.rs +++ b/websockets/echo/src/main.rs @@ -4,10 +4,10 @@ use actix_files::NamedFile; use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder}; -use actix_web_actors::ws; +use ractor::Actor; mod server; -use self::server::MyWebSocket; +use self::server::{AsMessage, MyWebSocket}; async fn index() -> impl Responder { NamedFile::open_async("./static/index.html").await.unwrap() @@ -15,7 +15,19 @@ async fn index() -> impl Responder { /// WebSocket handshake and start `MyWebSocket` actor. async fn echo_ws(req: HttpRequest, stream: web::Payload) -> Result { - ws::start(MyWebSocket::new(), &req, stream) + let (res, session, stream) = actix_ws::handle(&req, stream)?; + + let (actor, _handle) = Actor::spawn(None, MyWebSocket, session).await.unwrap(); + + actix_web::rt::spawn(async move { + let mut stream = stream.aggregate_continuations(); + + while let Some(Ok(msg)) = stream.recv().await { + actor.send_message(AsMessage::Ws(msg)).unwrap(); + } + }); + + Ok(res) } // the actor-based WebSocket examples REQUIRE `actix_web::main` for actor support diff --git a/websockets/echo/src/server.rs b/websockets/echo/src/server.rs index e262eb77..9526a9fd 100644 --- a/websockets/echo/src/server.rs +++ b/websockets/echo/src/server.rs @@ -1,8 +1,7 @@ +use actix_ws::AggregatedMessage; +use ractor::{ActorProcessingErr, ActorRef}; use std::time::{Duration, Instant}; -use actix::prelude::*; -use actix_web_actors::ws; - /// How often heartbeat pings are sent const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); @@ -11,68 +10,83 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); /// websocket connection is long running connection, it easier /// to handle with an actor -pub struct MyWebSocket { - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), - /// otherwise we drop connection. - hb: Instant, +pub(crate) struct MyWebSocket; + +#[derive(Debug)] +pub(crate) enum AsMessage { + Ws(actix_ws::AggregatedMessage), + Hb, } -impl MyWebSocket { - pub fn new() -> Self { - Self { hb: Instant::now() } - } +impl ractor::Actor for MyWebSocket { + type Msg = AsMessage; + type State = (Instant, Option); + type Arguments = actix_ws::Session; - /// helper method that sends ping to client every 5 seconds (HEARTBEAT_INTERVAL). - /// - /// also this method checks heartbeats from client - fn hb(&self, ctx: &mut ::Context) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - // check client heartbeats - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { - // heartbeat timed out - println!("Websocket Client heartbeat failed, disconnecting!"); - - // stop actor - ctx.stop(); - - // don't try to send a ping - return; - } + async fn pre_start( + &self, + myself: ActorRef, + session: Self::Arguments, + ) -> Result { + myself.send_interval(HEARTBEAT_INTERVAL, || AsMessage::Hb); - ctx.ping(b""); - }); + Ok((Instant::now(), Some(session))) } -} -impl Actor for MyWebSocket { - type Context = ws::WebsocketContext; + async fn handle( + &self, + myself: ActorRef, + message: Self::Msg, + state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + AsMessage::Hb => { + // check client heartbeats + if Instant::now().duration_since(state.0) > CLIENT_TIMEOUT { + // heartbeat timed out + println!("Websocket Client heartbeat failed, disconnecting!"); - /// Method is called on actor start. We start the heartbeat process here. - fn started(&mut self, ctx: &mut Self::Context) { - self.hb(ctx); - } -} + let _ = state.1.take().unwrap().close(None).await; -/// Handler for `ws::Message` -impl StreamHandler> for MyWebSocket { - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - // process websocket messages - println!("WS: {msg:?}"); - match msg { - Ok(ws::Message::Ping(msg)) => { - self.hb = Instant::now(); - ctx.pong(&msg); - } - Ok(ws::Message::Pong(_)) => { - self.hb = Instant::now(); + // stop actor + myself.stop(None); + + // don't try to send a ping + } else { + state.1.as_mut().unwrap().ping(b"").await.unwrap(); + } } - Ok(ws::Message::Text(text)) => ctx.text(text), - Ok(ws::Message::Binary(bin)) => ctx.binary(bin), - Ok(ws::Message::Close(reason)) => { - ctx.close(reason); - ctx.stop(); + + AsMessage::Ws(msg) => { + // process websocket messages + println!("WS: {msg:?}"); + + match msg { + AggregatedMessage::Ping(msg) => { + state.0 = Instant::now(); + state.1.as_mut().unwrap().pong(&msg).await.unwrap(); + } + + AggregatedMessage::Pong(_) => { + state.0 = Instant::now(); + } + + AggregatedMessage::Text(text) => { + state.1.as_mut().unwrap().text(text).await.unwrap() + } + + AggregatedMessage::Binary(bin) => { + state.1.as_mut().unwrap().binary(bin).await.unwrap() + } + + AggregatedMessage::Close(reason) => { + let _ = state.1.take().unwrap().close(reason).await; + myself.stop(None); + } + } } - _ => ctx.stop(), } + + Ok(()) } }