Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 53 additions & 28 deletions ferrum-websocket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,50 @@
pub mod messages;
mod server;

use std::{collections::HashSet, iter::FromIterator};
use std::{
collections::{HashSet, VecDeque},
iter::FromIterator,
};

use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use ferrum_shared::jwt::Jwt;
use messages::{IdentifyUser, SerializedWebSocketMessage, WebSocketClose, WebSocketMessage};
use messages::{IdentifyUser, SerializedWebSocketMessage, WebSocketClose, WebSocketMessageType};
use uuid::Uuid;

pub use server::WebSocketServer;

macro_rules! save_and_send_message {
($self:expr, $ctx:expr, $message:expr) => {
let last_index = $self.messages.back().map(|x| x.0);

let index = if let Some(index) = last_index {
$self.messages.push_back((index + 1, $message.clone()));

index + 1
} else {
$self.messages.push_back((0, $message.clone()));

0
};

$ctx.text(
::serde_json::to_string(&crate::messages::WebSocketMessage {
id: index,
payload: $message,
})
.unwrap(),
);
};
}

pub struct WebSocketSession {
pub user_id: Option<Uuid>,
pub server: Addr<WebSocketServer>,
pub channels: HashSet<Uuid>,
pub servers: HashSet<Uuid>,
pub messages: VecDeque<(u64, WebSocketMessageType)>,
jwt: Jwt,
}

