diff --git a/constants.go b/constants.go index caf24c122c7..e7a78606e3d 100644 --- a/constants.go +++ b/constants.go @@ -38,13 +38,15 @@ const ( sdpAttributeSimulcast = "simulcast" + ssrcGroupSimulcast = "SIM" + outboundMTU = 1200 rtpPayloadTypeBitmask = 0x7F incomingUnhandledRTPSsrc = "Incoming unhandled RTP ssrc(%d), OnTrack will not be fired. %v" - useReadSimulcast = "Use ReadSimulcast(rid) instead of Read() when multiple tracks are present" + useReadSimulcast = "Use ReadSimulcast(rid)/ReadSimulcastSSRC(ssrc) instead of Read() when multiple tracks are present" generatedCertificateOrigin = "WebRTC" diff --git a/errors.go b/errors.go index b023d161a22..57ce99bb953 100644 --- a/errors.go +++ b/errors.go @@ -244,6 +244,7 @@ var ( errRTPReceiverReceiveAlreadyCalled = errors.New("Receive has already been called") errRTPReceiverWithSSRCTrackStreamNotFound = errors.New("unable to find stream for Track with SSRC") errRTPReceiverForRIDTrackStreamNotFound = errors.New("no trackStreams found for RID") + errRTPReceiverForSSRCTrackStreamNotFound = errors.New("no trackStreams found for SSRC") errRTPSenderTrackNil = errors.New("Track must not be nil") errRTPSenderDTLSTransportNil = errors.New("DTLSTransport must not be nil") diff --git a/peerconnection.go b/peerconnection.go index acb5b5bbb37..bb965e222bf 100644 --- a/peerconnection.go +++ b/peerconnection.go @@ -1784,10 +1784,14 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream *srtp.ReadStreamSRTP, ssr // If a SSRC already exists in the RemoteDescription don't perform heuristics upon it for _, track := range trackDetailsFromSDP(pc.log, remoteDescription.parsed) { - if track.rtxSsrc != nil && ssrc == *track.rtxSsrc { + if slices.ContainsFunc(track.rtxSsrc, func(s *SSRC) bool { + return s != nil && ssrc == *s + }) { return nil } - if track.fecSsrc != nil && ssrc == *track.fecSsrc { + if slices.ContainsFunc(track.fecSsrc, func(s *SSRC) bool { + return s != nil && ssrc == *s + }) { return nil } if slices.Contains(track.ssrcs, ssrc) { diff --git a/rtpreceiver.go b/rtpreceiver.go index 284e65e468c..6058a1bbfb4 100644 --- a/rtpreceiver.go +++ b/rtpreceiver.go @@ -326,6 +326,33 @@ func (r *RTPReceiver) ReadSimulcast(b []byte, rid string) (n int, a interceptor. } } +// ReadSimulcastSSRC reads incoming RTCP for this RTPReceiver for given SSRC. +func (r *RTPReceiver) ReadSimulcastSSRC(b []byte, ssrc SSRC) (n int, a interceptor.Attributes, err error) { + select { + case <-r.received: + var rtcpInterceptor interceptor.RTCPReader + + r.mu.Lock() + for _, t := range r.tracks { + if t.track != nil && t.track.ssrc == ssrc { + rtcpInterceptor = t.rtcpInterceptor + + break + } + } + r.mu.Unlock() + + if rtcpInterceptor == nil { + return 0, nil, fmt.Errorf("%w: %d", errRTPReceiverForSSRCTrackStreamNotFound, ssrc) + } + + return rtcpInterceptor.Read(b, a) + + case <-r.closedChan: + return 0, nil, io.ErrClosedPipe + } +} + // ReadRTCP is a convenience method that wraps Read and unmarshal for you. // It also runs any configured interceptors. func (r *RTPReceiver) ReadRTCP() ([]rtcp.Packet, interceptor.Attributes, error) { @@ -356,6 +383,19 @@ func (r *RTPReceiver) ReadSimulcastRTCP(rid string) ([]rtcp.Packet, interceptor. return pkts, attributes, err } +// ReadSimulcastSSRCRTCP is a convenience method that wraps ReadSimulcastSSRC and unmarshal for you. +func (r *RTPReceiver) ReadSimulcastSSRCRTCP(ssrc SSRC) ([]rtcp.Packet, interceptor.Attributes, error) { + b := make([]byte, r.api.settingEngine.getReceiveMTU()) + i, attributes, err := r.ReadSimulcastSSRC(b, ssrc) + if err != nil { + return nil, nil, err + } + + pkts, err := rtcp.Unmarshal(b[:i]) + + return pkts, attributes, err +} + func (r *RTPReceiver) haveReceived() bool { select { case <-r.received: diff --git a/sdp.go b/sdp.go index fe1500ec092..caa423c6a9f 100644 --- a/sdp.go +++ b/sdp.go @@ -29,8 +29,8 @@ type trackDetails struct { streamID string id string ssrcs []SSRC - rtxSsrc *SSRC - fecSsrc *SSRC + rtxSsrc []*SSRC + fecSsrc []*SSRC rids []string } @@ -136,7 +136,7 @@ func trackDetailsFromSDP( for i := range tracksInMediaSection { if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) { repairSsrc := SSRC(rtxRepairFlow) - tracksInMediaSection[i].rtxSsrc = &repairSsrc + tracksInMediaSection[i].rtxSsrc = []*SSRC{&repairSsrc} } } } @@ -164,7 +164,7 @@ func trackDetailsFromSDP( for i := range tracksInMediaSection { if tracksInMediaSection[i].ssrcs[0] == SSRC(baseSsrc) { repairSsrc := SSRC(fecRepairFlow) - tracksInMediaSection[i].fecSsrc = &repairSsrc + tracksInMediaSection[i].fecSsrc = []*SSRC{&repairSsrc} } } } @@ -221,13 +221,13 @@ func trackDetailsFromSDP( for r, baseSsrc := range rtxRepairFlows { if baseSsrc == ssrc { repairSsrc := SSRC(r) //nolint:gosec // G115 - trackDetails.rtxSsrc = &repairSsrc + trackDetails.rtxSsrc = []*SSRC{&repairSsrc} } } for r, baseSsrc := range fecRepairFlows { if baseSsrc == ssrc { fecSsrc := SSRC(r) //nolint:gosec // G115 - trackDetails.fecSsrc = &fecSsrc + trackDetails.fecSsrc = []*SSRC{&fecSsrc} } } @@ -249,6 +249,36 @@ func trackDetailsFromSDP( simulcastTrack.rids = append(simulcastTrack.rids, rid.id) } + tracksInMediaSection = []trackDetails{simulcastTrack} + } else if ssrcs := getSimulcastSSRCs(log, media); len(ssrcs) != 0 && trackID != "" && streamID != "" { + simulcastTrack := trackDetails{ + mid: midValue, + kind: codecType, + streamID: streamID, + id: trackID, + ssrcs: ssrcs, + } + if len(rtxRepairFlows) > 0 { + simulcastTrack.rtxSsrc = make([]*SSRC, len(ssrcs)) + for rtx, base := range rtxRepairFlows { + baseSsrc := SSRC(base) //nolint:gosec // G115 + if pos := slices.Index(ssrcs, baseSsrc); pos != -1 { + repairSsrc := SSRC(rtx) //nolint:gosec // G115 + simulcastTrack.rtxSsrc[pos] = &repairSsrc + } + } + } + if len(fecRepairFlows) > 0 { + simulcastTrack.fecSsrc = make([]*SSRC, len(ssrcs)) + for fec, base := range fecRepairFlows { + baseSsrc := SSRC(base) //nolint:gosec // G115 + if pos := slices.Index(ssrcs, baseSsrc); pos != -1 { + fecSsrc := SSRC(fec) //nolint:gosec // G115 + simulcastTrack.fecSsrc[pos] = &fecSsrc + } + } + } + tracksInMediaSection = []trackDetails{simulcastTrack} } @@ -270,12 +300,12 @@ func trackDetailsToRTPReceiveParameters(trackDetails *trackDetails) RTPReceivePa encodings[i].SSRC = trackDetails.ssrcs[i] } - if trackDetails.rtxSsrc != nil { - encodings[i].RTX.SSRC = *trackDetails.rtxSsrc + if len(trackDetails.rtxSsrc) > i && trackDetails.rtxSsrc[i] != nil { + encodings[i].RTX.SSRC = *trackDetails.rtxSsrc[i] } - if trackDetails.fecSsrc != nil { - encodings[i].FEC.SSRC = *trackDetails.fecSsrc + if len(trackDetails.fecSsrc) > i && trackDetails.fecSsrc[i] != nil { + encodings[i].FEC.SSRC = *trackDetails.fecSsrc[i] } } @@ -316,6 +346,29 @@ func getRids(media *sdp.MediaDescription) []*simulcastRid { return rids } +func getSimulcastSSRCs(log logging.LeveledLogger, media *sdp.MediaDescription) []SSRC { + var ssrcs []SSRC + for _, attr := range media.Attributes { + if attr.Key == sdp.AttrKeySSRCGroup { + split := strings.Split(attr.Value, " ") + if split[0] == ssrcGroupSimulcast { + for _, v := range split[1:] { + ssrc, err := strconv.ParseUint(v, 10, 32) + if err != nil { + log.Warnf("Failed to parse SSRC: %v", err) + + continue + } + + ssrcs = append(ssrcs, SSRC(ssrc)) + } + } + } + } + + return ssrcs +} + func addCandidatesToMediaDescriptions( candidates []ICECandidate, mediaDescr *sdp.MediaDescription, diff --git a/sdp_test.go b/sdp_test.go index 5e7f135893b..8d2a429ec82 100644 --- a/sdp_test.go +++ b/sdp_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/pion/logging" "github.com/pion/sdp/v3" "github.com/pion/transport/v4/test" "github.com/stretchr/testify/assert" @@ -590,10 +591,12 @@ func TestTrackDetailsFromSDP(t *testing.T) { assert.Equal(t, RTPCodecTypeVideo, track.kind) assert.Equal(t, SSRC(3000), track.ssrcs[0]) assert.Equal(t, "video_trk_label", track.streamID) - require.NotNil(t, track.rtxSsrc, "missing RTX ssrc for video track") - assert.Equal(t, SSRC(4000), *track.rtxSsrc) - require.NotNil(t, track.fecSsrc, "missing FEC ssrc for video track") - assert.Equal(t, SSRC(5000), *track.fecSsrc) + require.Len(t, track.rtxSsrc, 1) + require.NotNil(t, track.rtxSsrc[0], "missing RTX ssrc for video track") + assert.Equal(t, SSRC(4000), *track.rtxSsrc[0]) + require.Len(t, track.fecSsrc, 1) + require.NotNil(t, track.fecSsrc[0], "missing FEC ssrc for video track") + assert.Equal(t, SSRC(5000), *track.fecSsrc[0]) }) t.Run("inactive and recvonly tracks ignored", func(t *testing.T) { @@ -654,8 +657,98 @@ func TestTrackDetailsFromSDP(t *testing.T) { tracks := trackDetailsFromSDP(nil, descr) assert.Equal(t, 2, len(tracks)) - assert.Equal(t, SSRC(4000), *tracks[0].rtxSsrc) - assert.Equal(t, SSRC(6000), *tracks[1].rtxSsrc) + require.Len(t, tracks[0].rtxSsrc, 1) + assert.Equal(t, SSRC(4000), *tracks[0].rtxSsrc[0]) + require.Len(t, tracks[1].rtxSsrc, 1) + assert.Equal(t, SSRC(6000), *tracks[1].rtxSsrc[0]) + }) + + t.Run("simulcast ssrc-group", func(t *testing.T) { + log := logging.NewDefaultLoggerFactory().NewLogger("test") + descr := &sdp.SessionDescription{ + MediaDescriptions: []*sdp.MediaDescription{ + { + MediaName: sdp.MediaName{ + Media: "video", + }, + Attributes: []sdp.Attribute{ + {Key: "mid", Value: "0"}, + {Key: "sendrecv"}, + {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc-group", Value: "SIM 3000 4000 5000"}, + }, + }, + { + MediaName: sdp.MediaName{ + Media: "video", + }, + Attributes: []sdp.Attribute{ + {Key: "mid", Value: "0"}, + {Key: "sendrecv"}, + {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc-group", Value: "SIM 3000 invalid 5000"}, + }, + }, + { + MediaName: sdp.MediaName{ + Media: "video", + }, + Attributes: []sdp.Attribute{ + {Key: "mid", Value: "0"}, + {Key: "sendrecv"}, + {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "3100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc-group", Value: "SIM 3000 4000 5000"}, + {Key: "ssrc-group", Value: "FID 3000 3100"}, + {Key: "ssrc-group", Value: "FID 4000 4100"}, + {Key: "ssrc-group", Value: "FID 5000 5100"}, + }, + }, + { + MediaName: sdp.MediaName{ + Media: "video", + }, + Attributes: []sdp.Attribute{ + {Key: "mid", Value: "0"}, + {Key: "sendrecv"}, + {Key: "ssrc", Value: "3000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "3100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "4100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5000 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc", Value: "5100 msid:video_trk_label video_trk_guid"}, + {Key: "ssrc-group", Value: "SIM 3000 4000 5000"}, + {Key: "ssrc-group", Value: "FEC-FR 3000 3100"}, + {Key: "ssrc-group", Value: "FEC-FR 4000 4100"}, + {Key: "ssrc-group", Value: "FEC-FR 5000 5100"}, + }, + }, + }, + } + + tracks := trackDetailsFromSDP(log, descr) + assert.Equal(t, 4, len(tracks)) + assert.Equal(t, []SSRC{3000, 4000, 5000}, tracks[0].ssrcs) + assert.Equal(t, []SSRC{3000, 5000}, tracks[1].ssrcs) + assert.Equal(t, []SSRC{3000, 4000, 5000}, tracks[2].ssrcs) + if assert.Len(t, tracks[2].rtxSsrc, 3) { + assert.Equal(t, SSRC(3100), *tracks[2].rtxSsrc[0]) + assert.Equal(t, SSRC(4100), *tracks[2].rtxSsrc[1]) + assert.Equal(t, SSRC(5100), *tracks[2].rtxSsrc[2]) + } + if assert.Len(t, tracks[3].fecSsrc, 3) { + assert.Equal(t, SSRC(3100), *tracks[3].fecSsrc[0]) + assert.Equal(t, SSRC(4100), *tracks[3].fecSsrc[1]) + assert.Equal(t, SSRC(5100), *tracks[3].fecSsrc[2]) + } }) }