Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor visitors' codec lists, and aggregate them into a conference property. #1137

Merged
merged 11 commits into from
Feb 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ interface ChatRoomMember {
/** The statistics id if any. */
val statsId: String?

/** The supported video codecs if any */
val videoCodecs: List<String>?

/**
* The list of features advertised as XMPP capabilities. Note that although the features are cached (XEP-0115),
* the first time [features] is accessed it may block waiting for a disco#info response!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.xmpp.extensions.jitsimeet.AudioMutedExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StartMutedPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StatsId
Expand Down Expand Up @@ -68,6 +69,8 @@ class ChatRoomMemberImpl(
private set
override var statsId: String? = null
private set
override var videoCodecs: List<String>? = null
private set
override var isAudioMuted = true
private set
override var isVideoMuted = true
Expand Down Expand Up @@ -209,6 +212,27 @@ class ChatRoomMemberImpl(
presence.getExtension(StatsId::class.java)?.let {
statsId = it.statsId
}

presence.getExtension(JitsiParticipantCodecList::class.java)?.let {
if (videoCodecs != null && it.codecs != videoCodecs) {
logger.warn("Video codec list changed from {$videoCodecs} to {${it.codecs}} - not supported!")
}
if (!it.codecs.contains("vp8")) {
logger.warn("Video codec list {${it.codecs}} does not contain vp8! Adding manually.")
videoCodecs = it.codecs + "vp8"
} else {
videoCodecs = it.codecs
}
} ?: // Older clients sent a single codec in codecType rather than all supported ones in codecList
presence.getExtensionElement("jitsi_participant_codecType", "jabber:client")?.let {
if (it is StandardExtensionElement) {
videoCodecs = if (it.text == "vp8") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In JitsiParticipantCodecList.kt you use lowercase(). Is it also required here?

listOf(it.text)
} else {
listOf(it.text, "vp8")
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public class JitsiMeetConferenceImpl
return null;
});

/**
* The aggregated count of visitors' supported codecs
*/
private final PreferenceAggregator visitorCodecs;

/**
* The {@link JibriRecorder} instance used to provide live streaming through
* Jibri.
Expand Down Expand Up @@ -290,6 +295,16 @@ public JitsiMeetConferenceImpl(
TimeUnit.MILLISECONDS);


visitorCodecs = new PreferenceAggregator(
logger,
(codecs) -> {
setConferenceProperty(
ConferenceProperties.KEY_VISITOR_CODECS,
String.join(",", codecs)
);
return null;
});

logger.info("Created new conference.");
}

Expand Down Expand Up @@ -816,7 +831,7 @@ private void inviteChatMember(ChatRoomMember chatRoomMember, boolean justJoined)
}
else if (participant.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorAdded();
visitorAdded(participant.getChatMember().getVideoCodecs());
}
}

Expand Down Expand Up @@ -1042,7 +1057,7 @@ private void terminateParticipant(
}
else if (removed.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorRemoved();
visitorRemoved(removed.getChatMember().getVideoCodecs());
}
}
}
Expand Down Expand Up @@ -2013,15 +2028,21 @@ private void rescheduleSingleParticipantTimeout()
}

/** Called when a new visitor has been added to the conference. */
private void visitorAdded()
private void visitorAdded(List<String> codecs)
{
visitorCount.adjustValue(+1);
if (codecs != null) {
visitorCodecs.addPreference(codecs);
}
}

/** Called when a new visitor has been added to the conference. */
private void visitorRemoved()
private void visitorRemoved(List<String> codecs)
{
visitorCount.adjustValue(-1);
if (codecs != null) {
visitorCodecs.removePreference(codecs);
Copy link
Member

@bgrozev bgrozev Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We allow codecs to change from null to a non-null, so if a visitor joins with no codecs and then adds some it would mess with the count. It's a bit of a stretch, but one could trick jicofo into upgrading to av1 thus breaking video for some participants.
Maybe make ChatRoomMemberImpl.videoCodecs a lazy val and initialize it with the first presence received?

Unless I misunderstand how the aggregator works

}
}

/**
Expand Down
128 changes: 128 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Jicofo, the Jitsi Conference Focus.
*
* Copyright @ 2015-Present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.jicofo.util

import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger

/** Aggregate lists of preferences coming from a large group of people, such that the resulting aggregated
* list consists of preference items supported by everyone, and in a rough consensus of preference order.
*
* The intended use case is maintaining the list of supported codecs for conference visitors.
*
* Preference orders are aggregated using the Borda count; this isn't theoretically optimal, but it should be
* good enough, and it's computationally cheap.
*/
class PreferenceAggregator(
parentLogger: Logger,
private val onChanged: (List<String>) -> Unit
) {
private val logger = createChildLogger(parentLogger)
private val lock = Any()

var aggregate: List<String> = emptyList()
private set

var count = 0
private set

private val values = mutableMapOf<String, ValueInfo>()

/**
* Add a preference to the aggregator.
*/
fun addPreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count++
distinctPrefs.forEachIndexed { index, element ->
val info = values.computeIfAbsent(element) { ValueInfo() }
info.count++
info.rankAggregate += index
}
updateAggregate()
}
}

