diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index f51b48f85f6..7a6e299cc82 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -71,6 +71,8 @@ components: type: boolean runOnDisconnect: type: string + gopCache: + type: boolean # Authentication authMethod: diff --git a/internal/conf/conf.go b/internal/conf/conf.go index 78f2a784380..cc7997e4d9d 100644 --- a/internal/conf/conf.go +++ b/internal/conf/conf.go @@ -169,6 +169,7 @@ type Conf struct { RunOnConnect string `json:"runOnConnect"` RunOnConnectRestart bool `json:"runOnConnectRestart"` RunOnDisconnect string `json:"runOnDisconnect"` + GopCache bool `json:"gopCache"` // Authentication AuthMethod AuthMethod `json:"authMethod"` diff --git a/internal/core/core.go b/internal/core/core.go index 6ba7a449991..9ff115db0b6 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -360,6 +360,7 @@ func (p *Core) createResources(initial bool) error { pathConfs: p.conf.Paths, externalCmdPool: p.externalCmdPool, parent: p, + gopCache: p.conf.GopCache, } p.pathManager.initialize() @@ -715,6 +716,7 @@ func (p *Core) closeResources(newConf *conf.Conf, calledByAPI bool) { newConf.WriteTimeout != p.conf.WriteTimeout || newConf.WriteQueueSize != p.conf.WriteQueueSize || newConf.UDPMaxPayloadSize != p.conf.UDPMaxPayloadSize || + newConf.GopCache != p.conf.GopCache || closeMetrics || closeAuthManager || closeLogger diff --git a/internal/core/path.go b/internal/core/path.go index f261cd107fe..cae2868816f 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -75,6 +75,7 @@ type path struct { wg *sync.WaitGroup externalCmdPool *externalcmd.Pool parent pathParent + gopCache bool ctx context.Context ctxCancel func() @@ -701,6 +702,7 @@ func (pa *path) setReady(desc *description.Session, allocateEncoder bool) error Desc: desc, GenerateRTPPackets: allocateEncoder, DecodeErrLogger: logger.NewLimitedLogger(pa.source), + GopCache: pa.gopCache, } err := pa.stream.Initialize() if err != nil { diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index bbd1bb7b0a3..d0d4adac74e 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -59,6 +59,7 @@ type pathManager struct { pathConfs map[string]*conf.Path externalCmdPool *externalcmd.Pool parent pathManagerParent + gopCache bool ctx context.Context ctxCancel func() @@ -352,6 +353,7 @@ func (pm *pathManager) createPath( wg: &pm.wg, externalCmdPool: pm.externalCmdPool, parent: pm, + gopCache: pm.gopCache, } pa.initialize() diff --git a/internal/servers/rtsp/session.go b/internal/servers/rtsp/session.go index 8895ae2bfea..ff9a5f068bc 100644 --- a/internal/servers/rtsp/session.go +++ b/internal/servers/rtsp/session.go @@ -10,6 +10,7 @@ import ( "github.com/bluenviron/gortsplib/v4" "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/google/uuid" "github.com/pion/rtp" @@ -250,6 +251,27 @@ func (s *session) onPlay(_ *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, e s.state = gortsplib.ServerSessionStatePlay s.transport = s.rsession.SetuppedTransport() s.mutex.Unlock() + + if len(s.stream.CachedUnits) > 0 { + lastCachedUnits := s.stream.CachedUnits[len(s.stream.CachedUnits)-1] + rtpPackets := lastCachedUnits.GetRTPPackets() + if len(rtpPackets) > 0 { + lastTimestamp := rtpPackets[0].Timestamp + for _, medi := range s.stream.Desc.Medias { + if medi.Type == description.MediaTypeVideo { + for _, u := range s.stream.CachedUnits { + for _, pkt := range u.GetRTPPackets() { + pkt.Timestamp = lastTimestamp + err := s.rsession.WritePacketRTP(medi, pkt) + if err != nil { + break + } + } + } + } + } + } + } } return &base.Response{ diff --git a/internal/servers/webrtc/server_test.go b/internal/servers/webrtc/server_test.go index a17f87e314b..7ddd910ce28 100644 --- a/internal/servers/webrtc/server_test.go +++ b/internal/servers/webrtc/server_test.go @@ -357,8 +357,9 @@ func TestServerRead(t *testing.T) { for _, ca := range []struct { name string medias []*description.Media - unit unit.Unit + unit []unit.Unit outRTPPayload []byte + gopCache bool }{ { "av1", @@ -368,10 +369,13 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.AV1{ - TU: [][]byte{{1, 2}}, + []unit.Unit{ + &unit.AV1{ + TU: [][]byte{{1, 2}}, + }, }, []byte{0, 2, 1, 2}, + false, }, { "vp9", @@ -381,14 +385,17 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.VP9{ - Frame: []byte{0x82, 0x49, 0x83, 0x42, 0x0, 0x77, 0xf0, 0x32, 0x34}, + []unit.Unit{ + &unit.VP9{ + Frame: []byte{0x82, 0x49, 0x83, 0x42, 0x0, 0x77, 0xf0, 0x32, 0x34}, + }, }, []byte{ 0x8f, 0xa0, 0xfd, 0x18, 0x07, 0x80, 0x03, 0x24, 0x01, 0x14, 0x01, 0x82, 0x49, 0x83, 0x42, 0x00, 0x77, 0xf0, 0x32, 0x34, }, + false, }, { "vp8", @@ -398,17 +405,22 @@ func TestServerRead(t *testing.T) { PayloadTyp: 96, }}, }}, - &unit.VP8{ - Frame: []byte{1, 2}, + []unit.Unit{ + &unit.VP8{ + Frame: []byte{1, 2}, + }, }, []byte{0x10, 1, 2}, + false, }, { "h264", []*description.Media{test.MediaH264}, - &unit.H264{ - AU: [][]byte{ - {5, 1}, + []unit.Unit{ + &unit.H264{ + AU: [][]byte{ + {5, 1}, + }, }, }, []byte{ @@ -418,6 +430,145 @@ func TestServerRead(t *testing.T) { 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, 0x07, 0x08, 0x00, 0x02, 0x05, 0x01, }, + false, + }, + { + "h264 with gop cache", + []*description.Media{test.MediaH264}, + []unit.Unit{ + // ffmpeg -f lavfi -i color=blue:s=2x2 -vframes 10 -c:v libx264 out.264 + &unit.H264{ + AU: [][]byte{ + { + 0x65, 0x88, 0x84, 0x00, 0x37, 0xff, 0xfe, 0xe1, + 0x03, 0xf8, 0x14, 0xd7, 0x4d, 0xfe, 0x63, 0x8f, + 0x43, 0xd9, 0x01, 0x68, 0xc1, + }, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0x24, 0x6c, 0x43, 0x7f, 0xfe, 0xe0}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9e, 0x42, 0x78, 0x85, 0xff, 0xc1, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0x61, 0x74, 0x42, 0xbf, 0xc4, 0x80}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0x63, 0x6a, 0x42, 0xbf, 0xc4, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0x68, 0x49, 0xa8, 0x41, 0x68, 0x99, 0x4c, 0x08, 0x5f, 0xff, 0xfe, 0xe1}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9e, 0x86, 0x45, 0x11, 0x2c, 0x2f, 0xff, 0xc1, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0xa5, 0x74, 0x42, 0xbf, 0xc4, 0x81}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x01, 0x9e, 0xa7, 0x6a, 0x42, 0xbf, 0xc4, 0x80}, + }, + }, + &unit.H264{ + AU: [][]byte{ + {0x41, 0x9a, 0xa9, 0x49, 0xa8, 0x41, 0x6c, 0x99, 0x4c, 0x08, 0x57, 0xff, 0xfe, 0xc0}, + }, + }, + }, + []byte{ + 0x18, 0x00, 0x19, 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, 0x27, 0xe5, 0x84, 0x00, 0x00, + 0x03, 0x00, 0x04, 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, 0x00, 0x04, 0x08, 0x06, + 0x07, 0x08, 0x00, 0x15, 0x65, 0x88, 0x84, 0x00, 0x37, 0xff, 0xfe, 0xe1, 0x03, 0xf8, 0x14, 0xd7, + 0x4d, 0xfe, 0x63, 0x8f, 0x43, 0xd9, 0x01, 0x68, 0xc1, + }, + true, + }, + { + "h265 with gop cache", + []*description.Media{test.MediaH265}, + []unit.Unit{ + // ffmpeg -f lavfi -i color=blue:s=16x16 -vframes 10 -c:v libx265 out.265 + &unit.H265{ + AU: [][]byte{ + { + 0x28, 0x01, 0xaf, 0x1d, 0x80, 0xf0, 0x0e, 0x9e, 0x0f, 0xfd, 0x7d, 0x3a, 0x39, 0xb1, + 0xc7, 0x6f, 0x98, + }, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xd0, 0x29, 0x4b, 0xe1, 0x0c, 0x63, 0x90, 0xfa, 0x84}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xe0, 0x64, 0x9d, 0x78, 0x61, 0x24, 0xc5, 0x60}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x24, 0xf5, 0x5f, 0xa2, 0xc2, 0x98, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x44, 0xd7, 0x5f, 0xa2, 0xc2, 0x88, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0x86, 0xb7, 0xfd, 0x46, 0x14, 0xc0, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xd0, 0x48, 0x92, 0x55, 0xfd, 0xc4, 0x30, 0x18, 0xec, 0xfa, 0x84}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x02, 0x01, 0xe0, 0xe2, 0x25, 0x57, 0x5f, 0x71, 0x84, 0x90, 0xc5, 0x60}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe0, 0xc6, 0xf5, 0xd7, 0xd2, 0x2c, 0x29, 0x80, 0xc8, 0x20}, + }, + }, + &unit.H265{ + AU: [][]byte{ + {0x00, 0x01, 0xe1, 0x02, 0x2d, 0x57, 0xf7, 0x18, 0x51, 0xc8, 0x20}, + }, + }, + }, + []byte{ + 0x60, 0x0, 0x0, 0x18, 0x40, 0x1, 0xc, 0x1, 0xff, 0xff, 0x2, 0x20, 0x0, 0x0, 0x3, 0x0, 0xb0, 0x0, + 0x0, 0x3, 0x0, 0x0, 0x3, 0x0, 0x7b, 0x18, 0xb0, 0x24, 0x0, 0x3c, 0x42, 0x1, 0x1, 0x2, 0x20, 0x0, + 0x0, 0x3, 0x0, 0xb0, 0x0, 0x0, 0x3, 0x0, 0x0, 0x3, 0x0, 0x7b, 0xa0, 0x7, 0x82, 0x0, 0x88, 0x7d, + 0xb6, 0x71, 0x8b, 0x92, 0x44, 0x80, 0x53, 0x88, 0x88, 0x92, 0xcf, 0x24, 0xa6, 0x92, 0x72, 0xc9, + 0x12, 0x49, 0x22, 0xdc, 0x91, 0xaa, 0x48, 0xfc, 0xa2, 0x23, 0xff, 0x0, 0x1, 0x0, 0x1, 0x6a, 0x2, + 0x2, 0x2, 0x1, 0x0, 0x8, 0x44, 0x1, 0xc0, 0x25, 0x2f, 0x5, 0x32, 0x40, 0x0, 0x11, 0x28, 0x1, 0xaf, + 0x1d, 0x80, 0xf0, 0xe, 0x9e, 0xf, 0xfd, 0x7d, 0x3a, 0x39, 0xb1, 0xc7, 0x6f, 0x98, + }, + true, }, { "opus", @@ -428,10 +579,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.Opus{ - Packets: [][]byte{{1, 2}}, + []unit.Unit{ + &unit.Opus{ + Packets: [][]byte{{1, 2}}, + }, }, []byte{1, 2}, + false, }, { "g722", @@ -439,22 +593,25 @@ func TestServerRead(t *testing.T) { Type: description.MediaTypeAudio, Formats: []format.Format{&format.G722{}}, }}, - &unit.Generic{ - Base: unit.Base{ - RTPPackets: []*rtp.Packet{{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 9, - SequenceNumber: 1123, - Timestamp: 45343, - SSRC: 563423, - }, - Payload: []byte{1, 2}, - }}, + []unit.Unit{ + &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 9, + SequenceNumber: 1123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{1, 2}, + }}, + }, }, }, []byte{1, 2}, + false, }, { "g711 8khz mono", @@ -466,10 +623,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 1, }}, }}, - &unit.G711{ - Samples: []byte{1, 2, 3}, + []unit.Unit{ + &unit.G711{ + Samples: []byte{1, 2, 3}, + }, }, []byte{1, 2, 3}, + false, }, { "g711 16khz stereo", @@ -481,10 +641,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.G711{ - Samples: []byte{1, 2, 3, 4}, + []unit.Unit{ + &unit.G711{ + Samples: []byte{1, 2, 3, 4}, + }, }, []byte{0x86, 0x84, 0x8a, 0x84, 0x8e, 0x84, 0x92, 0x84}, + false, }, { "lpcm", @@ -497,10 +660,13 @@ func TestServerRead(t *testing.T) { ChannelCount: 2, }}, }}, - &unit.LPCM{ - Samples: []byte{1, 2, 3, 4}, + []unit.Unit{ + &unit.LPCM{ + Samples: []byte{1, 2, 3, 4}, + }, }, []byte{1, 2, 3, 4}, + false, }, } { t.Run(ca.name, func(t *testing.T) { @@ -510,8 +676,9 @@ func TestServerRead(t *testing.T) { WriteQueueSize: 512, UDPMaxPayloadSize: 1472, Desc: desc, - GenerateRTPPackets: reflect.TypeOf(ca.unit) != reflect.TypeOf(&unit.Generic{}), + GenerateRTPPackets: reflect.TypeOf(ca.unit[0]) != reflect.TypeOf(&unit.Generic{}), DecodeErrLogger: test.NilLogger, + GopCache: ca.gopCache, } err := strm.Initialize() require.NoError(t, err) @@ -578,16 +745,26 @@ func TestServerRead(t *testing.T) { go func() { defer close(writerDone) - strm.WaitRunningReader() - - r := reflect.New(reflect.TypeOf(ca.unit).Elem()) - r.Elem().Set(reflect.ValueOf(ca.unit).Elem()) + // When testing for gopCache, start pushing packets before the client connects + if !ca.gopCache { + strm.WaitRunningReader() + } - if g, ok := r.Interface().(*unit.Generic); ok { - clone := *g.RTPPackets[0] - strm.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0) - } else { - strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) + for i, u := range ca.unit { + r := reflect.New(reflect.TypeOf(u).Elem()) + r.Elem().Set(reflect.ValueOf(u).Elem()) + + // When testing for gopCache, wait until half-way before pushing the rest of segments. + if i == len(ca.unit)/2 && ca.gopCache { + strm.WaitRunningReader() + } + + if g, ok := r.Interface().(*unit.Generic); ok { + clone := *g.RTPPackets[0] + strm.WriteRTPPacket(desc.Medias[0], desc.Medias[0].Formats[0], &clone, time.Time{}, 0) + } else { + strm.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], r.Interface().(unit.Unit)) + } } }() diff --git a/internal/stream/stream.go b/internal/stream/stream.go index d5400170718..430a36e714d 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -41,6 +41,9 @@ type Stream struct { streamReaders map[Reader]*streamReader readerRunning chan struct{} + + GopCache bool + CachedUnits []unit.Unit } // Initialize initializes a Stream. @@ -57,6 +60,7 @@ func (s *Stream) Initialize() error { Media: media, GenerateRTPPackets: s.GenerateRTPPackets, DecodeErrLogger: s.DecodeErrLogger, + gopCache: s.GopCache, } err := s.streamMedias[media].initialize() if err != nil { @@ -172,9 +176,102 @@ func (s *Stream) StartReader(reader Reader) { sr.start() - for _, sm := range s.streamMedias { + for m, sm := range s.streamMedias { for _, sf := range sm.formats { sf.startReader(sr) + if m.Type == description.MediaTypeVideo { + cb := sf.runningReaders[sr] + if cb == nil { + continue + } + + framesWithAU := 0 + for _, u := range s.CachedUnits { + if !isEmptyAU(u) { + framesWithAU++ + } + } + if framesWithAU == 0 { + continue + } + + // The previous p-frames must be sent at a certain speed to avoid the video freezing. + // We must update the PTS of the p-frames to have them played back real quick, but not instantly. + // If we do not update the PTS, the client will pause by an amount equal to the time between the p-frames. + // This is an issue because we want to send the p-frames as fast as possible. + playbackFPS := 100 + msPerFrame := 1000 / playbackFPS + ticksPerMs := 90000 / 1000 + rtpPackets := s.CachedUnits[len(s.CachedUnits)-1].GetRTPPackets() + if len(rtpPackets) == 0 { + continue + } + lastTimestamp := rtpPackets[0].Timestamp + lastPts := s.CachedUnits[len(s.CachedUnits)-1].GetPTS() + delta := -ticksPerMs * framesWithAU * msPerFrame + start := time.Now() + for _, u := range s.CachedUnits { + if isEmptyAU(u) { + continue + } + delta += ticksPerMs * msPerFrame + start = start.Add(time.Millisecond * time.Duration(msPerFrame)) + + var clonedU unit.Unit + switch tunit := u.(type) { + case *unit.H264: + clonedU = &unit.H264{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + { + Header: rtp.Header{ + Timestamp: lastTimestamp + uint32(delta), + }, + }, + }, + PTS: lastPts + int64(delta), + }, + AU: tunit.AU, + } + case *unit.H265: + clonedU = &unit.H265{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + { + Header: rtp.Header{ + Timestamp: lastTimestamp + uint32(delta), + }, + }, + }, + PTS: lastPts + int64(delta), + }, + AU: tunit.AU, + } + case *unit.AV1: + clonedU = &unit.AV1{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{ + { + Header: rtp.Header{ + Timestamp: lastTimestamp + uint32(delta), + }, + }, + }, + PTS: lastPts + int64(delta), + }, + TU: tunit.TU, + } + } + until := start + sr.push(func() error { + size := unitSize(clonedU) + atomic.AddUint64(s.bytesSent, size) + err := cb(clonedU) + time.Sleep(time.Until(until)) + return err + }) + } + } } } diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index cea65f6bd3f..6ffd3057138 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -6,6 +6,9 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/av1" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/h265" "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/formatprocessor" @@ -13,6 +16,10 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) +const ( + maxCachedGOPSize int = 512 +) + func unitSize(u unit.Unit) uint64 { n := uint64(0) for _, pkt := range u.GetRTPPackets() { @@ -21,6 +28,32 @@ func unitSize(u unit.Unit) uint64 { return n } +func isKeyFrame(u unit.Unit) bool { + switch tunit := u.(type) { + case *unit.H264: + return h264.IsRandomAccess(tunit.AU) + case *unit.H265: + return h265.IsRandomAccess(tunit.AU) + case *unit.AV1: + var isRandomAccess bool + isRandomAccess, _ = av1.IsRandomAccess(tunit.TU) + return isRandomAccess + } + return false +} + +func isEmptyAU(u unit.Unit) bool { + switch tunit := u.(type) { + case *unit.H264: + return len(tunit.AU) == 0 + case *unit.H265: + return len(tunit.AU) == 0 + case *unit.AV1: + return len(tunit.TU) == 0 + } + return true +} + type streamFormat struct { udpMaxPayloadSize int format format.Format @@ -30,6 +63,7 @@ type streamFormat struct { proc formatprocessor.Processor pausedReaders map[*streamReader]ReadFunc runningReaders map[*streamReader]ReadFunc + gopCache bool } func (sf *streamFormat) initialize() error { @@ -78,7 +112,7 @@ func (sf *streamFormat) writeRTPPacket( ntp time.Time, pts int64, ) { - hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 + hasNonRTSPReaders := len(sf.pausedReaders) > 0 || len(sf.runningReaders) > 0 || sf.gopCache u, err := sf.proc.ProcessRTPPacket(pkt, ntp, pts, hasNonRTSPReaders) if err != nil { @@ -94,6 +128,33 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni atomic.AddUint64(s.bytesReceived, size) + if sf.gopCache && medi.Type == description.MediaTypeVideo { + if isKeyFrame(u) { + if s.CachedUnits == nil { + // Initialize the cache and enable caching + s.CachedUnits = make([]unit.Unit, 0, maxCachedGOPSize) + } else { + // Keep the last packets that were used to generate the key frame. + // This is to send a full key frame in the RTSP stream. + i := len(s.CachedUnits) + for ; i > 0; i-- { + if !isEmptyAU(s.CachedUnits[i-1]) { + break + } + } + s.CachedUnits = s.CachedUnits[i:] + } + } + if s.CachedUnits != nil { + s.CachedUnits = append(s.CachedUnits, u) + } + l := len(s.CachedUnits) + if l > maxCachedGOPSize { + s.CachedUnits = s.CachedUnits[l-maxCachedGOPSize:] + sf.decodeErrLogger.Log(logger.Warn, "GOP cache is full, dropping packets") + } + } + if s.rtspStream != nil { for _, pkt := range u.GetRTPPackets() { s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck diff --git a/internal/stream/stream_media.go b/internal/stream/stream_media.go index 1ba5b3a00e6..571344772ac 100644 --- a/internal/stream/stream_media.go +++ b/internal/stream/stream_media.go @@ -13,7 +13,8 @@ type streamMedia struct { GenerateRTPPackets bool DecodeErrLogger logger.Writer - formats map[format.Format]*streamFormat + formats map[format.Format]*streamFormat + gopCache bool } func (sm *streamMedia) initialize() error { @@ -25,6 +26,7 @@ func (sm *streamMedia) initialize() error { format: forma, generateRTPPackets: sm.GenerateRTPPackets, decodeErrLogger: sm.DecodeErrLogger, + gopCache: sm.gopCache, } err := sf.initialize() if err != nil { diff --git a/internal/test/medias.go b/internal/test/medias.go index 1fa1d79f9c5..ca76c2a6b88 100644 --- a/internal/test/medias.go +++ b/internal/test/medias.go @@ -8,6 +8,9 @@ import ( // MediaH264 is a dummy H264 media. var MediaH264 = UniqueMediaH264() +// MediaH265 is a dummy H265 media. +var MediaH265 = UniqueMediaH265() + // MediaMPEG4Audio is a dummy MPEG-4 audio media. var MediaMPEG4Audio = UniqueMediaMPEG4Audio() @@ -19,6 +22,14 @@ func UniqueMediaH264() *description.Media { } } +// UniqueMediaH265 is a dummy H265 media. +func UniqueMediaH265() *description.Media { + return &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{FormatH265}, + } +} + // UniqueMediaMPEG4Audio is a dummy MPEG-4 audio media. func UniqueMediaMPEG4Audio() *description.Media { return &description.Media{ diff --git a/mediamtx.yml b/mediamtx.yml index 8cecd7078fb..6ed001882cf 100644 --- a/mediamtx.yml +++ b/mediamtx.yml @@ -23,6 +23,9 @@ writeQueueSize: 512 # Maximum size of outgoing UDP packets. # This can be decreased to avoid fragmentation on networks with a low UDP MTU. udpMaxPayloadSize: 1472 +# Enable GOP cache to improve initial playback experience for new clients. +# Note: will increase memory usage. +gopCache: false # Command to run when a client connects to the server. # This is terminated with SIGINT when a client disconnects from the server.