Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ const (

sdpAttributeSimulcast = "simulcast"

ssrcGroupSimulcast = "SIM"

outboundMTU = 1200

rtpPayloadTypeBitmask = 0x7F

incomingUnhandledRTPSsrc = "Incoming unhandled RTP ssrc(%d), OnTrack will not be fired. %v"

useReadSimulcast = "Use ReadSimulcast(rid) instead of Read() when multiple tracks are present"
useReadSimulcast = "Use ReadSimulcast(rid)/ReadSimulcastSSRC(ssrc) instead of Read() when multiple tracks are present"

generatedCertificateOrigin = "WebRTC"

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ var (
errRTPReceiverReceiveAlreadyCalled = errors.New("Receive has already been called")
errRTPReceiverWithSSRCTrackStreamNotFound = errors.New("unable to find stream for Track with SSRC")
errRTPReceiverForRIDTrackStreamNotFound = errors.New("no trackStreams found for RID")
errRTPReceiverForSSRCTrackStreamNotFound = errors.New("no trackStreams found for SSRC")

errRTPSenderTrackNil = errors.New("Track must not be nil")
errRTPSenderDTLSTransportNil = errors.New("DTLSTransport must not be nil")
Expand Down
8 changes: 6 additions & 2 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -1784,10 +1784,14 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream *srtp.ReadStreamSRTP, ssr

// If a SSRC already exists in the RemoteDescription don't perform heuristics upon it
for _, track := range trackDetailsFromSDP(pc.log, remoteDescription.parsed) {
if track.rtxSsrc != nil && ssrc == *track.rtxSsrc {
if slices.ContainsFunc(track.rtxSsrc, func(s *SSRC) bool {
return s != nil && ssrc == *s
}) {
return nil
}
if track.fecSsrc != nil && ssrc == *track.fecSsrc {
if slices.ContainsFunc(track.fecSsrc, func(s *SSRC) bool {
return s != nil && ssrc == *s
}) {
return nil
}
if slices.Contains(track.ssrcs, ssrc) {
Expand Down
40 changes: 40 additions & 0 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,33 @@ func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor.
}
}

// ReadSimulcastSSRC reads incoming RTCP for this RTPReceiver for given SSRC.
func (r *RTPReceiver) ReadSimulcastSSRC(b []byte, ssrc SSRC) (n int, a interceptor.Attributes, err error) {
select {
case <-r.received:
var rtcpInterceptor interceptor.RTCPReader

r.mu.Lock()
for _, t := range r.tracks {
if t.track != nil && t.track.ssrc == ssrc {
rtcpInterceptor = t.rtcpInterceptor

break
}
}
r.mu.Unlock()

if rtcpInterceptor == nil {
return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverForSSRCTrackStreamNotFound, ssrc)
}

return rtcpInterceptor.Read(b, a)

case <-r.closedChan:
return 0, nil, io.ErrClosedPipe
}
}

// ReadRTCP is a convenience method that wraps Read and unmarshal for you.
// It also runs any configured interceptors.
func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) {
Expand Down Expand Up @@ -356,6 +383,19 @@ func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor.
return pkts, attributes, err
}

// ReadSimulcastSSRCRTCP is a convenience method that wraps ReadSimulcastSSRC and unmarshal for you.
func (r *RTPReceiver) ReadSimulcastSSRCRTCP(ssrc SSRC) ([]rtcp.Packet, interceptor.Attributes, error) {
b := make([]byte, r.api.settingEngine.getReceiveMTU())
i, attributes, err := r.ReadSimulcastSSRC(b, ssrc)
if err != nil {
return nil, nil, err
}

pkts, err := rtcp.Unmarshal(b[:i])

return pkts, attributes, err
}

func (r *RTPReceiver) haveReceived() bool {
select {
case <-r.received:
Expand Down
73 changes: 63 additions & 10 deletions sdp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
streamID string
id string
ssrcs []SSRC
rtxSsrc *SSRC
fecSsrc *SSRC
rtxSsrc []*SSRC
fecSsrc []*SSRC
rids []string
}

Expand Down Expand Up @@ -76,7 +76,7 @@
// extract all trackDetails from an SDP.
//
//nolint:gocognit,gocyclo,cyclop
func trackDetailsFromSDP(

Check failure on line 79 in sdp.go

View workflow job for this annotation

GitHub Actions / lint / Go

Function name: trackDetailsFromSDP, Cyclomatic Complexity: 49, Halstead Volume: 5316.54, Maintainability Index: 16 (maintidx)
log logging.LeveledLogger,
s *sdp.SessionDescription,
) (incomingTracks []trackDetails) {
Expand Down Expand Up @@ -136,7 +136,7 @@
for i := range tracksInMediaSection {
if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) {
repairSsrc := SSRC(rtxRepairFlow)
tracksInMediaSection[i].rtxSsrc = &repairSsrc
tracksInMediaSection[i].rtxSsrc = []*SSRC{&repairSsrc}
}
}
}
Expand Down Expand Up @@ -164,7 +164,7 @@
for i := range tracksInMediaSection {
if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) {
repairSsrc := SSRC(fecRepairFlow)
tracksInMediaSection[i].fecSsrc = &repairSsrc
tracksInMediaSection[i].fecSsrc = []*SSRC{&repairSsrc}
}
}
}
Expand Down Expand Up @@ -221,13 +221,13 @@
for r, baseSsrc := range rtxRepairFlows {
if baseSsrc == ssrc {
repairSsrc := SSRC(r) //nolint:gosec // G115
trackDetails.rtxSsrc = &repairSsrc
trackDetails.rtxSsrc = []*SSRC{&repairSsrc}
}
}
for r, baseSsrc := range fecRepairFlows {
if baseSsrc == ssrc {
fecSsrc := SSRC(r) //nolint:gosec // G115
trackDetails.fecSsrc = &fecSsrc
trackDetails.fecSsrc = []*SSRC{&fecSsrc}
}
}

