From 1a460470913f92139095417838160c6db9ae9fbc Mon Sep 17 00:00:00 2001 From: Fabian Vowie Date: Sun, 10 Oct 2021 14:24:24 +0200 Subject: [PATCH] Store websocket messages and send them with a incrementing id --- ferrum-websocket/src/lib.rs | 81 ++++++++++++++++++--------- ferrum-websocket/src/messages.rs | 17 ++++-- ferrum-websocket/src/server.rs | 7 +-- src/routes/channels/create_message.rs | 4 +- tests/api/channels/create_message.rs | 14 +++-- tests/api/helpers.rs | 12 ++-- tests/api/servers/create.rs | 4 +- tests/api/servers/create_channel.rs | 4 +- tests/api/servers/delete.rs | 4 +- tests/api/servers/join.rs | 8 +-- tests/api/servers/leave.rs | 6 +- tests/api/websockets.rs | 14 ++--- 12 files changed, 105 insertions(+), 70 deletions(-) diff --git a/ferrum-websocket/src/lib.rs b/ferrum-websocket/src/lib.rs index d1fea6f..151e3c1 100644 --- a/ferrum-websocket/src/lib.rs +++ b/ferrum-websocket/src/lib.rs @@ -3,22 +3,50 @@ pub mod messages; mod server; -use std::{collections::HashSet, iter::FromIterator}; +use std::{ + collections::{HashSet, VecDeque}, + iter::FromIterator, +}; use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use actix_web::{web, HttpRequest, HttpResponse}; use actix_web_actors::ws; use ferrum_shared::jwt::Jwt; -use messages::{IdentifyUser, SerializedWebSocketMessage, WebSocketClose, WebSocketMessage}; +use messages::{IdentifyUser, SerializedWebSocketMessage, WebSocketClose, WebSocketMessageType}; use uuid::Uuid; pub use server::WebSocketServer; +macro_rules! save_and_send_message { + ($self:expr, $ctx:expr, $message:expr) => { + let last_index = $self.messages.back().map(|x| x.0); + + let index = if let Some(index) = last_index { + $self.messages.push_back((index + 1, $message.clone())); + + index + 1 + } else { + $self.messages.push_back((0, $message.clone())); + + 0 + }; + + $ctx.text( + ::serde_json::to_string(&crate::messages::WebSocketMessage { + id: index, + payload: $message, + }) + .unwrap(), + ); + }; +} + pub struct WebSocketSession { pub user_id: Option, pub server: Addr, pub channels: HashSet, pub servers: HashSet, + pub messages: VecDeque<(u64, WebSocketMessageType)>, jwt: Jwt, } @@ -35,45 +63,42 @@ impl Handler for WebSocketSession { self.servers = HashSet::from_iter(servers.iter().cloned()); self.channels = HashSet::from_iter(channels.iter().cloned()); - ctx.text(serde_json::to_string(&WebSocketMessage::Ready).unwrap()); + save_and_send_message!(self, ctx, WebSocketMessageType::Ready); } - SerializedWebSocketMessage::Data(data, channel) => { + SerializedWebSocketMessage::Data(message, channel) => { if self.channels.contains(&channel) { - ctx.text(data); + save_and_send_message!(self, ctx, message); } } SerializedWebSocketMessage::AddChannel(channel) => { self.channels.insert(channel.id); - ctx.text(serde_json::to_string(&WebSocketMessage::NewChannel { channel }).unwrap()); + let message = WebSocketMessageType::NewChannel { channel }; + save_and_send_message!(self, ctx, message); } SerializedWebSocketMessage::AddServer(server, channels, users) => { self.servers.insert(server.id); self.channels.extend(channels.iter().map(|x| x.id)); - ctx.text( - serde_json::to_string(&WebSocketMessage::NewServer { - server, - channels, - users, - }) - .unwrap(), - ); + let message = WebSocketMessageType::NewServer { + server, + channels, + users, + }; + + save_and_send_message!(self, ctx, message); } SerializedWebSocketMessage::AddUser(server_id, user) => { if self.servers.contains(&server_id) == false { return; } - ctx.text( - serde_json::to_string(&WebSocketMessage::NewUser { server_id, user }).unwrap(), - ); + let message = WebSocketMessageType::NewUser { server_id, user }; + save_and_send_message!(self, ctx, message); } SerializedWebSocketMessage::DeleteUser(user_id, server_id) => { - ctx.text( - serde_json::to_string(&WebSocketMessage::DeleteUser { user_id, server_id }) - .unwrap(), - ); + let message = WebSocketMessageType::DeleteUser { user_id, server_id }; + save_and_send_message!(self, ctx, message); } SerializedWebSocketMessage::DeleteServer(server_id) => { if self.servers.contains(&server_id) == false { @@ -82,9 +107,8 @@ impl Handler for WebSocketSession { self.servers.remove(&server_id); - ctx.text( - serde_json::to_string(&WebSocketMessage::DeleteServer { server_id }).unwrap(), - ) + let message = WebSocketMessageType::DeleteServer { server_id }; + save_and_send_message!(self, ctx, message); } } } @@ -94,16 +118,16 @@ impl StreamHandler> for WebSocketSession fn handle(&mut self, item: Result, ctx: &mut Self::Context) { match item { Ok(ws::Message::Text(text)) => { - let message = match serde_json::from_str::(&text) { + let message = match serde_json::from_str::(&text) { Ok(value) => value, Err(_) => return, }; match message { - WebSocketMessage::Ping => { - ctx.text(serde_json::to_string(&WebSocketMessage::Pong).unwrap()); + WebSocketMessageType::Ping => { + ctx.text(serde_json::to_string(&WebSocketMessageType::Pong).unwrap()); } - WebSocketMessage::Identify { bearer } => { + WebSocketMessageType::Identify { bearer } => { let claims = match self.jwt.get_claims(&bearer) { Some(value) => value, None => return, @@ -145,6 +169,7 @@ pub async fn websocket( server: server.get_ref().clone(), channels: HashSet::new(), servers: HashSet::new(), + messages: VecDeque::new(), jwt: jwt.as_ref().clone(), }, &request, diff --git a/ferrum-websocket/src/messages.rs b/ferrum-websocket/src/messages.rs index ff576bd..2d385bc 100644 --- a/ferrum-websocket/src/messages.rs +++ b/ferrum-websocket/src/messages.rs @@ -6,10 +6,17 @@ use ferrum_shared::{ use serde::{Deserialize, Serialize}; use uuid::Uuid; -#[derive(Debug, Serialize, Deserialize, actix::prelude::Message)] +#[derive(Debug, Serialize, Deserialize)] +pub struct WebSocketMessage { + pub id: u64, + #[serde(flatten)] + pub payload: WebSocketMessageType, +} + +#[derive(Debug, Clone, Serialize, Deserialize, actix::prelude::Message)] #[rtype(result = "()")] #[serde(tag = "type", content = "payload")] -pub enum WebSocketMessage { +pub enum WebSocketMessageType { Empty, Ping, Pong, @@ -50,7 +57,7 @@ pub enum SerializedWebSocketMessage { AddUser(Uuid, UserResponse), DeleteServer(Uuid), DeleteUser(Uuid, Uuid), - Data(String, Uuid), + Data(WebSocketMessageType, Uuid), } #[derive(Debug, actix::prelude::Message)] @@ -82,11 +89,11 @@ pub struct ReadyUser { #[rtype(result = "()")] pub struct SendMessageToChannel { pub channel_id: Uuid, - pub message: WebSocketMessage, + pub message: WebSocketMessageType, } impl SendMessageToChannel { - pub fn new(channel_id: Uuid, message: WebSocketMessage) -> Self { + pub fn new(channel_id: Uuid, message: WebSocketMessageType) -> Self { Self { channel_id, message, diff --git a/ferrum-websocket/src/server.rs b/ferrum-websocket/src/server.rs index 915b044..cb7b833 100644 --- a/ferrum-websocket/src/server.rs +++ b/ferrum-websocket/src/server.rs @@ -14,7 +14,7 @@ use crate::messages::{DeleteServer, UserLeft}; use super::messages::{ IdentifyUser, NewChannel, NewServer, NewUser, SendMessageToChannel, SerializedWebSocketMessage, - WebSocketClose, WebSocketMessage, + WebSocketClose, WebSocketMessageType, }; pub struct WebSocketServer { @@ -30,9 +30,8 @@ impl WebSocketServer { } } - pub fn send_message_to_channel(&self, channel_id: Uuid, message: WebSocketMessage) { - let client_message = - SerializedWebSocketMessage::Data(serde_json::to_string(&message).unwrap(), channel_id); + pub fn send_message_to_channel(&self, channel_id: Uuid, message: WebSocketMessageType) { + let client_message = SerializedWebSocketMessage::Data(message, channel_id); for recipient in self.users.values() { recipient.do_send(client_message.clone()).expect(""); diff --git a/src/routes/channels/create_message.rs b/src/routes/channels/create_message.rs index 8371e8a..a1c1dea 100644 --- a/src/routes/channels/create_message.rs +++ b/src/routes/channels/create_message.rs @@ -14,7 +14,7 @@ use ferrum_db::{ pub use ferrum_shared::error_chain_fmt; use ferrum_shared::jwt::AuthorizationService; use ferrum_websocket::{ - messages::{self, WebSocketMessage}, + messages::{self, WebSocketMessageType}, WebSocketServer, }; use sqlx::PgPool; @@ -104,7 +104,7 @@ pub async fn create_message( server.do_send(messages::SendMessageToChannel::new( *channel_id, - WebSocketMessage::NewMessage { + WebSocketMessageType::NewMessage { message: message.to_response(user), }, )); diff --git a/tests/api/channels/create_message.rs b/tests/api/channels/create_message.rs index c23dbbe..0f0e240 100644 --- a/tests/api/channels/create_message.rs +++ b/tests/api/channels/create_message.rs @@ -1,5 +1,5 @@ use actix_http::{encoding::Decoder, Payload}; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use uuid::Uuid; use crate::{ @@ -244,10 +244,14 @@ async fn create_message_sends_websocket_message_to_ready_users() { .await; // Assert - assert_next_websocket_message!(WebSocketMessage::NewMessage { message }, &mut connection, { - assert_eq!("foobar", message.content); - assert_eq!(app.test_user().id, message.user.id); - }); + assert_next_websocket_message!( + WebSocketMessageType::NewMessage { message }, + &mut connection, + { + assert_eq!("foobar", message.content); + assert_eq!(app.test_user().id, message.user.id); + } + ); } #[ferrum_macros::test(strategy = "UserAndOwnServer")] diff --git a/tests/api/helpers.rs b/tests/api/helpers.rs index c81911e..8089309 100644 --- a/tests/api/helpers.rs +++ b/tests/api/helpers.rs @@ -9,7 +9,7 @@ use ferrum::{ telemetry::{get_subscriber, init_subscriber}, }; use ferrum_shared::jwt::Jwt; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use futures::{select, FutureExt, SinkExt, StreamExt}; use once_cell::sync::Lazy; use sqlx::{postgres::PgPoolOptions, types::Uuid, Connection, Executor, PgConnection, PgPool}; @@ -163,8 +163,8 @@ impl TestApplication { ) { let (response, mut connection) = self.websocket().await; - send_websocket_message(&mut connection, WebSocketMessage::Identify { bearer }).await; - assert_next_websocket_message!(WebSocketMessage::Ready, &mut connection, ()); + send_websocket_message(&mut connection, WebSocketMessageType::Identify { bearer }).await; + assert_next_websocket_message!(WebSocketMessageType::Ready, &mut connection, ()); (response, connection) } @@ -324,7 +324,7 @@ async fn configure_database(settings: &DatabaseSettings) -> PgPool { pub async fn get_next_websocket_message( connection: &mut actix_codec::Framed, actix_http::ws::Codec>, -) -> Option { +) -> Option { let mut message = connection.next().fuse(); let mut timeout = Box::pin(actix_rt::time::sleep(Duration::from_secs(2)).fuse()); @@ -334,7 +334,7 @@ pub async fn get_next_websocket_message( }; match x.unwrap().unwrap() { - ws::Frame::Text(text) => match serde_json::from_slice::(&text) { + ws::Frame::Text(text) => match serde_json::from_slice::(&text) { Ok(value) => Some(value), Err(_) => None, }, @@ -344,7 +344,7 @@ pub async fn get_next_websocket_message( pub async fn send_websocket_message( connection: &mut actix_codec::Framed, actix_http::ws::Codec>, - message: WebSocketMessage, + message: WebSocketMessageType, ) { connection .send(ws::Message::Text( diff --git a/tests/api/servers/create.rs b/tests/api/servers/create.rs index d5e9c8e..d0d75af 100644 --- a/tests/api/servers/create.rs +++ b/tests/api/servers/create.rs @@ -1,5 +1,5 @@ use actix_http::{encoding::Decoder, Payload}; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use crate::{assert_next_websocket_message, helpers::TestApplication}; @@ -185,7 +185,7 @@ async fn create_sends_new_server_to_owner_per_websocket() { // Assert assert_next_websocket_message!( - WebSocketMessage::NewServer { + WebSocketMessageType::NewServer { server: new_server, channels, users diff --git a/tests/api/servers/create_channel.rs b/tests/api/servers/create_channel.rs index 5b5c638..57756a3 100644 --- a/tests/api/servers/create_channel.rs +++ b/tests/api/servers/create_channel.rs @@ -1,5 +1,5 @@ use actix_http::{encoding::Decoder, Payload}; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use uuid::Uuid; use crate::{assert_next_websocket_message, helpers::TestApplication}; @@ -235,7 +235,7 @@ async fn create_channel_sends_new_channel_to_authenticated_websocket_users() { // Assert assert_next_websocket_message!( - WebSocketMessage::NewChannel { + WebSocketMessageType::NewChannel { channel: new_channel }, &mut connection, diff --git a/tests/api/servers/delete.rs b/tests/api/servers/delete.rs index 73b6dbd..9cbc130 100644 --- a/tests/api/servers/delete.rs +++ b/tests/api/servers/delete.rs @@ -1,5 +1,5 @@ use actix_http::{encoding::Decoder, Payload}; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use uuid::Uuid; use crate::{ @@ -134,7 +134,7 @@ async fn delete_server_sends_deleted_server_to_users_on_server() { // Assert assert_next_websocket_message!( - WebSocketMessage::DeleteServer { server_id }, + WebSocketMessageType::DeleteServer { server_id }, &mut connection, { assert_eq!(app.test_server().id, server_id); diff --git a/tests/api/servers/join.rs b/tests/api/servers/join.rs index 3c72a92..f455341 100644 --- a/tests/api/servers/join.rs +++ b/tests/api/servers/join.rs @@ -1,5 +1,5 @@ use actix_http::{encoding::Decoder, Payload}; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use crate::{ assert_next_websocket_message, assert_no_next_websocket_message, @@ -137,7 +137,7 @@ async fn join_sends_new_server_to_joining_user() { // Assert assert_next_websocket_message!( - WebSocketMessage::NewServer { + WebSocketMessageType::NewServer { server: new_server, channels, users @@ -167,7 +167,7 @@ async fn join_does_not_send_new_user_websocket_message_to_new_user() { // Assert assert_next_websocket_message!( - WebSocketMessage::NewServer { + WebSocketMessageType::NewServer { server: _, channels: _, users: _ @@ -196,7 +196,7 @@ async fn join_sends_new_user_to_existing_users() { // Assert assert_next_websocket_message!( - WebSocketMessage::NewUser { server_id: _, user }, + WebSocketMessageType::NewUser { server_id: _, user }, &mut connection, { assert_eq!(new_user.id, user.id); diff --git a/tests/api/servers/leave.rs b/tests/api/servers/leave.rs index f6bff8e..e00f98a 100644 --- a/tests/api/servers/leave.rs +++ b/tests/api/servers/leave.rs @@ -1,6 +1,6 @@ use actix_http::{encoding::Decoder, Payload}; use claim::assert_err; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use crate::{ assert_next_websocket_message, @@ -174,7 +174,7 @@ async fn leave_sends_delete_server_to_leaving_user() { // Assert assert_next_websocket_message!( - WebSocketMessage::DeleteServer { server_id }, + WebSocketMessageType::DeleteServer { server_id }, &mut connection, { assert_eq!(app.test_server().id, server_id); @@ -205,7 +205,7 @@ async fn leave_sends_delete_user_to_users_on_server() { // Assert assert_next_websocket_message!( - WebSocketMessage::DeleteUser { user_id, server_id }, + WebSocketMessageType::DeleteUser { user_id, server_id }, &mut connection, { assert_eq!(other_user.id, user_id); diff --git a/tests/api/websockets.rs b/tests/api/websockets.rs index 98891b9..ccd19e4 100644 --- a/tests/api/websockets.rs +++ b/tests/api/websockets.rs @@ -1,6 +1,6 @@ use actix_http::ws; use claim::assert_ok; -use ferrum_websocket::messages::WebSocketMessage; +use ferrum_websocket::messages::WebSocketMessageType; use futures::SinkExt; use crate::{ @@ -26,7 +26,7 @@ async fn websocket_closes_successfully() { send_websocket_message( &mut connection, - WebSocketMessage::Identify { + WebSocketMessageType::Identify { bearer: app.test_user_token(), }, ) @@ -51,14 +51,14 @@ async fn websocket_receives_ready_message_after_successfull_identify() { // Act send_websocket_message( &mut connection, - WebSocketMessage::Identify { + WebSocketMessageType::Identify { bearer: app.test_user_token(), }, ) .await; // Assert - assert_next_websocket_message!(WebSocketMessage::Ready, &mut connection, ()); + assert_next_websocket_message!(WebSocketMessageType::Ready, &mut connection, ()); } #[ferrum_macros::test(strategy = "User")] @@ -71,7 +71,7 @@ async fn websocket_does_not_receive_ready_message_after_missing_or_invalid_beare // Act send_websocket_message( &mut connection, - WebSocketMessage::Identify { bearer: token }, + WebSocketMessageType::Identify { bearer: token }, ) .await; @@ -86,10 +86,10 @@ async fn websocket_responds_to_ping_with_pong() { let (_response, mut connection) = app.websocket().await; // Act - send_websocket_message(&mut connection, WebSocketMessage::Ping).await; + send_websocket_message(&mut connection, WebSocketMessageType::Ping).await; // Assert - assert_next_websocket_message!(WebSocketMessage::Pong, &mut connection, ()); + assert_next_websocket_message!(WebSocketMessageType::Pong, &mut connection, ()); } #[ferrum_macros::test(strategy = "User")]