Skip to content

Commit

Permalink
feat: Add an API to move endpoints. (#1161)
Browse files Browse the repository at this point in the history
* feat: Add an API to move endpoints.
  • Loading branch information
bgrozev authored Jul 17, 2024
1 parent 213f776 commit b9d0612
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jitsi.xmpp.extensions.colibri.ColibriStatsExtension
import org.json.simple.JSONObject
import org.jxmpp.jid.Jid
import java.time.Clock
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors

/**
Expand All @@ -50,17 +51,16 @@ class BridgeSelector @JvmOverloads constructor(
fun addHandler(eventHandler: EventHandler) = eventEmitter.addHandler(eventHandler)
fun removeHandler(eventHandler: EventHandler) = eventEmitter.removeHandler(eventHandler)

/**
* The bridge selection strategy.
*/
/** The bridge selection strategy. */
private val bridgeSelectionStrategy = BridgeConfig.config.selectionStrategy.also {
logger.info("Using ${it.javaClass.name}")
}

/**
* The map of bridge JID to <tt>Bridge</tt>.
*/
private val bridges: MutableMap<Jid, Bridge> = mutableMapOf()
/** The map of bridge JID to <tt>Bridge</tt>. */
private val bridges: MutableMap<Jid, Bridge> = ConcurrentHashMap()

/** Get the [Bridge] with a specific JID or null */
fun get(jid: Jid) = bridges[jid]

init {
JicofoMetricsContainer.instance.metricsUpdater.addUpdateTask { updateMetrics() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.jitsi.jicofo.bridge.colibri

import org.jitsi.jicofo.bridge.Bridge
import org.jitsi.jicofo.bridge.ConferenceBridgeProperties
import org.jitsi.jicofo.conference.source.EndpointSourceSet
import org.jitsi.utils.MediaType
import org.jitsi.utils.OrderedJsonObject
Expand All @@ -41,8 +42,12 @@ interface ColibriSessionManager {
val bridgeCount: Int
val bridgeRegions: Set<String>

/** Get the list of participant IDs that are currently allocated on a specific [Bridge]. */
fun getParticipants(bridge: Bridge): List<String>

@Throws(ColibriAllocationFailedException::class, BridgeSelectionFailedException::class)
fun allocate(participant: ParticipantAllocationParameters): ColibriAllocation
fun getBridges(): Map<Bridge, ConferenceBridgeProperties>

fun updateParticipant(
participantId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ColibriV2SessionManager(
}

/**
* The colibri2 sessions that are currently active, mapped by the [Bridge] that they use.
* The colibri2 sessions that are currently active, mapped by the relayId of the [Bridge] that they use.
*/
override val sessions = mutableMapOf<String?, Colibri2Session>()

Expand All @@ -92,15 +92,18 @@ class ColibriV2SessionManager(
*/
private val participants = mutableMapOf<String, ParticipantInfo>()

override fun getParticipants(bridge: Bridge): List<String> = synchronized(syncRoot) {
val session = sessions[bridge.relayId] ?: return emptyList()
return getSessionParticipants(session).map { it.id }
}

/**
* Maintains the same set as [participants], but organized by their session. Needs to be kept in sync with
* [participants] (see [add], [remove], [clear]).
*/
private val participantsBySession = mutableMapOf<Colibri2Session, MutableList<ParticipantInfo>>()

/**
* Protects access to [sessions], [participants] and [participantsBySession].
*/
/** Protects access to [sessions], [participants] and [participantsBySession]. */
private val syncRoot = Any()

/**
Expand Down Expand Up @@ -243,7 +246,7 @@ class ColibriV2SessionManager(
}

/** Get the bridge-to-bridge-properties map needed for bridge selection. */
private fun getBridges(): Map<Bridge, ConferenceBridgeProperties> = synchronized(syncRoot) {
override fun getBridges(): Map<Bridge, ConferenceBridgeProperties> = synchronized(syncRoot) {
return participantsBySession.entries
.filter { it.key.bridge.isOperational }
.associate {
Expand Down
4 changes: 4 additions & 0 deletions jicofo-selector/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ jicofo {
// Enable the conference-request endpoint
enabled = true
}
move-endpoints {
// Enable the move-endpoint API.
enabled = true
}
}

sctp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.jitsi.jicofo.conference;

import org.jetbrains.annotations.*;
import org.jitsi.jicofo.bridge.*;
import org.jitsi.jicofo.jibri.*;
import org.jitsi.jicofo.xmpp.*;
import org.jitsi.jicofo.xmpp.muc.*;
Expand Down Expand Up @@ -150,4 +151,20 @@ MuteResult handleMuteRequest(
/** Get the stats for this conference that should be exported to rtcstats. */
@NotNull
OrderedJsonObject getRtcstatsState();

/** Move (reinvite) an endpoint in this conference. Return true if the endpoint was moved. */
boolean moveEndpoint(@NotNull String endpointId, Bridge bridge);

/**
* Move (reinvite) a specific number of endpoints from the conference from a specific bridge. The implementation
* decides which endpoints to move.
*
* @param bridge the bridge from which to move endpoints.
* @param numEps the number of endpoints to move.
* @return the number of endpoints moved.
*/
int moveEndpoints(@NotNull Bridge bridge, int numEps);

/** Get information about the bridges currently used by this conference. */
Map<Bridge, ConferenceBridgeProperties> getBridges();
}
Original file line number Diff line number Diff line change
Expand Up @@ -1686,6 +1686,56 @@ public long getVisitorCount()
}
}

public Map<Bridge, ConferenceBridgeProperties> getBridges()
{
ColibriSessionManager colibriSessionManager = this.colibriSessionManager;
if (colibriSessionManager == null)
{
return Collections.emptyMap();
}
return colibriSessionManager.getBridges();
}

@Override
public boolean moveEndpoint(@NotNull String endpointId, Bridge bridge)
{
if (bridge != null)
{
List<String> bridgeParticipants = colibriSessionManager.getParticipants(bridge);
if (!bridgeParticipants.contains(endpointId))
{
logger.warn("Endpoint " + endpointId + " is not connected to bridge " + bridge.getJid());
return false;
}
}
ColibriSessionManager colibriSessionManager = this.colibriSessionManager;
if (colibriSessionManager == null)
{
return false;
}

colibriSessionManager.removeParticipant(endpointId);
return reInviteParticipantsById(Collections.singletonList(endpointId)) == 1;
}

@Override
public int moveEndpoints(@NotNull Bridge bridge, int numEps)
{
logger.info("Moving " + numEps + " endpoints from " + bridge.getJid());
ColibriSessionManager colibriSessionManager = this.colibriSessionManager;
if (colibriSessionManager == null)
{
return 0;
}
List<String> participantIds
= colibriSessionManager.getParticipants(bridge).stream().limit(numEps).collect(Collectors.toList());
for (String participantId : participantIds)
{
colibriSessionManager.removeParticipant(participantId);
}
return reInviteParticipantsById(participantIds);
}

/**
* Checks whether a request for a new endpoint to join this conference should be redirected to a visitor node.
* @return the name of the visitor node if it should be redirected, and null otherwise.
Expand Down Expand Up @@ -1887,33 +1937,41 @@ private void onBridgeUp(Jid bridgeJid)
}
}

private void reInviteParticipantsById(@NotNull List<String> participantIdsToReinvite)
private int reInviteParticipantsById(@NotNull List<String> participantIdsToReinvite)
{
reInviteParticipantsById(participantIdsToReinvite, true);
return reInviteParticipantsById(participantIdsToReinvite, true);
}

private void reInviteParticipantsById(@NotNull List<String> participantIdsToReinvite, boolean updateParticipant)
private int reInviteParticipantsById(@NotNull List<String> participantIdsToReinvite, boolean updateParticipant)
{
if (!participantIdsToReinvite.isEmpty())
int n = participantIdsToReinvite.size();
if (n == 0)
{
ConferenceMetrics.participantsMoved.addAndGet(participantIdsToReinvite.size());
synchronized (participantLock)
return 0;
}

List<Participant> participantsToReinvite = new ArrayList<>();
synchronized (participantLock)
{
for (Participant participant : participants.values())
{
List<Participant> participantsToReinvite = new ArrayList<>();
for (Participant participant : participants.values())
if (participantsToReinvite.size() == n)
{
if (participantIdsToReinvite.contains(participant.getEndpointId()))
{
participantsToReinvite.add(participant);
}
break;
}
if (participantsToReinvite.size() != participantIdsToReinvite.size())
if (participantIdsToReinvite.contains(participant.getEndpointId()))
{
logger.error("Can not re-invite all participants, no Participant object for some of them.");
participantsToReinvite.add(participant);
}
reInviteParticipants(participantsToReinvite, updateParticipant);
}
if (participantsToReinvite.size() != participantIdsToReinvite.size())
{
logger.error("Can not re-invite all participants, no Participant object for some of them.");
}
reInviteParticipants(participantsToReinvite, updateParticipant);
}
ConferenceMetrics.participantsMoved.addAndGet(participantsToReinvite.size());
return participantsToReinvite.size();
}

/**
Expand Down
8 changes: 6 additions & 2 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/JicofoServices.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import org.jitsi.jicofo.metrics.JicofoMetricsContainer
import org.jitsi.jicofo.rest.Application
import org.jitsi.jicofo.rest.ConferenceRequest
import org.jitsi.jicofo.rest.RestConfig
import org.jitsi.jicofo.rest.move.MoveEndpoints
import org.jitsi.jicofo.rest.move.MoveEndpointsConfig
import org.jitsi.jicofo.util.SynchronizedDelegate
import org.jitsi.jicofo.version.CurrentVersionImpl
import org.jitsi.jicofo.xmpp.XmppServices
Expand Down Expand Up @@ -147,8 +149,7 @@ class JicofoServices {
jettyServer = if (RestConfig.config.enabled) {
logger.info("Starting HTTP server with config: ${RestConfig.config.httpServerConfig}.")
val restApp = Application(
buildList
{
buildList {
healthChecker?.let {
add(org.jitsi.rest.Health(it))
}
Expand All @@ -159,6 +160,9 @@ class JicofoServices {
if (RestConfig.config.enablePrometheus) {
add(Prometheus(JicofoMetricsContainer.instance))
}
if (MoveEndpointsConfig.enabled) {
add(MoveEndpoints(focusManager, bridgeSelector))
}
}
)
createServer(RestConfig.config.httpServerConfig).also {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ class ConferenceRequest(
* in the constructor below) is necessary. This class exists in order to expose
* that behavior in a more concise way
*/
private class BadRequestExceptionWithMessage(message: String?) : BadRequestException(
class BadRequestExceptionWithMessage(message: String?) : BadRequestException(
Response.status(HttpServletResponse.SC_BAD_REQUEST, message).build()
)
Loading

0 comments on commit b9d0612

Please sign in to comment.