From 4d620ca8e62a33783c2e9748352a108a67518d25 Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Thu, 5 Jun 2025 11:55:04 -0700 Subject: [PATCH 1/6] Consider first packet when reading Simulcast IDs The code currently ignores the first packet when reading Simulcast IDs from a new SSRC, and probes only subsequent packets. This commit makes it so that we consider the first packet as well (which we already have read). Helps if the publisher only sends Simulcast IDs on the first packet. --- peerconnection.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/peerconnection.go b/peerconnection.go index db4a4a80d8a..6233f281e14 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1784,7 +1784,20 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } + // try to read simulcast IDs from the packet we already have var mid, rid, rsid string + if _, _, err = handleUnknownRTPPacket( + b[:i], uint8(midExtensionID), //nolint:gosec // G115 + uint8(streamIDExtensionID), //nolint:gosec // G115 + uint8(repairStreamIDExtensionID), //nolint:gosec // G115 + &mid, + &rid, + &rsid, + ); err != nil { + return err + } + + // if the first packet didn't contain simuilcast IDs, then probe more packets var paddingOnly bool for readCount := 0; readCount <= simulcastProbeCount; readCount++ { if mid == "" || (rid == "" && rsid == "") { From 6770cfa2df8b9ae557d64485bc4bdc840d38bd59 Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Tue, 24 Jun 2025 18:08:25 -0700 Subject: [PATCH 2/6] Add a unit test --- peerconnection_media_test.go | 100 +++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 2219c2900d1..fc485fd04b4 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1131,6 +1131,106 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop close(testFinished) }) + // Assert that we can send just one packet with Simulcast IDs (using extensions) and they will be properly received + t.Run("ExtractIDs", func(t *testing.T) { + offerer, answerer, err := newPair() + assert.NoError(t, err) + + rids := []string{"layer_1", "layer_2", "layer_3"} + ridSelected := rids[0] + + vp8WriterA, err := NewTrackLocalStaticRTP( + RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[0]), + ) + assert.NoError(t, err) + + vp8WriterB, err := NewTrackLocalStaticRTP( + RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[1]), + ) + assert.NoError(t, err) + + vp8WriterC, err := NewTrackLocalStaticRTP( + RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[2]), + ) + assert.NoError(t, err) + + sender, err := offerer.AddTrack(vp8WriterA) + assert.NoError(t, err) + assert.NotNil(t, sender) + + assert.NoError(t, sender.AddEncoding(vp8WriterB)) + assert.NoError(t, sender.AddEncoding(vp8WriterC)) + + assert.NoError(t, signalPair(offerer, answerer)) + + answerer.OnTrack(func(remote *TrackRemote, receiver *RTPReceiver) { + assert.Equal(t, remote.rid, ridSelected) + }) + + peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer) + peerConnectionConnected.Wait() + + ticker := time.NewTicker(time.Millisecond * 20) + defer ticker.Stop() + testFinished := make(chan struct{}) + seenOneStream, seenOneStreamCancel := context.WithCancel(context.Background()) + + go func() { + sentOnePacket := false + + senderTrack := vp8WriterA + + for { + select { + case <-testFinished: + return + case <-ticker.C: + answerer.dtlsTransport.lock.Lock() + if len(answerer.dtlsTransport.simulcastStreams) >= 1 { + seenOneStreamCancel() + } + answerer.dtlsTransport.lock.Unlock() + + senderTrack.mu.Lock() + + // We send just one packet with the RID, that's the point of this test + if !sentOnePacket && len(senderTrack.bindings) > 0 { + sentOnePacket = true + + midExtensionID, _, _ := answerer.api.mediaEngine.getHeaderExtensionID( + RTPHeaderExtensionCapability{sdp.SDESMidURI}, + ) + assert.Greater(t, midExtensionID, 0) + + streamIDExtensionID, _, _ := answerer.api.mediaEngine.getHeaderExtensionID( + RTPHeaderExtensionCapability{sdp.SDESRTPStreamIDURI}, + ) + assert.Greater(t, streamIDExtensionID, 0) + + header := &rtp.Header{ + Version: 2, + SSRC: util.RandUint32(), + } + header.Extension = true + header.ExtensionProfile = 0x1000 + assert.NoError(t, header.SetExtension(uint8(midExtensionID), []byte("0"))) + assert.NoError(t, header.SetExtension(uint8(streamIDExtensionID), []byte(ridSelected))) + + _, err = senderTrack.bindings[0].writeStream.WriteRTP(header, []byte{0, 1, 2, 3, 4, 5}) + assert.NoError(t, err) + } + + senderTrack.mu.Unlock() + } + } + }() + + <-seenOneStream.Done() + + closePairNow(t, offerer, answerer) + close(testFinished) + }) + // Assert that NonSimulcast Traffic isn't incorrectly broken by the probe t.Run("Break NonSimulcast", func(t *testing.T) { unhandledSimulcastError := make(chan struct{}) From bf06fe40f64116ad2a9bd2a2b9a1467c55d7414d Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Wed, 25 Jun 2025 10:02:41 -0700 Subject: [PATCH 3/6] Validate that ontrack has been called --- peerconnection_media_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index fc485fd04b4..99ae6e33db3 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1139,6 +1139,12 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop rids := []string{"layer_1", "layer_2", "layer_3"} ridSelected := rids[0] + onTrackCalled := atomicBool{} + answerer.OnTrack(func(remote *TrackRemote, receiver *RTPReceiver) { + assert.Equal(t, remote.rid, ridSelected) + onTrackCalled.set(true) + }) + vp8WriterA, err := NewTrackLocalStaticRTP( RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion1", WithRTPStreamID(rids[0]), ) @@ -1163,10 +1169,6 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop assert.NoError(t, signalPair(offerer, answerer)) - answerer.OnTrack(func(remote *TrackRemote, receiver *RTPReceiver) { - assert.Equal(t, remote.rid, ridSelected) - }) - peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer) peerConnectionConnected.Wait() @@ -1227,6 +1229,8 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop <-seenOneStream.Done() + assert.Equal(t, true, onTrackCalled.get()) + closePairNow(t, offerer, answerer) close(testFinished) }) From 95a37ee6ec5a5c879f8d8ec846e2fb0021edb91b Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Wed, 25 Jun 2025 10:03:02 -0700 Subject: [PATCH 4/6] Remove unused parameter to fix lint warning --- peerconnection.go | 4 ++-- rtptransceiver.go | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/peerconnection.go b/peerconnection.go index 6233f281e14..40637c38945 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1786,7 +1786,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err // try to read simulcast IDs from the packet we already have var mid, rid, rsid string - if _, _, err = handleUnknownRTPPacket( + if _, err = handleUnknownRTPPacket( b[:i], uint8(midExtensionID), //nolint:gosec // G115 uint8(streamIDExtensionID), //nolint:gosec // G115 uint8(repairStreamIDExtensionID), //nolint:gosec // G115 @@ -1811,7 +1811,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err return err } - if _, paddingOnly, err = handleUnknownRTPPacket( + if paddingOnly, err = handleUnknownRTPPacket( b[:i], uint8(midExtensionID), //nolint:gosec // G115 uint8(streamIDExtensionID), //nolint:gosec // G115 uint8(repairStreamIDExtensionID), //nolint:gosec // G115 diff --git a/rtptransceiver.go b/rtptransceiver.go index 5ce0f2d4819..f53a16b4fc3 100644 --- a/rtptransceiver.go +++ b/rtptransceiver.go @@ -291,10 +291,10 @@ func handleUnknownRTPPacket( streamIDExtensionID, repairStreamIDExtensionID uint8, mid, rid, rsid *string, -) (payloadType PayloadType, paddingOnly bool, err error) { +) (paddingOnly bool, err error) { rp := &rtp.Packet{} if err = rp.Unmarshal(buf); err != nil { - return 0, false, err + return false, err } if rp.Padding && len(rp.Payload) == 0 { @@ -302,10 +302,9 @@ func handleUnknownRTPPacket( } if !rp.Header.Extension { - return payloadType, paddingOnly, nil + return paddingOnly, nil } - payloadType = PayloadType(rp.PayloadType) if payload := rp.GetExtension(midExtensionID); payload != nil { *mid = string(payload) } @@ -318,5 +317,5 @@ func handleUnknownRTPPacket( *rsid = string(payload) } - return payloadType, paddingOnly, nil + return paddingOnly, nil } From be4f9952734d2df38706170e995c1e474da30190 Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Tue, 8 Jul 2025 09:37:56 -0700 Subject: [PATCH 5/6] Switch to atomic.bool --- peerconnection_media_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index be839044b62..5819ad2ba71 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1140,10 +1140,10 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop rids := []string{"layer_1", "layer_2", "layer_3"} ridSelected := rids[0] - onTrackCalled := atomicBool{} + onTrackCalled := &atomic.Bool{} answerer.OnTrack(func(remote *TrackRemote, receiver *RTPReceiver) { assert.Equal(t, remote.rid, ridSelected) - onTrackCalled.set(true) + onTrackCalled.Store(true) }) vp8WriterA, err := NewTrackLocalStaticRTP( @@ -1230,7 +1230,7 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop <-seenOneStream.Done() - assert.Equal(t, true, onTrackCalled.get()) + assert.Equal(t, true, onTrackCalled.Load()) closePairNow(t, offerer, answerer) close(testFinished) From 94f92092ef1ca5df674265200ff1e8efc24f9585 Mon Sep 17 00:00:00 2001 From: Kostya Vasilyev Date: Tue, 8 Jul 2025 18:54:38 -0700 Subject: [PATCH 6/6] Fix lint errors --- peerconnection_media_test.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/peerconnection_media_test.go b/peerconnection_media_test.go index 5819ad2ba71..f6e02f945cf 100644 --- a/peerconnection_media_test.go +++ b/peerconnection_media_test.go @@ -1173,6 +1173,20 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, offerer, answerer) peerConnectionConnected.Wait() + parameters := sender.GetParameters() + + var midID, ridID uint8 + for _, extension := range parameters.HeaderExtensions { + switch extension.URI { + case sdp.SDESMidURI: + midID = uint8(extension.ID) //nolint:gosec // G115 + case sdp.SDESRTPStreamIDURI: + ridID = uint8(extension.ID) //nolint:gosec // G115 + } + } + assert.NotZero(t, midID) + assert.NotZero(t, ridID) + ticker := time.NewTicker(time.Millisecond * 20) defer ticker.Stop() testFinished := make(chan struct{}) @@ -1200,24 +1214,14 @@ func TestPeerConnection_Simulcast_Probe(t *testing.T) { //nolint:cyclop if !sentOnePacket && len(senderTrack.bindings) > 0 { sentOnePacket = true - midExtensionID, _, _ := answerer.api.mediaEngine.getHeaderExtensionID( - RTPHeaderExtensionCapability{sdp.SDESMidURI}, - ) - assert.Greater(t, midExtensionID, 0) - - streamIDExtensionID, _, _ := answerer.api.mediaEngine.getHeaderExtensionID( - RTPHeaderExtensionCapability{sdp.SDESRTPStreamIDURI}, - ) - assert.Greater(t, streamIDExtensionID, 0) - header := &rtp.Header{ Version: 2, SSRC: util.RandUint32(), } header.Extension = true header.ExtensionProfile = 0x1000 - assert.NoError(t, header.SetExtension(uint8(midExtensionID), []byte("0"))) - assert.NoError(t, header.SetExtension(uint8(streamIDExtensionID), []byte(ridSelected))) + assert.NoError(t, header.SetExtension(midID, []byte("0"))) + assert.NoError(t, header.SetExtension(ridID, []byte(ridSelected))) _, err = senderTrack.bindings[0].writeStream.WriteRTP(header, []byte{0, 1, 2, 3, 4, 5}) assert.NoError(t, err)