Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
553c4a5
MEDIA-4876: use PoolBuffer for data channel messages
reddvl1980 Apr 7, 2026
42fa29a
MEDIA-4876: add 'max-message-size' to conf actions reply's 'data' sec…
reddvl1980 Apr 7, 2026
18257bc
MEDIA-4876: add unit tests
reddvl1980 Apr 7, 2026
3ab538c
MEDIA-4876: address PR issues (cpp-check)
reddvl1980 Apr 7, 2026
26081e4
MEDIA-4876: address PR issues (include folder fix)
reddvl1980 Apr 7, 2026
a92482c
MEDIA-4876: delete unnecessary IEngine
reddvl1980 Apr 7, 2026
9c7e129
MEDIA-4876: address PR issues (rename)
reddvl1980 Apr 7, 2026
7d36059
MEDIA-4876: address dynamic memory issues
reddvl1980 Apr 7, 2026
fce5c0d
MEDIA-4876: pass PoolBuffer instead of copying
reddvl1980 Apr 7, 2026
1eb3951
MEDIA-4876: fix custom deleter for PoolBuffer
reddvl1980 Apr 7, 2026
0026d95
MEDIA-4876: get rid of AsIs and NullTerminated helpers
reddvl1980 Apr 8, 2026
4b3e597
MEDIA-4876: make sendSctp accept PoolBuffer rather then data*,len
reddvl1980 Apr 8, 2026
92abf83
MEDIA-4876: wip
reddvl1980 Apr 8, 2026
5fc2160
MEDIA-4876: address cpp-check issue
reddvl1980 Apr 9, 2026
e4eccd0
MEDIA-4876: renamed WebRtcDataStream::onSctpMessage -> onSctpMessageB…
reddvl1980 Apr 9, 2026
daa24f2
MEDIA-4876: get rid of use getReadonlyBuffer() in EngineMixer::sendEn…
reddvl1980 Apr 9, 2026
88835b8
MEDIA-4876: get rid of use getReadonlyBuffer() in EngineMixer::proces…
reddvl1980 Apr 9, 2026
6939267
MEDIA-4876: fix typo in unit test detected by DCheck build
reddvl1980 Apr 10, 2026
83a64ab
MEDIA-4876: add graceful termination of MixerManger to DataCHannelMes…
reddvl1980 Apr 10, 2026
5e980af
MEDIA-4876: PoolBuffer mem optimizaton 1/2
reddvl1980 Apr 10, 2026
5377830
MEDIA-4876: PoolBuffer mem optimization 2/2
reddvl1980 Apr 10, 2026
d1438b5
MEDIA-4876: remove view() class from PoolBuffer
reddvl1980 Apr 10, 2026
a95ec93
MEDIA-4876: renamed PoolBuffer read/write to copyFrom/copyTo
reddvl1980 Apr 10, 2026
e3c6925
MEDIA-4876: get rid of getContinuesBuffer in favor of on-stack variab…
reddvl1980 Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ git_version.*
versioninfo.txt
rtprcv-*
_bwelogs*

