diff --git a/internal/protocols/mpegts/from_stream.go b/internal/protocols/mpegts/from_stream.go index 4840d739ce8..948e1b823f6 100644 --- a/internal/protocols/mpegts/from_stream.go +++ b/internal/protocols/mpegts/from_stream.go @@ -79,7 +79,11 @@ func FromStream( dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS) if err != nil { - return err + r.Parent.Log(logger.Warn, + "H265 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) @@ -120,7 +124,11 @@ func FromStream( dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS) if err != nil { - return err + r.Parent.Log(logger.Warn, + "H264 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } sconn.SetWriteDeadline(time.Now().Add(writeTimeout)) diff --git a/internal/protocols/mpegts/from_stream_recovery_test.go b/internal/protocols/mpegts/from_stream_recovery_test.go new file mode 100644 index 00000000000..ba54f4b92c8 --- /dev/null +++ b/internal/protocols/mpegts/from_stream_recovery_test.go @@ -0,0 +1,135 @@ +package mpegts + +import ( + "bufio" + "bytes" + "fmt" + "reflect" + "strings" + "testing" + "time" + "unsafe" + + "github.com/bluenviron/gortsplib/v5/pkg/description" + "github.com/bluenviron/gortsplib/v5/pkg/format" + srt "github.com/datarhei/gosrt" + "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/bluenviron/mediamtx/internal/unit" +) + +// fakeSRTConn implements only the SetWriteDeadline method used by the +// FromStream callbacks; all other methods inherit from a nil embedded +// interface and would panic if invoked. This is intentional: the tests below +// only exercise code paths that touch SetWriteDeadline (or do not touch +// sconn at all). +type fakeSRTConn struct { + srt.Conn +} + +func (fakeSRTConn) SetWriteDeadline(_ time.Time) error { return nil } + +// extractOnData reaches into the unexported onDatas map of stream.Reader to +// retrieve the callback registered by FromStream. This avoids spinning up a +// full async stream pipeline and keeps the recovery tests focused. +func extractOnData( + t *testing.T, + r *stream.Reader, + medi *description.Media, + forma format.Format, +) stream.OnDataFunc { + t.Helper() + rv := reflect.ValueOf(r).Elem() + field := rv.FieldByName("onDatas") + require.True(t, field.IsValid(), "onDatas field not found") + mapVal := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() + + formats := mapVal.MapIndex(reflect.ValueOf(medi)) + require.True(t, formats.IsValid(), "no callbacks registered for media") + cb := formats.MapIndex(reflect.ValueOf(forma)) + require.True(t, cb.IsValid(), "no callback registered for format") + return cb.Interface().(stream.OnDataFunc) +} + +func TestFromStreamH264DTSExtractorRecovery(t *testing.T) { + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatH264}, + } + desc := &description.Session{Medias: []*description.Media{medi}} + + var warnLogs []string + r := &stream.Reader{ + Parent: test.Logger(func(l logger.Level, format string, args ...any) { + if l == logger.Warn { + warnLogs = append(warnLogs, fmt.Sprintf(format, args...)) + } + }), + } + + var buf bytes.Buffer + bw := bufio.NewWriter(&buf) + + err := FromStream(desc, r, bw, fakeSRTConn{}, time.Second) + require.NoError(t, err) + + cb := extractOnData(t, r, medi, medi.Formats[0]) + + idr := unit.PayloadH264{test.FormatH264.SPS, test.FormatH264.PPS, {0x65}} + + // 1) Prime the extractor with a valid IDR. + require.NoError(t, cb(&unit.Unit{PTS: 90000, Payload: idr})) + + // 2) Send a unit whose PTS goes backwards. Before the fix, the DTS + // extractor returned an error here that propagated out of the reader + // and tore down the SRT receiver. The fix logs a warning, resets the + // extractor, and returns nil so the connection survives. + require.NoError(t, cb(&unit.Unit{PTS: 0, Payload: idr})) + + require.NotEmpty(t, warnLogs, "expected a warn log after the DTS error") + require.True(t, strings.Contains(warnLogs[0], "H264 DTS extractor reset"), + "unexpected warn log: %q", warnLogs[0]) + + // 3) The next valid IDR should re-prime the extractor and succeed. + require.NoError(t, cb(&unit.Unit{PTS: 180000, Payload: idr})) +} + +func TestFromStreamH265DTSExtractorRecovery(t *testing.T) { + medi := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatH265}, + } + desc := &description.Session{Medias: []*description.Media{medi}} + + var warnLogs []string + r := &stream.Reader{ + Parent: test.Logger(func(l logger.Level, format string, args ...any) { + if l == logger.Warn { + warnLogs = append(warnLogs, fmt.Sprintf(format, args...)) + } + }), + } + + var buf bytes.Buffer + bw := bufio.NewWriter(&buf) + + err := FromStream(desc, r, bw, fakeSRTConn{}, time.Second) + require.NoError(t, err) + + cb := extractOnData(t, r, medi, medi.Formats[0]) + + // IDR (NAL unit type 19, IDR_W_RADL) prefixed with H265 NAL header. + idr := unit.PayloadH265{{0x26, 0x01, 0xaf, 0x08, 0x42, 0x23, 0x48, 0x8a, 0x43, 0xe2}} + + require.NoError(t, cb(&unit.Unit{PTS: 90000, Payload: idr})) + require.NoError(t, cb(&unit.Unit{PTS: 0, Payload: idr})) + + require.NotEmpty(t, warnLogs, "expected a warn log after the DTS error") + require.True(t, strings.Contains(warnLogs[0], "H265 DTS extractor reset"), + "unexpected warn log: %q", warnLogs[0]) + + require.NoError(t, cb(&unit.Unit{PTS: 180000, Payload: idr})) +} diff --git a/internal/protocols/rtmp/from_stream.go b/internal/protocols/rtmp/from_stream.go index eee7d577893..e6882052281 100644 --- a/internal/protocols/rtmp/from_stream.go +++ b/internal/protocols/rtmp/from_stream.go @@ -134,7 +134,11 @@ func FromStream( dts, err := videoDTSExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS) if err != nil { - return err + r.Parent.Log(logger.Warn, + "H265 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + videoDTSExtractor = nil + return nil } nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) @@ -196,7 +200,11 @@ func FromStream( dts, err := videoDTSExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS) if err != nil { - return err + r.Parent.Log(logger.Warn, + "H264 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + videoDTSExtractor = nil + return nil } nconn.SetWriteDeadline(time.Now().Add(writeTimeout)) diff --git a/internal/protocols/rtmp/from_stream_recovery_test.go b/internal/protocols/rtmp/from_stream_recovery_test.go new file mode 100644 index 00000000000..5d31751764f --- /dev/null +++ b/internal/protocols/rtmp/from_stream_recovery_test.go @@ -0,0 +1,161 @@ +package rtmp + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "sync" + "testing" + "time" + + "github.com/bluenviron/gortmplib" + "github.com/bluenviron/gortsplib/v5/pkg/description" + "github.com/bluenviron/gortsplib/v5/pkg/format" + "github.com/stretchr/testify/require" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/bluenviron/mediamtx/internal/unit" +) + +// captureLogger collects warn-level log lines so the recovery tests can assert +// the DTS extractor reset message was emitted. +type captureLogger struct { + mu sync.Mutex + logs []string +} + +func (c *captureLogger) Log(level logger.Level, format string, args ...any) { + if level != logger.Warn { + return + } + c.mu.Lock() + c.logs = append(c.logs, fmt.Sprintf(format, args...)) + c.mu.Unlock() +} + +func (c *captureLogger) waitFor(t *testing.T, substr string) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + c.mu.Lock() + for _, l := range c.logs { + if strings.Contains(l, substr) { + c.mu.Unlock() + return + } + } + c.mu.Unlock() + time.Sleep(20 * time.Millisecond) + } + c.mu.Lock() + defer c.mu.Unlock() + t.Fatalf("expected warn log containing %q, got %v", substr, c.logs) +} + +// runRecoveryScenario sets up a real RTMP server <-> client pair so that +// FromStream's writer can flush packets, then runs writeUnits and waits for +// the expected warn message. The client only consumes track metadata; the +// individual frames are simply drained by the underlying TCP socket. +func runRecoveryScenario( + t *testing.T, + medias []*description.Media, + writeUnits func(medias []*description.Media, sub *stream.SubStream), + expectedWarn string, +) { + t.Helper() + + strm := &stream.Stream{ + Desc: &description.Session{Medias: medias}, + WriteQueueSize: 512, + RTPMaxPayloadSize: 1450, + Parent: test.NilLogger, + } + require.NoError(t, strm.Initialize()) + defer strm.Close() + + subStream := &stream.SubStream{Stream: strm, UseRTPPackets: false} + require.NoError(t, subStream.Initialize()) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + clientDone := make(chan struct{}) + + go func() { + defer close(clientDone) + + u, perr := url.Parse("rtmp://" + ln.Addr().String() + "/stream") + require.NoError(t, perr) + + c := &gortmplib.Client{URL: u} + require.NoError(t, c.Initialize(context.Background())) + + r := &gortmplib.Reader{Conn: c} + require.NoError(t, r.Initialize()) + }() + + nconn, err := ln.Accept() + require.NoError(t, err) + defer nconn.Close() + + conn := &gortmplib.ServerConn{RW: nconn} + require.NoError(t, conn.Initialize()) + require.NoError(t, conn.Accept()) + + cl := &captureLogger{} + r := &stream.Reader{Parent: cl} + + require.NoError(t, FromStream(strm.Desc, r, conn, nconn, 5*time.Second)) + + strm.AddReader(r) + defer strm.RemoveReader(r) + + writeUnits(medias, subStream) + + cl.waitFor(t, expectedWarn) + + // Reader should still be alive: a permanent failure would have closed + // the channel with a real error before now. + select { + case err := <-r.Error(): + t.Fatalf("reader unexpectedly terminated: %v", err) + default: + } + + <-clientDone // best-effort wait so test cleanup doesn't race +} + +func TestFromStreamH264DTSExtractorRecovery(t *testing.T) { + medias := []*description.Media{{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatH264}, + }} + + idr := unit.PayloadH264{test.FormatH264.SPS, test.FormatH264.PPS, {0x65}} + + writeUnits := func(medias []*description.Media, sub *stream.SubStream) { + // Prime the extractor with a valid IDR. + sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{ + PTS: 90000, + Payload: idr, + }) + // Trigger DTS error (PTS goes backwards). The fix logs a warn, + // resets the extractor and returns nil so the reader survives. + sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{ + PTS: 0, + Payload: idr, + }) + // Subsequent IDR re-primes the extractor. + sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{ + PTS: 180000, + Payload: idr, + }) + } + + runRecoveryScenario(t, medias, writeUnits, "H264 DTS extractor reset") +} diff --git a/internal/recorder/format_fmp4.go b/internal/recorder/format_fmp4.go index f573e8594f1..ccad961f0f5 100644 --- a/internal/recorder/format_fmp4.go +++ b/internal/recorder/format_fmp4.go @@ -388,7 +388,11 @@ func (f *formatFMP4) initialize() bool { dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS) if err != nil { - return err + f.ri.Log(logger.Warn, + "H265 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } var sampl fmp4.Sample @@ -464,7 +468,11 @@ func (f *formatFMP4) initialize() bool { dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS) if err != nil { - return err + f.ri.Log(logger.Warn, + "H264 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } var sampl fmp4.Sample diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index 7eaafb92dab..88dd5ed9332 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -109,7 +109,11 @@ func (f *formatMPEGTS) initialize() bool { dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS) if err != nil { - return err + f.ri.Log(logger.Warn, + "H265 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } return track.write( @@ -151,7 +155,11 @@ func (f *formatMPEGTS) initialize() bool { dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS) if err != nil { - return err + f.ri.Log(logger.Warn, + "H264 DTS extractor reset after failure: %s; "+ + "waiting for next random access frame", err) + dtsExtractor = nil + return nil } return track.write( diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index e9af32da8de..4fcf3b74455 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -218,7 +218,10 @@ func TestRecorder(t *testing.T) { 52*90000, time.Date(2008, 5, 20, 22, 15, 27, 0, time.UTC)) - // simulate a write error + // stream discontinuity: a unit with a PTS that goes backwards. + // Previously this propagated as an error from the DTS extractor and + // killed the recorder, but the extractor now resets and logs a + // warning so the segment stays open. subStream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.Unit{ PTS: 0, Payload: unit.PayloadH264{ @@ -226,10 +229,8 @@ func TestRecorder(t *testing.T) { }, }) - for range 2 { - <-segCreated - <-segDone - } + <-segCreated // segment 0 (2008-05-20_22-15-25) + <-segDone // segment 0 done (du = 2s) if ca == "fmp4" { var init fmp4.Init @@ -314,31 +315,19 @@ func TestRecorder(t *testing.T) { }, }, }, init) - - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext)) - require.NoError(t, err) } else { _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-25-000000."+ext)) require.NoError(t, err) - - _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext)) - require.NoError(t, err) } time.Sleep(50 * time.Millisecond) - writeToStream(subStream, - 300*90000, - time.Date(2010, 5, 20, 22, 15, 25, 0, time.UTC)) - - time.Sleep(50 * time.Millisecond) - w.Close() - <-segCreated - <-segDone + <-segCreated // segment 1 file is created lazily on close + <-segDone // segment 1 done (closed by w.Close) - _, err = os.Stat(filepath.Join(dir, "mypath", "2010-05-20_22-15-25-000000."+ext)) + _, err = os.Stat(filepath.Join(dir, "mypath", "2008-05-20_22-15-27-000000."+ext)) require.NoError(t, err) if ca == "fmp4" {