Expand All @@ -237,7 +237,7 @@
}
}

if rids := getRids(media); len(rids) != 0 && trackID != "" && streamID != "" {

Check failure on line 240 in sdp.go

View workflow job for this annotation

GitHub Actions / lint / Go

`if len(rids) != 0 && trackID != "" && streamID != ""` has complex nested blocks (complexity: 7) (nestif)
simulcastTrack := trackDetails{
mid: midValue,
kind: codecType,
Expand All @@ -249,6 +249,36 @@
simulcastTrack.rids = append(simulcastTrack.rids, rid.id)
}

tracksInMediaSection = []trackDetails{simulcastTrack}
} else if ssrcs := getSimulcastSSRCs(log, media); len(ssrcs) != 0 && trackID != "" && streamID != "" {
simulcastTrack := trackDetails{
mid: midValue,
kind: codecType,
streamID: streamID,
id: trackID,
ssrcs: ssrcs,
}
if len(rtxRepairFlows) > 0 {
simulcastTrack.rtxSsrc = make([]*SSRC, len(ssrcs))
for rtx, base := range rtxRepairFlows {
baseSsrc := SSRC(base) //nolint:gosec // G115
if pos := slices.Index(ssrcs, baseSsrc); pos != -1 {
repairSsrc := SSRC(rtx) //nolint:gosec // G115
simulcastTrack.rtxSsrc[pos] = &repairSsrc
}
}
}
if len(fecRepairFlows) > 0 {
simulcastTrack.fecSsrc = make([]*SSRC, len(ssrcs))
for fec, base := range fecRepairFlows {
baseSsrc := SSRC(base) //nolint:gosec // G115
if pos := slices.Index(ssrcs, baseSsrc); pos != -1 {
fecSsrc := SSRC(fec) //nolint:gosec // G115
simulcastTrack.fecSsrc[pos] = &fecSsrc
}
}
}

tracksInMediaSection = []trackDetails{simulcastTrack}
}

Expand All @@ -270,12 +300,12 @@
encodings[i].SSRC = trackDetails.ssrcs[i]
}

if trackDetails.rtxSsrc != nil {
encodings[i].RTX.SSRC = *trackDetails.rtxSsrc
if len(trackDetails.rtxSsrc) > i && trackDetails.rtxSsrc[i] != nil {
encodings[i].RTX.SSRC = *trackDetails.rtxSsrc[i]
}

if trackDetails.fecSsrc != nil {
encodings[i].FEC.SSRC = *trackDetails.fecSsrc
if len(trackDetails.fecSsrc) > i && trackDetails.fecSsrc[i] != nil {
encodings[i].FEC.SSRC = *trackDetails.fecSsrc[i]
}
}

Expand Down Expand Up @@ -316,6 +346,29 @@
return rids
}

func getSimulcastSSRCs(log logging.LeveledLogger, media *sdp.MediaDescription) []SSRC {
var ssrcs []SSRC
for _, attr := range media.Attributes {
if attr.Key == sdp.AttrKeySSRCGroup {
split := strings.Split(attr.Value, " ")
if split[0] == ssrcGroupSimulcast {
for _, v := range split[1:] {
ssrc, err := strconv.ParseUint(v, 10, 32)
if err != nil {
log.Warnf("Failed to parse SSRC: %v", err)

continue
}

ssrcs = append(ssrcs, SSRC(ssrc))
}
}
}
}

return ssrcs
}

func addCandidatesToMediaDescriptions(
candidates []ICECandidate,
mediaDescr *sdp.MediaDescription,
Expand Down
105 changes: 99 additions & 6 deletions sdp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"strings"
"testing"

"github.com/pion/logging"
"github.com/pion/sdp/v3"
"github.com/pion/transport/v4/test"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -473,7 +474,7 @@
})
}

