From 24ecea6170d77160f5cfa5357c9ec6064ce8c9cb Mon Sep 17 00:00:00 2001 From: Dario A Lencina-Talarico Date: Wed, 21 Jun 2023 11:09:01 -0400 Subject: [PATCH] refactor chat server join room method (#77) --- actix-api/src/actors/chat_server.rs | 94 ++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 27 deletions(-) diff --git a/actix-api/src/actors/chat_server.rs b/actix-api/src/actors/chat_server.rs index 0ecf5e25..f5ce1a3f 100644 --- a/actix-api/src/actors/chat_server.rs +++ b/actix-api/src/actors/chat_server.rs @@ -12,7 +12,7 @@ use types::protos::media_packet::MediaPacket; use super::chat_session::{RoomId, SessionId}; pub struct ChatServer { - nc: nats::Connection, + nats_connection: nats::Connection, sessions: HashMap>, active_subs: HashMap, } @@ -24,7 +24,7 @@ impl ChatServer { .connect(std::env::var("NATS_URL").expect("NATS_URL env var must be defined")) .unwrap(); ChatServer { - nc, + nats_connection: nc, active_subs: HashMap::new(), sessions: HashMap::new(), } @@ -33,7 +33,7 @@ impl ChatServer { pub fn send_message(&self, room: &RoomId, message: Arc, session_id: SessionId) { let subject = format!("room.{}.{}", room, session_id); if let Ok(message) = message.write_to_bytes() { - match self.nc.publish(&subject, message) { + match self.nats_connection.publish(&subject, message) { Ok(_) => trace!("published message to {}", subject), Err(e) => error!("error publishing message to {}: {}", subject, e), } @@ -105,46 +105,86 @@ impl Handler for ChatServer { ) -> Self::Result { self.leave_rooms(&session); - let subject = format!("room.{}.*", room); - let queue = format!("{}-{}", session, room); - let session_recipient = self.sessions.get(&session).unwrap().clone(); - let room_clone = room.clone(); - let session_clone = session.clone(); - let sub = match self.nc.queue_subscribe(&subject, &queue) { - Ok(sub) => sub, - Err(e) => { - let err = format!("error subscribing to subject {}: {}", subject, e); + let (subject, queue) = build_subject_and_queue(&room, &session); + let session_recipient = match self.sessions.get(&session) { + Some(recipient) => recipient.clone(), + None => { + let err = format!("session {} is not connected", session); error!("{}", err); return MessageResult(Err(err)); } }; - let handler = sub.with_handler(move |msg| { - if msg.subject == format!("room.{}.{}", room_clone, session_clone) { - return Ok(()); - } - let msg = MediaPacket::parse_from_bytes(&msg.data).unwrap(); - let msg = Message { - nickname: Arc::new(Some(msg.email.clone())), - msg: Arc::new(msg), - }; - - session_recipient.try_send(msg).map_err(|e| { - error!("error sending message to session {}: {}", session_clone, e); - std::io::Error::new(std::io::ErrorKind::Other, e) - }) - }); + + let sub = match self + .nats_connection + .queue_subscribe(&subject, &queue) + .map_err(|e| handle_subscription_error(e, &subject)) + { + Ok(sub) => sub, + Err(e) => return MessageResult(Err(e)), + }; + + let handler = sub.with_handler(build_handler( + session_recipient, + room.clone(), + session.clone(), + )); + debug!("Subscribed to subject {} with queue {}", subject, queue); + let result = self .active_subs .insert(session.clone(), handler) .map(|_| ()) .ok_or("The session is already subscribed".into()); + info!( "someone connected to room {} with session {} result {:?}", room, session.trim(), result ); + MessageResult(result) } } + +fn build_subject_and_queue(room: &str, session: &str) -> (String, String) { + (format!("room.{}.*", room), format!("{}-{}", session, room)) +} + +fn handle_subscription_error(e: impl std::fmt::Display, subject: &str) -> String { + let err = format!("error subscribing to subject {}: {}", subject, e); + error!("{}", err); + err +} + +fn build_handler( + session_recipient: Recipient, // Assuming Recipient is a type + room: String, + session: String, +) -> impl Fn(nats::Message) -> Result<(), std::io::Error> { + move |msg| { + if msg.subject == format!("room.{}.{}", room, session) { + return Ok(()); + } + + let media_packet = match MediaPacket::parse_from_bytes(&msg.data) { + Ok(media_packet) => media_packet, + Err(e) => { + error!("error parsing message: {}", e); + return Err(std::io::Error::new(std::io::ErrorKind::Other, e)); + } + }; + + let message = Message { + nickname: Arc::new(Some(media_packet.email.clone())), + msg: Arc::new(media_packet), + }; + + session_recipient.try_send(message).map_err(|e| { + error!("error sending message to session {}: {}", session, e); + std::io::Error::new(std::io::ErrorKind::Other, e) + }) + } +}