Skip to content

Commit

Permalink
feat: support multiple voice nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Zomatree committed Feb 17, 2025
1 parent 060b4c4 commit 367ac88
Show file tree
Hide file tree
Showing 20 changed files with 209 additions and 83 deletions.
19 changes: 15 additions & 4 deletions Revolt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ api = "http://local.revolt.chat:14702"
events = "ws://local.revolt.chat:14703"
autumn = "http://local.revolt.chat:14704"
january = "http://local.revolt.chat:14705"
livekit = "ws://local.revolt.chat:14706"
voso_legacy = ""
voso_legacy_ws = ""

# Public urls for livekit nodes
# each entry here should have a corresponding entry under `api.livekit.nodes`
[hosts.livekit]
worldwide = "ws://local.revolt.chat:14706"

[api]

[api.smtp]
Expand All @@ -38,9 +42,16 @@ port = 14025
use_tls = false

[api.livekit]
url = "ws://local.revolt.chat:14706"
key = ""
secret = ""

# Config for livekit nodes
# Make sure to change the secret when deploying
# The key and secret should match the values livekit is using
[api.livekit.nodes.worldwide]
url = "http://livekit"
lat = 0.0
lon = 0.0
key = "worldwide_key"
secret = "ZjCofRlfm6GGtjlifmNpCDkcQbEIIVC0"

[files.s3]
# S3 protocol endpoint
Expand Down
10 changes: 3 additions & 7 deletions crates/core/config/Revolt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ api = "http://local.revolt.chat/api"
events = "ws://local.revolt.chat/ws"
autumn = "http://local.revolt.chat/autumn"
january = "http://local.revolt.chat/january"
livekit = "ws://local.revolt.chat/livekit"
voso_legacy = ""
voso_legacy_ws = ""

[hosts.livekit]

[rabbit]
host = "rabbit"
port = 5672
Expand Down Expand Up @@ -64,13 +65,8 @@ hcaptcha_sitekey = ""
max_concurrent_connections = 50

[api.livekit]
# Livekit server url
url = "ws://local.revolt.chat/livekit"
# Livekit security key name
key = ""
# Livekit security secret value
secret = ""

[api.livekit.nodes]

[pushd]
# this changes the names of the queues to not overlap
Expand Down
9 changes: 8 additions & 1 deletion crates/core/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct Hosts {
pub events: String,
pub autumn: String,
pub january: String,
pub livekit: String
pub livekit: HashMap<String, String>
}

#[derive(Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -176,7 +176,14 @@ pub struct ApiWorkers {

#[derive(Deserialize, Debug, Clone)]
pub struct ApiLiveKit {
pub nodes: HashMap<String, LiveKitNode>
}

#[derive(Deserialize, Debug, Clone)]
pub struct LiveKitNode {
pub url: String,
pub lat: f64,
pub lon: f64,
pub key: String,
pub secret: String
}
Expand Down
38 changes: 30 additions & 8 deletions crates/core/database/src/voice/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@ pub async fn raise_if_in_voice(user: &User, target: &str) -> Result<()> {
}
}

pub async fn set_channel_node(channel: &str, node: &str) -> Result<()> {
get_connection()
.await?
.set(format!("node:{channel}"), node)
.await
.to_internal_error()?;

Ok(())
}

pub async fn get_channel_node(channel: &str) -> Result<String> {
get_connection()
.await?
.get(format!("node:{channel}"))
.await
.to_internal_error()
}