func TestTrackDetailsFromSDP(t *testing.T) {

Check failure on line 477 in sdp_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

Function name: TestTrackDetailsFromSDP, Cyclomatic Complexity: 8, Halstead Volume: 9198.00, Maintainability Index: 17 (maintidx)
t.Run("Tracks unknown, audio and video with RTX", func(t *testing.T) {
descr := &sdp.SessionDescription{
MediaDescriptions: []*sdp.MediaDescription{
Expand Down Expand Up @@ -590,10 +591,12 @@
assert.Equal(t, RTPCodecTypeVideo, track.kind)
assert.Equal(t, SSRC(3000), track.ssrcs[0])
assert.Equal(t, "video_trk_label", track.streamID)
require.NotNil(t, track.rtxSsrc, "missing RTX ssrc for video track")
assert.Equal(t, SSRC(4000), *track.rtxSsrc)
require.NotNil(t, track.fecSsrc, "missing FEC ssrc for video track")
assert.Equal(t, SSRC(5000), *track.fecSsrc)
require.Len(t, track.rtxSsrc, 1)
require.NotNil(t, track.rtxSsrc[0], "missing RTX ssrc for video track")
assert.Equal(t, SSRC(4000), *track.rtxSsrc[0])
require.Len(t, track.fecSsrc, 1)
require.NotNil(t, track.fecSsrc[0], "missing FEC ssrc for video track")
assert.Equal(t, SSRC(5000), *track.fecSsrc[0])
})

t.Run("inactive and recvonly tracks ignored", func(t *testing.T) {
Expand Down Expand Up @@ -654,8 +657,98 @@

tracks := trackDetailsFromSDP(nil, descr)
assert.Equal(t, 2, len(tracks))
assert.Equal(t, SSRC(4000), *tracks[0].rtxSsrc)
assert.Equal(t, SSRC(6000), *tracks[1].rtxSsrc)
require.Len(t, tracks[0].rtxSsrc, 1)
assert.Equal(t, SSRC(4000), *tracks[0].rtxSsrc[0])
require.Len(t, tracks[1].rtxSsrc, 1)
assert.Equal(t, SSRC(6000), *tracks[1].rtxSsrc[0])
})

t.Run("simulcast ssrc-group", func(t *testing.T) {
log := logging.NewDefaultLoggerFactory().NewLogger("test")
descr := &sdp.SessionDescription{
MediaDescriptions: []*sdp.MediaDescription{
{
MediaName: sdp.MediaName{
Media: "video",
},
Attributes: []sdp.Attribute{
{Key: "mid", Value: "0"},
{Key: "sendrecv"},
{Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc-group", Value: "SIM 3000 4000 5000"},
},
},
{
MediaName: sdp.MediaName{
Media: "video",
},
Attributes: []sdp.Attribute{
{Key: "mid", Value: "0"},
{Key: "sendrecv"},
{Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc-group", Value: "SIM 3000 invalid 5000"},
},
},
{
MediaName: sdp.MediaName{
Media: "video",
},
Attributes: []sdp.Attribute{
{Key: "mid", Value: "0"},
{Key: "sendrecv"},
{Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "3100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc-group", Value: "SIM 3000 4000 5000"},
{Key: "ssrc-group", Value: "FID 3000 3100"},
{Key: "ssrc-group", Value: "FID 4000 4100"},
{Key: "ssrc-group", Value: "FID 5000 5100"},
},
},
{
MediaName: sdp.MediaName{
Media: "video",
},
Attributes: []sdp.Attribute{
{Key: "mid", Value: "0"},
{Key: "sendrecv"},
{Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "3100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "4100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"},
{Key: "ssrc", Value: "5100 msid:video_trk_label video_trk_guid"},
{Key: "ssrc-group", Value: "SIM 3000 4000 5000"},
{Key: "ssrc-group", Value: "FEC-FR 3000 3100"},
{Key: "ssrc-group", Value: "FEC-FR 4000 4100"},
{Key: "ssrc-group", Value: "FEC-FR 5000 5100"},
},
},
},
}

tracks := trackDetailsFromSDP(log, descr)
assert.Equal(t, 4, len(tracks))
assert.Equal(t, []SSRC{3000, 4000, 5000}, tracks[0].ssrcs)
assert.Equal(t, []SSRC{3000, 5000}, tracks[1].ssrcs)
assert.Equal(t, []SSRC{3000, 4000, 5000}, tracks[2].ssrcs)
if assert.Len(t, tracks[2].rtxSsrc, 3) {
assert.Equal(t, SSRC(3100), *tracks[2].rtxSsrc[0])
assert.Equal(t, SSRC(4100), *tracks[2].rtxSsrc[1])
assert.Equal(t, SSRC(5100), *tracks[2].rtxSsrc[2])
}
if assert.Len(t, tracks[3].fecSsrc, 3) {
assert.Equal(t, SSRC(3100), *tracks[3].fecSsrc[0])
assert.Equal(t, SSRC(4100), *tracks[3].fecSsrc[1])
assert.Equal(t, SSRC(5100), *tracks[3].fecSsrc[2])
}
})
}

Expand Down
Loading