Expand All @@ -35,45 +63,42 @@ impl Handler<SerializedWebSocketMessage> for WebSocketSession {
self.servers = HashSet::from_iter(servers.iter().cloned());
self.channels = HashSet::from_iter(channels.iter().cloned());

ctx.text(serde_json::to_string(&WebSocketMessage::Ready).unwrap());
save_and_send_message!(self, ctx, WebSocketMessageType::Ready);
}
SerializedWebSocketMessage::Data(data, channel) => {
SerializedWebSocketMessage::Data(message, channel) => {
if self.channels.contains(&channel) {
ctx.text(data);
save_and_send_message!(self, ctx, message);
}
}
SerializedWebSocketMessage::AddChannel(channel) => {
self.channels.insert(channel.id);

ctx.text(serde_json::to_string(&WebSocketMessage::NewChannel { channel }).unwrap());
let message = WebSocketMessageType::NewChannel { channel };
save_and_send_message!(self, ctx, message);
}
SerializedWebSocketMessage::AddServer(server, channels, users) => {
self.servers.insert(server.id);
self.channels.extend(channels.iter().map(|x| x.id));

ctx.text(
serde_json::to_string(&WebSocketMessage::NewServer {
server,
channels,
users,
})
.unwrap(),
);
let message = WebSocketMessageType::NewServer {
server,
channels,
users,
};

save_and_send_message!(self, ctx, message);
}
SerializedWebSocketMessage::AddUser(server_id, user) => {
if self.servers.contains(&server_id) == false {
return;
}

ctx.text(
serde_json::to_string(&WebSocketMessage::NewUser { server_id, user }).unwrap(),
);
let message = WebSocketMessageType::NewUser { server_id, user };
save_and_send_message!(self, ctx, message);
}
SerializedWebSocketMessage::DeleteUser(user_id, server_id) => {
ctx.text(
serde_json::to_string(&WebSocketMessage::DeleteUser { user_id, server_id })
.unwrap(),
);
let message = WebSocketMessageType::DeleteUser { user_id, server_id };
save_and_send_message!(self, ctx, message);
}
SerializedWebSocketMessage::DeleteServer(server_id) => {
if self.servers.contains(&server_id) == false {
Expand All @@ -82,9 +107,8 @@ impl Handler<SerializedWebSocketMessage> for WebSocketSession {

self.servers.remove(&server_id);

ctx.text(
serde_json::to_string(&WebSocketMessage::DeleteServer { server_id }).unwrap(),
)
let message = WebSocketMessageType::DeleteServer { server_id };
save_and_send_message!(self, ctx, message);
}
}
}
Expand All @@ -94,16 +118,16 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketSession
fn handle(&mut self, item: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match item {
Ok(ws::Message::Text(text)) => {
let message = match serde_json::from_str::<WebSocketMessage>(&text) {
let message = match serde_json::from_str::<WebSocketMessageType>(&text) {
Ok(value) => value,
Err(_) => return,
};

match message {
WebSocketMessage::Ping => {
ctx.text(serde_json::to_string(&WebSocketMessage::Pong).unwrap());
WebSocketMessageType::Ping => {
ctx.text(serde_json::to_string(&WebSocketMessageType::Pong).unwrap());
}
WebSocketMessage::Identify { bearer } => {
WebSocketMessageType::Identify { bearer } => {
let claims = match self.jwt.get_claims(&bearer) {
Some(value) => value,
None => return,
Expand Down Expand Up @@ -145,6 +169,7 @@ pub async fn websocket(
server: server.get_ref().clone(),
channels: HashSet::new(),
servers: HashSet::new(),
messages: VecDeque::new(),
jwt: jwt.as_ref().clone(),
},
&request,
Expand Down
17 changes: 12 additions & 5 deletions ferrum-websocket/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ use ferrum_shared::{
use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, actix::prelude::Message)]
#[derive(Debug, Serialize, Deserialize)]
pub struct WebSocketMessage {
pub id: u64,
#[serde(flatten)]
pub payload: WebSocketMessageType,
}

#[derive(Debug, Clone, Serialize, Deserialize, actix::prelude::Message)]
#[rtype(result = "()")]
#[serde(tag = "type", content = "payload")]
pub enum WebSocketMessage {
pub enum WebSocketMessageType {
Empty,
Ping,
Pong,
Expand Down Expand Up @@ -50,7 +57,7 @@ pub enum SerializedWebSocketMessage {
AddUser(Uuid, UserResponse),
DeleteServer(Uuid),
DeleteUser(Uuid, Uuid),
Data(String, Uuid),
Data(WebSocketMessageType, Uuid),
}

#[derive(Debug, actix::prelude::Message)]
Expand Down Expand Up @@ -82,11 +89,11 @@ pub struct ReadyUser {
#[rtype(result = "()")]
pub struct SendMessageToChannel {
pub channel_id: Uuid,
pub message: WebSocketMessage,
pub message: WebSocketMessageType,
}

impl SendMessageToChannel {
pub fn new(channel_id: Uuid, message: WebSocketMessage) -> Self {
pub fn new(channel_id: Uuid, message: WebSocketMessageType) -> Self {
Self {
channel_id,
message,
Expand Down
7 changes: 3 additions & 4 deletions ferrum-websocket/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::messages::{DeleteServer, UserLeft};

use super::messages::{
IdentifyUser, NewChannel, NewServer, NewUser, SendMessageToChannel, SerializedWebSocketMessage,
WebSocketClose, WebSocketMessage,
WebSocketClose, WebSocketMessageType,
};

pub struct WebSocketServer {
Expand All @@ -30,9 +30,8 @@ impl WebSocketServer {
}
}

pub fn send_message_to_channel(&self, channel_id: Uuid, message: WebSocketMessage) {
let client_message =
SerializedWebSocketMessage::Data(serde_json::to_string(&message).unwrap(), channel_id);
pub fn send_message_to_channel(&self, channel_id: Uuid, message: WebSocketMessageType) {
let client_message = SerializedWebSocketMessage::Data(message, channel_id);

for recipient in self.users.values() {
recipient.do_send(client_message.clone()).expect("");
Expand Down
4 changes: 2 additions & 2 deletions src/routes/channels/create_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use ferrum_db::{
pub use ferrum_shared::error_chain_fmt;
use ferrum_shared::jwt::AuthorizationService;
use ferrum_websocket::{
messages::{self, WebSocketMessage},
messages::{self, WebSocketMessageType},
WebSocketServer,
};
use sqlx::PgPool;
Expand Down Expand Up @@ -104,7 +104,7 @@ pub async fn create_message(

server.do_send(messages::SendMessageToChannel::new(
*channel_id,
WebSocketMessage::NewMessage {
WebSocketMessageType::NewMessage {
message: message.to_response(user),
},
));
Expand Down
14 changes: 9 additions & 5 deletions tests/api/channels/create_message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_http::{encoding::Decoder, Payload};
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -244,10 +244,14 @@ async fn create_message_sends_websocket_message_to_ready_users() {
.await;

// Assert
assert_next_websocket_message!(WebSocketMessage::NewMessage { message }, &mut connection, {
assert_eq!("foobar", message.content);
assert_eq!(app.test_user().id, message.user.id);
});
assert_next_websocket_message!(
WebSocketMessageType::NewMessage { message },
&mut connection,
{
assert_eq!("foobar", message.content);
assert_eq!(app.test_user().id, message.user.id);
}
);
}

#[ferrum_macros::test(strategy = "UserAndOwnServer")]
Expand Down
12 changes: 6 additions & 6 deletions tests/api/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ferrum::{
telemetry::{get_subscriber, init_subscriber},
};
use ferrum_shared::jwt::Jwt;
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;
use futures::{select, FutureExt, SinkExt, StreamExt};
use once_cell::sync::Lazy;
use sqlx::{postgres::PgPoolOptions, types::Uuid, Connection, Executor, PgConnection, PgPool};
Expand Down Expand Up @@ -163,8 +163,8 @@ impl TestApplication {
) {
let (response, mut connection) = self.websocket().await;

send_websocket_message(&mut connection, WebSocketMessage::Identify { bearer }).await;
assert_next_websocket_message!(WebSocketMessage::Ready, &mut connection, ());
send_websocket_message(&mut connection, WebSocketMessageType::Identify { bearer }).await;
assert_next_websocket_message!(WebSocketMessageType::Ready, &mut connection, ());

(response, connection)
}
Expand Down Expand Up @@ -324,7 +324,7 @@ async fn configure_database(settings: &DatabaseSettings) -> PgPool {

pub async fn get_next_websocket_message(
connection: &mut actix_codec::Framed<Box<dyn ConnectionIo>, actix_http::ws::Codec>,
) -> Option<WebSocketMessage> {
) -> Option<WebSocketMessageType> {
let mut message = connection.next().fuse();
let mut timeout = Box::pin(actix_rt::time::sleep(Duration::from_secs(2)).fuse());

Expand All @@ -334,7 +334,7 @@ pub async fn get_next_websocket_message(
};

match x.unwrap().unwrap() {
ws::Frame::Text(text) => match serde_json::from_slice::<WebSocketMessage>(&text) {
ws::Frame::Text(text) => match serde_json::from_slice::<WebSocketMessageType>(&text) {
Ok(value) => Some(value),
Err(_) => None,
},
Expand All @@ -344,7 +344,7 @@ pub async fn get_next_websocket_message(

pub async fn send_websocket_message(
connection: &mut actix_codec::Framed<Box<dyn ConnectionIo>, actix_http::ws::Codec>,
message: WebSocketMessage,
message: WebSocketMessageType,
) {
connection
.send(ws::Message::Text(
Expand Down
4 changes: 2 additions & 2 deletions tests/api/servers/create.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_http::{encoding::Decoder, Payload};
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;

use crate::{assert_next_websocket_message, helpers::TestApplication};

Expand Down Expand Up @@ -185,7 +185,7 @@ async fn create_sends_new_server_to_owner_per_websocket() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::NewServer {
WebSocketMessageType::NewServer {
server: new_server,
channels,
users
Expand Down
4 changes: 2 additions & 2 deletions tests/api/servers/create_channel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_http::{encoding::Decoder, Payload};
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;
use uuid::Uuid;

use crate::{assert_next_websocket_message, helpers::TestApplication};
Expand Down Expand Up @@ -235,7 +235,7 @@ async fn create_channel_sends_new_channel_to_authenticated_websocket_users() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::NewChannel {
WebSocketMessageType::NewChannel {
channel: new_channel
},
&mut connection,
Expand Down
4 changes: 2 additions & 2 deletions tests/api/servers/delete.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_http::{encoding::Decoder, Payload};
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -134,7 +134,7 @@ async fn delete_server_sends_deleted_server_to_users_on_server() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::DeleteServer { server_id },
WebSocketMessageType::DeleteServer { server_id },
&mut connection,
{
assert_eq!(app.test_server().id, server_id);
Expand Down
8 changes: 4 additions & 4 deletions tests/api/servers/join.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use actix_http::{encoding::Decoder, Payload};
use ferrum_websocket::messages::WebSocketMessage;
use ferrum_websocket::messages::WebSocketMessageType;

use crate::{
assert_next_websocket_message, assert_no_next_websocket_message,
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn join_sends_new_server_to_joining_user() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::NewServer {
WebSocketMessageType::NewServer {
server: new_server,
channels,
users
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn join_does_not_send_new_user_websocket_message_to_new_user() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::NewServer {
WebSocketMessageType::NewServer {
server: _,
channels: _,
users: _
Expand Down Expand Up @@ -196,7 +196,7 @@ async fn join_sends_new_user_to_existing_users() {

// Assert
assert_next_websocket_message!(
WebSocketMessage::NewUser { server_id: _, user },
WebSocketMessageType::NewUser { server_id: _, user },
&mut connection,
{
assert_eq!(new_user.id, user.id);
Expand Down
Loading