# Executables
smb
UnitTest2
UnitTest
LoadTest
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ set(TEST_FILES
test/api/ParserTest.cpp
test/memory/MapTest.cpp
test/memory/PoolAllocatorTest.cpp
test/memory/PoolBufferTest.cpp
test/memory/RingAllocatorTest.cpp
test/utils/StringTokenizerTest.cpp
test/utils/TrackerTest.cpp
Expand Down Expand Up @@ -651,6 +652,7 @@ set(TEST_FILES
test/memory/StackMapTest.cpp
test/bridge/ActiveMediaListTestLevels.h
test/bridge/MixerTest.cpp
test/bridge/DataChannelMessageSizeTest.cpp
test/bridge/VideoNackReceiveJobTest.cpp
test/utils/LogSpamTest.cpp
test/utils/FunctionTest.cpp
Expand Down
67 changes: 66 additions & 1 deletion api/DataChannelMessage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "memory/PoolBuffer.h"
#include "utils/StringBuilder.h"

#if ENABLE_LEGACY_API
Expand All @@ -12,7 +13,8 @@ namespace api
namespace DataChannelMessage
{

inline void makeEndpointMessage(utils::StringBuilder<2048>& outMessage,
template <size_t T>
inline void makeEndpointMessage(utils::StringBuilder<T>& outMessage,
const std::string& toEndpointId,
const std::string& fromEndpointId,
const char* message)
Expand All @@ -33,6 +35,69 @@ inline void makeEndpointMessage(utils::StringBuilder<2048>& outMessage,
#endif
}

inline memory::UniquePoolBuffer<memory::PacketPoolAllocator> makeUniqueEndpointMessageBuffer(
const std::string& toEndpointId,
const std::string& fromEndpointId,
const memory::UniquePoolBuffer<memory::PacketPoolAllocator>& payload)
{
#if ENABLE_LEGACY_API
return legacyapi::DataChannelMessage::makeUniqueEndpointMessageBuffer(toEndpointId, fromEndpointId, payload);
#else
constexpr const char* TO_STRING = "{\"type\":\"EndpointMessage\",\"to\":\"";
constexpr const char* FROM_STRING = "\",\"from\":\"";
constexpr const char* MSG_STRING = "\",\"payload\":";
constexpr const char* TAIL_STRING = "}";

constexpr std::size_t overhead_len = std::char_traits<char>::length(TO_STRING) +
std::char_traits<char>::length(FROM_STRING) +
std::char_traits<char>::length(MSG_STRING) +
std::char_traits<char>::length(TAIL_STRING);

const std::size_t extraLen = toEndpointId.length() + fromEndpointId.length() + payload->getLength();
auto buffer = memory::makeUniquePoolBuffer<memory::PacketPoolAllocator>(payload->getAllocator(), overhead_len + extraLen);
if (!buffer)
{
return buffer;
}

auto written = buffer->write(TO_STRING, std::char_traits<char>::length(TO_STRING), 0);
written += buffer->write(toEndpointId.c_str(), toEndpointId.length(), written);
written += buffer->write(FROM_STRING, std::char_traits<char>::length(FROM_STRING), written);
written += buffer->write(fromEndpointId.c_str(), fromEndpointId.length(), written);
written += buffer->write(MSG_STRING, std::char_traits<char>::length(MSG_STRING), written);
written += buffer->write(*payload.get(), written);
written += buffer->write(TAIL_STRING, std::char_traits<char>::length(TAIL_STRING), written);

assert(written == buffer->getLength());
return buffer;
#endif
}

template <size_t T>
inline void makeLoggableStringFromBuffer(memory::Array<char,T>& outArray, memory::UniquePoolBuffer<memory::PacketPoolAllocator>& payload)
{
if (!payload)
{
return;
}
outArray.clear();
bool ellipsisNeeded = payload->getLength() > T - 1;

const size_t maxCStrLength = std::min(payload->getLength(), T - 1);
outArray.resize(maxCStrLength + 1);
const auto read = payload->copyTo(const_cast<void*>(reinterpret_cast<const void*>(outArray.data())), 0, maxCStrLength);
assert(read == maxCStrLength);
outArray[maxCStrLength] = '\0';

// Indicate that message was incompletely logged
if (ellipsisNeeded && T >= 4)
{
outArray[T - 2] = '.';
outArray[T - 3] = '.';
outArray[T - 4] = '.';
}
}

inline void makeDominantSpeaker(utils::StringBuilder<256>& outMessage, const char* endpointId)
{
#if ENABLE_LEGACY_API
Expand Down
1 change: 1 addition & 0 deletions api/Generator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ nlohmann::json generateAllocateEndpointResponse(const EndpointDescription& chann
const auto& data = channelsDescription.data.get();
nlohmann::json dataJson;
dataJson["port"] = data.port;
dataJson["max-message-size"] = data.maxMessageSize;
responseJson["data"] = dataJson;
}

Expand Down
1 change: 1 addition & 0 deletions api/RtcDescriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,6 @@ struct Video
struct Data
{
uint32_t port;
uint32_t maxMessageSize;
};
} // namespace api
13 changes: 8 additions & 5 deletions bridge/Mixer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1835,14 +1835,17 @@ void Mixer::sendEndpointMessage(const std::string& toEndpointId,
const utils::SimpleJson& message)
{
assert(fromEndpointIdHash);
if (message.size() >= memory::AudioPacket::maxLength())
if (message.size() >= _config.sctp.maxMessageSize)
{
logger::warn("Endpoint message too big, len %zu",
"MixerManager",
message.size()
);
return;
}

auto& audioAllocator = _engineMixer->getAudioAllocator();
auto packet = memory::makeUniquePacket(audioAllocator, message.jsonBegin(), message.size());
reinterpret_cast<char*>(packet->get())[message.size()] = 0; // null terminated in packet
auto& packetAllocator = _engineMixer->getMainAllocator();
auto buffer = memory::makeUniquePoolBuffer(packetAllocator, message.jsonBegin(), message.size());

std::lock_guard<std::mutex> locker(_configurationLock);

Expand All @@ -1857,7 +1860,7 @@ void Mixer::sendEndpointMessage(const std::string& toEndpointId,
toEndpointIdHash = dataStreamItr->second->endpointIdHash;
}

_engineMixer->asyncSendEndpointMessage(toEndpointIdHash, fromEndpointIdHash, packet);
_engineMixer->asyncSendEndpointMessage(toEndpointIdHash, fromEndpointIdHash, buffer);
}

RecordingStream* Mixer::findRecordingStream(const std::string& recordingId)
Expand Down
26 changes: 18 additions & 8 deletions bridge/MixerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "bridge/Mixer.h"
#include "bridge/MixerJobs.h"
#include "bridge/VideoStream.h"
#include "bridge/engine/Engine.h"

#include "bridge/engine/EngineAudioStream.h"
#include "bridge/engine/EngineBarbell.h"
#include "bridge/engine/EngineDataStream.h"
Expand Down Expand Up @@ -492,24 +492,34 @@ void MixerManager::freeVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_
mixerItr->second->freeVideoPacketCache(ssrc, endpointIdHash);
}

void MixerManager::sctpReceived(EngineMixer& mixer, memory::UniquePacket msgPacket, size_t endpointIdHash)
void MixerManager::sctpReceived(EngineMixer& mixer, memory::UniquePoolBuffer<memory::PacketPoolAllocator> message, size_t endpointIdHash)
{
auto& sctpHeader = webrtc::streamMessageHeader(*msgPacket);
// HEADER: SctpStreamMessageHeader prepended to payload
// Need to get full message instead of first chunk only to form JSON from it.
constexpr size_t MAX_BUFFER_SIZE = 8192;
if (message->size() > MAX_BUFFER_SIZE) {
logger::warn("Received large SCTP message(size = %zu, max allowed = %zu). Dropping.", "MixerManager", message->getLength(), MAX_BUFFER_SIZE);
return;
}

char continousBuffer[message->size()];
message->copyTo(continousBuffer, 0, message->size());

auto& sctpHeader = *reinterpret_cast<const webrtc::SctpStreamMessageHeader*>(continousBuffer);

if (sctpHeader.payloadProtocol == webrtc::DataChannelPpid::WEBRTC_ESTABLISH)
{
// create command with this packet to send the binary data -> engine -> WebRtcDataStream belonging to this
// transport
mixer.asyncHandleSctpControl(endpointIdHash, msgPacket);
mixer.asyncHandleSctpControl(endpointIdHash, message);
return; // do not free packet as we passed it on
}
else if (sctpHeader.payloadProtocol == webrtc::DataChannelPpid::WEBRTC_STRING)
{
std::string body(reinterpret_cast<const char*>(sctpHeader.data()), msgPacket->getLength() - sizeof(sctpHeader));
std::string body(sctpHeader.getMessage(), message->getLength() - sizeof(sctpHeader));
try
{
auto json = utils::SimpleJson::create(reinterpret_cast<const char*>(sctpHeader.data()),
msgPacket->getLength() - sizeof(sctpHeader));
auto json = utils::SimpleJson::create(sctpHeader.getMessage(), message->getLength() - sizeof(sctpHeader));

if (api::DataChannelMessageParser::isPinnedEndpointsChanged(json))
{
Expand Down Expand Up @@ -573,7 +583,7 @@ void MixerManager::sctpReceived(EngineMixer& mixer, memory::UniquePacket msgPack
logger::debug("received unexpected DataChannel payload protocol, %u, len %zu",
"MixerManager",
sctpHeader.payloadProtocol,
msgPacket->getLength());
message->getLength());
}
}

Expand Down
10 changes: 4 additions & 6 deletions bridge/MixerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "bridge/Stats.h"
#include "bridge/engine/EngineMixer.h"
#include "bridge/engine/EngineStats.h"
#include "bridge/engine/Engine.h"
#include "concurrency/MpmcQueue.h"
#include "memory/PacketPoolAllocator.h"
#include "utils/Pacer.h"
Expand All @@ -14,11 +15,6 @@
#include <unordered_map>
#include <vector>

namespace bridge
{
class Engine;
}

namespace utils
{
class IdGenerator;
Expand Down Expand Up @@ -122,7 +118,9 @@ class MixerManager : public MixerManagerAsync
void allocateVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) override;
void allocateRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) override;
void videoStreamRemoved(EngineMixer& engineMixer, const EngineVideoStream& videoStream) override;
void sctpReceived(EngineMixer& mixer, memory::UniquePacket msgPacket, size_t endpointIdHash) override;
void sctpReceived(EngineMixer& mixer,
memory::UniquePoolBuffer<memory::PacketPoolAllocator> message,
size_t endpointIdHash) override;
void dataStreamRemoved(EngineMixer& mixer, const EngineDataStream& dataStream) override;
void freeRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) override;
void barbellRemoved(EngineMixer& mixer, const EngineBarbell& barbell) override;
Expand Down
6 changes: 4 additions & 2 deletions bridge/MixerManagerAsync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ bool MixerManagerAsync::asyncVideoStreamRemoved(EngineMixer& engineMixer, const
utils::bind(&MixerManagerAsync::videoStreamRemoved, this, std::ref(engineMixer), std::cref(videoStream)));
}

bool MixerManagerAsync::asyncSctpReceived(EngineMixer& mixer, memory::UniquePacket& msgPacket, size_t endpointIdHash)
bool MixerManagerAsync::asyncSctpReceived(EngineMixer& mixer,
memory::UniquePoolBuffer<memory::PacketPoolAllocator>& message,
size_t endpointIdHash)
{
return post(utils::bind(&MixerManagerAsync::sctpReceived,
this,
std::ref(mixer),
utils::moveParam(msgPacket),
utils::moveParam(message),
endpointIdHash));
}

Expand Down
9 changes: 7 additions & 2 deletions bridge/MixerManagerAsync.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "bridge/engine/EndpointId.h"
#include "memory/PacketPoolAllocator.h"
#include "memory/PoolBuffer.h"
#include "utils/Function.h"
#include <string>

Expand Down Expand Up @@ -44,7 +45,9 @@ class MixerManagerAsync
virtual void allocateVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) = 0;
virtual void allocateRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) = 0;
virtual void videoStreamRemoved(EngineMixer& engineMixer, const EngineVideoStream& videoStream) = 0;
virtual void sctpReceived(EngineMixer& mixer, memory::UniquePacket msgPacket, size_t endpointIdHash) = 0;
virtual void sctpReceived(EngineMixer& mixer,
memory::UniquePoolBuffer<memory::PacketPoolAllocator> message,
size_t endpointIdHash) = 0;
virtual void dataStreamRemoved(EngineMixer& mixer, const EngineDataStream& dataStream) = 0;
virtual void freeRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash) = 0;
virtual void barbellRemoved(EngineMixer& mixer, const EngineBarbell& barbell) = 0;
Expand All @@ -61,7 +64,9 @@ class MixerManagerAsync
bool asyncAllocateVideoPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash);
bool asyncAllocateRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash);
bool asyncVideoStreamRemoved(EngineMixer& engineMixer, const EngineVideoStream& videoStream);
bool asyncSctpReceived(EngineMixer& mixer, memory::UniquePacket& msgPacket, size_t endpointIdHash);
bool asyncSctpReceived(EngineMixer& mixer,
memory::UniquePoolBuffer<memory::PacketPoolAllocator>& msgBuffer,
size_t endpointIdHash);
bool asyncDataStreamRemoved(EngineMixer& mixer, const EngineDataStream& dataStream);
bool asyncFreeRecordingRtpPacketCache(EngineMixer& mixer, uint32_t ssrc, size_t endpointIdHash);
bool asyncBarbellRemoved(EngineMixer& mixer, const EngineBarbell& barbell);
Expand Down
1 change: 1 addition & 0 deletions bridge/endpointActions/BarbellActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ httpd::Response generateBarbellResponse(ActionContext* context,

api::Data responseData;
responseData.port = 5000;
responseData.maxMessageSize = 2048;
channelsDescription.data = responseData;

const auto responseBody = api::Generator::generateAllocateBarbellResponse(channelsDescription);
Expand Down
1 change: 1 addition & 0 deletions bridge/endpointActions/ConferenceActions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ httpd::Response generateAllocateEndpointResponse(ActionContext* context,
}

responseData.port = streamDescription.sctpPort.isSet() ? streamDescription.sctpPort.get() : 5000;
responseData.maxMessageSize = mixer.getConfig().sctp.maxMessageSize;
channelsDescription.data.set(responseData);
}