/**
* Remove a preference from the aggregator.
*/
fun removePreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count--
check(count >= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be force to fail by sending initial presence with no codecs, then updating with some codecs, then leaving repeatedly

"Preference count $count should not be negative"
}
distinctPrefs.forEachIndexed { index, element ->
val info = values[element]
check(info != null) {
"Preference info for $element should exist when preferences are being removed"
}
info.count--
check(info.count >= 0) {
"Preference count for $element ${info.count} should not be negative"
}
info.rankAggregate -= index
check(info.rankAggregate >= 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should not be negative"
}
if (info.count == 0) {
check(info.rankAggregate == 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should be zero " +
"when preference count is 0"
}
values.remove(element)
}
}
updateAggregate()
}
}

fun reset() {
synchronized(lock) {
aggregate = emptyList()
count = 0
values.clear()
}
}

private fun updateAggregate() {
val newAggregate = values.asSequence()
.filter { it.value.count == count }
.sortedBy { it.value.rankAggregate }
.map { it.key }
.toList()
if (aggregate != newAggregate) {
aggregate = newAggregate
/* ?? Do we need to drop the lock before calling this? */
onChanged(aggregate)
}
}

private class ValueInfo {
var count = 0
var rankAggregate = 0
}
}
6 changes: 6 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.ConferenceIqProvider
import org.jitsi.xmpp.extensions.jitsimeet.FeatureExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.IceStatePacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JsonMessageExtension
import org.jitsi.xmpp.extensions.jitsimeet.LoginUrlIqProvider
Expand Down Expand Up @@ -119,6 +120,11 @@ fun registerXmppExtensions() {
StatsId.NAMESPACE,
StatsId.Provider()
)
ProviderManager.addExtensionProvider(
JitsiParticipantCodecList.ELEMENT,
JitsiParticipantCodecList.NAMESPACE,
DefaultPacketExtensionProvider(JitsiParticipantCodecList::class.java)
)

// Add the extensions used for handling the inviting of transcriber
ProviderManager.addExtensionProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class JibriChatRoomMember(
override val isVideoMuted: Boolean get() = TODO("Not yet implemented")
override val region: String? get() = TODO("Not yet implemented")
override val statsId: String? get() = TODO("Not yet implemented")
override val videoCodecs: List<String>? get() = TODO("Not yet implemented")
override val features: Set<Features> get() = TODO("Not yet implemented")
override val debugState: OrderedJsonObject get() = TODO("Not yet implemented")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package org.jitsi.jicofo.util

import io.kotest.core.spec.IsolationMode
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder
import io.kotest.matchers.shouldBe
import org.jitsi.utils.logging2.createLogger

class PreferenceAggregatorTest : ShouldSpec() {
override fun isolationMode() = IsolationMode.InstancePerLeaf

private val logger = createLogger()

private val calledWith = mutableListOf<List<String>>()
private val aggregator = PreferenceAggregator(logger) {
calledWith.add(it)
}

init {
context("An aggregator with no values added") {
should("Not call its callback") {
calledWith shouldBe emptyList()
}
}
context("An aggregator called once") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
should("Call the callback exactly once with that set of values") {
calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264"))
}
}
context("An aggregator called twice with the same values") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
should("Call the callback exactly once") {
calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264"))
}
}
context("An aggregator with all preferences removed") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.removePreference(listOf("vp9", "vp8", "h264"))
should("Have its final output be the empty set") {
calledWith.last() shouldBe emptyList()
}
}
context("Aggregating preferences with disparate values (subset)") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp8", "h264"))
should("Output the minimal agreed set") {
calledWith.last().shouldContainExactly("vp8", "h264")
}
}
context("Aggregating preferences with disparate values (non-subset)") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp8", "h264"))
aggregator.addPreference(listOf("vp9", "vp8"))
should("Output the minimal agreed set") {
calledWith.last().shouldContainExactly("vp8")
}
}
context("Aggregating a new superset") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("av1", "vp9", "vp8", "h264"))
should("Not call the callback a second time") {
calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264"))
}
}
context("Removing the only preference that does not support a value") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp8", "h264"))
aggregator.addPreference(listOf("vp9", "vp8"))

aggregator.removePreference(listOf("vp8", "h264"))
should("Return that value to the set of preferences") {
calledWith.last().shouldContainExactly(listOf("vp9", "vp8"))
}
}
context("Preferences that express different orders") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp9", "h264", "vp8"))

should("Reflect the majority preference") {
calledWith shouldContainExactly listOf(listOf("vp9", "vp8", "h264"))
}
}
context("Ties in preference order") {
aggregator.addPreference(listOf("vp9", "vp8", "h264"))
aggregator.addPreference(listOf("vp9", "h264", "vp8"))

should("Result in the correct set, in some order, with consensus where it exists") {
calledWith.last().shouldContainExactlyInAnyOrder("h264", "vp9", "vp8")
calledWith.last().first() shouldBe "vp9"
}
}
context("Repeated values in preferences") {
aggregator.addPreference(listOf("vp9", "vp8"))
aggregator.addPreference(listOf("vp9", "vp8", "vp9"))
should("not confuse things") {
calledWith shouldContainExactly listOf(listOf("vp9", "vp8"))
}
aggregator.removePreference(listOf("vp9", "vp8", "vp9"))
aggregator.removePreference(listOf("vp9", "vp8"))
should("not confuse things on removal") {
calledWith.last().shouldContainExactly(emptyList())
}
}
}
}
Loading