pub async fn get_user_voice_channels(user_id: &str) -> Result<Vec<String>> {
get_connection()
.await?
Expand Down Expand Up @@ -207,10 +225,10 @@ pub async fn update_voice_state(
Ok(())
}

pub async fn get_voice_channel_members(channel_id: &str) -> Result<Vec<String>> {
pub async fn get_voice_channel_members(channel_id: &str) -> Result<Option<Vec<String>>> {
get_connection()
.await?
.smembers::<_, Vec<String>>(format!("vc_members:{}", channel_id))
.smembers(format!("vc_members:{}", channel_id))
.await
.to_internal_error()
}
Expand Down Expand Up @@ -255,8 +273,9 @@ pub async fn get_channel_voice_state(channel: &Channel) -> Result<Option<v0::Cha
_ => None
};

if !members.is_empty() {
if let Some(members) = members {
let mut participants = Vec::with_capacity(members.len());
let node = get_channel_node(channel.id()).await?;

for user_id in members {
if let Some(voice_state) = get_voice_state(channel.id(), server, &user_id).await? {
Expand All @@ -271,6 +290,7 @@ pub async fn get_channel_voice_state(channel: &Channel) -> Result<Option<v0::Cha
Ok(Some(v0::ChannelVoiceState {
id: channel.id().to_string(),
participants,
node
}))
} else {
Ok(None)
Expand All @@ -292,17 +312,18 @@ pub async fn move_user(user: &str, from: &str, to: &str) -> Result<()> {
}

pub async fn sync_voice_permissions(db: &Database, voice_client: &VoiceClient, channel: &Channel, server: Option<&Server>, role_id: Option<&str>) -> Result<()> {
let node = get_channel_node(channel.id()).await?;

for user_id in get_voice_channel_members(channel.id()).await? {
for user_id in get_voice_channel_members(channel.id()).await?.iter().flatten() {
let user = Reference::from_unchecked(user_id.clone()).as_user(db).await?;

sync_user_voice_permissions(db, voice_client, &user, channel, server, role_id).await?;
sync_user_voice_permissions(db, voice_client, &node, &user, channel, server, role_id).await?;
};

Ok(())
}

pub async fn sync_user_voice_permissions(db: &Database, voice_client: &VoiceClient, user: &User, channel: &Channel, server: Option<&Server>, role_id: Option<&str>) -> Result<()> {
pub async fn sync_user_voice_permissions(db: &Database, voice_client: &VoiceClient, node: &str, user: &User, channel: &Channel, server: Option<&Server>, role_id: Option<&str>) -> Result<()> {
let channel_id = channel.id();
let server_id: Option<&str> = server.as_ref().map(|s| s.id.as_str());

Expand All @@ -315,7 +336,8 @@ pub async fn sync_user_voice_permissions(db: &Database, voice_client: &VoiceClie
let voice_state = get_voice_state(channel_id, server_id, &user.id).await?.unwrap();

let mut query = DatabasePermissionQuery::new(db, user)
.channel(channel);
.channel(channel)
.user(user);

if let (Some(server), Some(member)) = (server, member.as_ref()) {
query = query.member(member).server(server)
Expand All @@ -340,7 +362,7 @@ pub async fn sync_user_voice_permissions(db: &Database, voice_client: &VoiceClie

update_voice_state(channel_id, server_id, &user.id, &update_event).await?;

voice_client.update_permissions(user, channel_id, ParticipantPermission {
voice_client.update_permissions(node, user, channel_id, ParticipantPermission {
can_subscribe: can_listen,
can_publish: can_speak,
can_publish_data: can_speak,
Expand Down
65 changes: 45 additions & 20 deletions crates/core/database/src/voice/voice_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use livekit_api::{
access_token::{AccessToken, VideoGrants},
services::room::{CreateRoomOptions, RoomClient, UpdateParticipantOptions},
services::room::{CreateRoomOptions, RoomClient as InnerRoomClient, UpdateParticipantOptions},
};
use livekit_protocol::{ParticipantInfo, ParticipantPermission, Room};
use revolt_config::config;
use revolt_config::{config, LiveKitNode};
use crate::models::{Channel, User};
use revolt_models::v0;
use revolt_permissions::{ChannelPermission, PermissionValue};
Expand All @@ -12,41 +12,59 @@ use std::{borrow::Cow, collections::HashMap, time::Duration};

use super::get_allowed_sources;


#[derive(Debug)]
pub struct RoomClient {
pub client: InnerRoomClient,
pub node: LiveKitNode
}

#[derive(Debug)]
pub struct VoiceClient {
rooms: RoomClient,
api_key: String,
api_secret: String,
pub rooms: HashMap<String, RoomClient>
}

impl VoiceClient {
pub fn new(url: String, api_key: String, api_secret: String) -> Self {
pub fn new(nodes: HashMap<String, LiveKitNode>) -> Self {
Self {
rooms: RoomClient::with_api_key(&url, &api_key, &api_secret),
api_key,
api_secret,
rooms: nodes
.into_iter()
.map(|(name, node)|
(
name,
RoomClient {
client: InnerRoomClient::with_api_key(&node.url, &node.key, &node.secret),
node
}
)
)
.collect()
}
}

pub async fn from_revolt_config() -> Self {
let config = config().await;

Self::new(
config.hosts.livekit,
config.api.livekit.key,
config.api.livekit.secret,
)
Self::new(config.api.livekit.nodes.clone())
}

pub fn get_node(&self, name: &str) -> Result<&RoomClient> {
self.rooms.get(name)
.ok_or_else(|| create_error!(UnknownNode))
}

pub fn create_token(
&self,
node: &str,
user: &User,
permissions: PermissionValue,
channel: &Channel,
) -> Result<String> {
let room = self.get_node(node)?;

let allowed_sources = get_allowed_sources(permissions);

AccessToken::with_api_key(&self.api_key, &self.api_secret)
AccessToken::with_api_key(&room.node.key, &room.node.secret)
.with_name(&format!("{}#{}", user.username, user.discriminator))
.with_identity(&user.id)
.with_metadata(&serde_json::to_string(&user).to_internal_error()?)
Expand All @@ -63,7 +81,9 @@ impl VoiceClient {
.to_internal_error()
}

pub async fn create_room(&self, channel: &Channel) -> Result<Room> {
pub async fn create_room(&self, node: &str, channel: &Channel) -> Result<Room> {
let room = self.get_node(node)?;

let voice = match channel {
Channel::DirectMessage { .. } | Channel::VoiceChannel { .. } => Some(Cow::Owned(v0::VoiceInformation::default())),
Channel::TextChannel { voice: Some(voice), .. } => Some(Cow::Borrowed(voice)),
Expand All @@ -72,7 +92,7 @@ impl VoiceClient {

.ok_or_else(|| create_error!(NotAVoiceChannel))?;

self.rooms
room.client
.create_room(
channel.id(),
CreateRoomOptions {
Expand All @@ -87,11 +107,14 @@ impl VoiceClient {

pub async fn update_permissions(
&self,
node: &str,
user: &User,
channel_id: &str,
new_permissions: ParticipantPermission,
) -> Result<ParticipantInfo> {
self.rooms
let room = self.get_node(node)?;

room.client
.update_participant(
channel_id,
&user.id,
Expand All @@ -106,8 +129,10 @@ impl VoiceClient {
.to_internal_error()
}

pub async fn remove_user(&self, user_id: &str, channel_id: &str) -> Result<()> {
self.rooms.remove_participant(channel_id, user_id)
pub async fn remove_user(&self, node: &str, user_id: &str, channel_id: &str) -> Result<()> {
let room = self.get_node(node)?;

room.client.remove_participant(channel_id, user_id)
.await
.to_internal_error()
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/models/src/v0/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,14 @@ auto_derived!(
pub struct ChannelVoiceState {
pub id: String,
/// The states of the users who are connected to the channel
pub participants: Vec<UserVoiceState>
pub participants: Vec<UserVoiceState>,
/// The node's node which the channel is currently using
pub node: String
}

/// Join a voice channel
pub struct DataJoinCall {
pub node: String
}

);
Expand Down
1 change: 1 addition & 0 deletions crates/core/permissions/src/models/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub static DEFAULT_PERMISSION: Lazy<u64> = Lazy::new(|| {
+ ChannelPermission::Connect
+ ChannelPermission::Speak
+ ChannelPermission::Listen
+ ChannelPermission::Video
)
});

Expand Down
1 change: 1 addition & 0 deletions crates/core/result/src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl IntoResponse for Error {
ErrorType::AlreadyInVoiceChannel => StatusCode::BAD_REQUEST,
ErrorType::NotAVoiceChannel => StatusCode::BAD_REQUEST,
ErrorType::AlreadyConnected => StatusCode::BAD_REQUEST,
ErrorType::UnknownNode => StatusCode::BAD_REQUEST,
ErrorType::ProxyError => StatusCode::BAD_REQUEST,
ErrorType::FileTooSmall => StatusCode::UNPROCESSABLE_ENTITY,
ErrorType::FileTooLarge { .. } => StatusCode::UNPROCESSABLE_ENTITY,
Expand Down
1 change: 1 addition & 0 deletions crates/core/result/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub enum ErrorType {
AlreadyInVoiceChannel,
NotAVoiceChannel,
AlreadyConnected,
UnknownNode,
// ? Micro-service errors
ProxyError,
FileTooSmall,
Expand Down
1 change: 1 addition & 0 deletions crates/core/result/src/rocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl<'r> Responder<'r, 'static> for Error {
ErrorType::AlreadyInVoiceChannel => Status::BadRequest,
ErrorType::NotAVoiceChannel => Status::BadRequest,
ErrorType::AlreadyConnected => Status::BadRequest,
ErrorType::UnknownNode => Status::BadRequest,
ErrorType::ProxyError => Status::BadRequest,
ErrorType::FileTooSmall => Status::UnprocessableEntity,
ErrorType::FileTooLarge { .. } => Status::UnprocessableEntity,
Expand Down
Loading

0 comments on commit 367ac88

Please sign in to comment.