Expand Down
11 changes: 6 additions & 5 deletions bridge/engine/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ class Engine
public:
Engine(jobmanager::JobManager& backgroundJobQueue);
Engine(jobmanager::JobManager& backgroundJobQueue, std::thread&& externalThread);
virtual ~Engine() = default;

void setMessageListener(MixerManagerAsync* messageListener);
virtual void setMessageListener(MixerManagerAsync* messageListener);
void stop();
void run();

bool post(utils::Function&& task) { return _tasks.push(std::move(task)); }
virtual bool post(utils::Function&& task) { return _tasks.push(std::move(task)); }

concurrency::SynchronizationContext getSynchronizationContext()
virtual concurrency::SynchronizationContext getSynchronizationContext()
{
return concurrency::SynchronizationContext(_tasks);
}
Expand All @@ -62,8 +63,8 @@ class Engine
void updateStats(uint64_t& statsPollTime, EngineStats::EngineStats& currentStatSample, uint64_t timestamp);

public:
bool asyncAddMixer(EngineMixer* engineMixer);
bool asyncRemoveMixer(EngineMixer* engineMixer);
virtual bool asyncAddMixer(EngineMixer* engineMixer);
virtual bool asyncRemoveMixer(EngineMixer* engineMixer);

private:
void addMixer(EngineMixer* engineMixer);
Expand Down
Loading
Loading