Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions internal/protocols/mpegts/from_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ func FromStream(

dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS)
if err != nil {
return err
r.Parent.Log(logger.Warn,
"H265 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
dtsExtractor = nil
return nil
}

sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand Down Expand Up @@ -120,7 +124,11 @@ func FromStream(

dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS)
if err != nil {
return err
r.Parent.Log(logger.Warn,
"H264 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
dtsExtractor = nil
return nil
}

sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand Down
135 changes: 135 additions & 0 deletions internal/protocols/mpegts/from_stream_recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package mpegts

import (
"bufio"
"bytes"
"fmt"
"reflect"
"strings"
"testing"
"time"
"unsafe"

"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
srt "github.com/datarhei/gosrt"
"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit"
)

// fakeSRTConn implements only the SetWriteDeadline method used by the
// FromStream callbacks; all other methods inherit from a nil embedded
// interface and would panic if invoked. This is intentional: the tests below
// only exercise code paths that touch SetWriteDeadline (or do not touch
// sconn at all).
type fakeSRTConn struct {
srt.Conn
}

func (fakeSRTConn) SetWriteDeadline(_ time.Time) error { return nil }

// extractOnData reaches into the unexported onDatas map of stream.Reader to
// retrieve the callback registered by FromStream. This avoids spinning up a
// full async stream pipeline and keeps the recovery tests focused.
func extractOnData(
t *testing.T,
r *stream.Reader,
medi *description.Media,
forma format.Format,
) stream.OnDataFunc {
t.Helper()
rv := reflect.ValueOf(r).Elem()
field := rv.FieldByName("onDatas")
require.True(t, field.IsValid(), "onDatas field not found")
mapVal := reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem()

formats := mapVal.MapIndex(reflect.ValueOf(medi))
require.True(t, formats.IsValid(), "no callbacks registered for media")
cb := formats.MapIndex(reflect.ValueOf(forma))
require.True(t, cb.IsValid(), "no callback registered for format")
return cb.Interface().(stream.OnDataFunc)
}

func TestFromStreamH264DTSExtractorRecovery(t *testing.T) {
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{test.FormatH264},
}
desc := &description.Session{Medias: []*description.Media{medi}}

var warnLogs []string
r := &stream.Reader{
Parent: test.Logger(func(l logger.Level, format string, args ...any) {
if l == logger.Warn {
warnLogs = append(warnLogs, fmt.Sprintf(format, args...))
}
}),
}

var buf bytes.Buffer
bw := bufio.NewWriter(&buf)

err := FromStream(desc, r, bw, fakeSRTConn{}, time.Second)
require.NoError(t, err)

cb := extractOnData(t, r, medi, medi.Formats[0])

idr := unit.PayloadH264{test.FormatH264.SPS, test.FormatH264.PPS, {0x65}}

// 1) Prime the extractor with a valid IDR.
require.NoError(t, cb(&unit.Unit{PTS: 90000, Payload: idr}))

// 2) Send a unit whose PTS goes backwards. Before the fix, the DTS
// extractor returned an error here that propagated out of the reader
// and tore down the SRT receiver. The fix logs a warning, resets the
// extractor, and returns nil so the connection survives.
require.NoError(t, cb(&unit.Unit{PTS: 0, Payload: idr}))

require.NotEmpty(t, warnLogs, "expected a warn log after the DTS error")
require.True(t, strings.Contains(warnLogs[0], "H264 DTS extractor reset"),
"unexpected warn log: %q", warnLogs[0])

// 3) The next valid IDR should re-prime the extractor and succeed.
require.NoError(t, cb(&unit.Unit{PTS: 180000, Payload: idr}))
}

func TestFromStreamH265DTSExtractorRecovery(t *testing.T) {
medi := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{test.FormatH265},
}
desc := &description.Session{Medias: []*description.Media{medi}}

var warnLogs []string
r := &stream.Reader{
Parent: test.Logger(func(l logger.Level, format string, args ...any) {
if l == logger.Warn {
warnLogs = append(warnLogs, fmt.Sprintf(format, args...))
}
}),
}

var buf bytes.Buffer
bw := bufio.NewWriter(&buf)

err := FromStream(desc, r, bw, fakeSRTConn{}, time.Second)
require.NoError(t, err)

cb := extractOnData(t, r, medi, medi.Formats[0])

// IDR (NAL unit type 19, IDR_W_RADL) prefixed with H265 NAL header.
idr := unit.PayloadH265{{0x26, 0x01, 0xaf, 0x08, 0x42, 0x23, 0x48, 0x8a, 0x43, 0xe2}}

require.NoError(t, cb(&unit.Unit{PTS: 90000, Payload: idr}))
require.NoError(t, cb(&unit.Unit{PTS: 0, Payload: idr}))

require.NotEmpty(t, warnLogs, "expected a warn log after the DTS error")
require.True(t, strings.Contains(warnLogs[0], "H265 DTS extractor reset"),
"unexpected warn log: %q", warnLogs[0])

require.NoError(t, cb(&unit.Unit{PTS: 180000, Payload: idr}))
}
12 changes: 10 additions & 2 deletions internal/protocols/rtmp/from_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func FromStream(

dts, err := videoDTSExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS)
if err != nil {
return err
r.Parent.Log(logger.Warn,
"H265 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
videoDTSExtractor = nil
return nil
}

nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand Down Expand Up @@ -196,7 +200,11 @@ func FromStream(

dts, err := videoDTSExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS)
if err != nil {
return err
r.Parent.Log(logger.Warn,
"H264 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
videoDTSExtractor = nil
return nil
}

nconn.SetWriteDeadline(time.Now().Add(writeTimeout))
Expand Down
161 changes: 161 additions & 0 deletions internal/protocols/rtmp/from_stream_recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package rtmp

import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"testing"
"time"

"github.com/bluenviron/gortmplib"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/format"
"github.com/stretchr/testify/require"

"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/bluenviron/mediamtx/internal/unit"
)

// captureLogger collects warn-level log lines so the recovery tests can assert
// the DTS extractor reset message was emitted.
type captureLogger struct {
mu sync.Mutex
logs []string
}

func (c *captureLogger) Log(level logger.Level, format string, args ...any) {
if level != logger.Warn {
return
}
c.mu.Lock()
c.logs = append(c.logs, fmt.Sprintf(format, args...))
c.mu.Unlock()
}

func (c *captureLogger) waitFor(t *testing.T, substr string) {
t.Helper()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
c.mu.Lock()
for _, l := range c.logs {
if strings.Contains(l, substr) {
c.mu.Unlock()
return
}
}
c.mu.Unlock()
time.Sleep(20 * time.Millisecond)
}
c.mu.Lock()
defer c.mu.Unlock()
t.Fatalf("expected warn log containing %q, got %v", substr, c.logs)
}

// runRecoveryScenario sets up a real RTMP server <-> client pair so that
// FromStream's writer can flush packets, then runs writeUnits and waits for
// the expected warn message. The client only consumes track metadata; the
// individual frames are simply drained by the underlying TCP socket.
func runRecoveryScenario(
t *testing.T,
medias []*description.Media,
writeUnits func(medias []*description.Media, sub *stream.SubStream),
expectedWarn string,
) {
t.Helper()

strm := &stream.Stream{
Desc: &description.Session{Medias: medias},
WriteQueueSize: 512,
RTPMaxPayloadSize: 1450,
Parent: test.NilLogger,
}
require.NoError(t, strm.Initialize())
defer strm.Close()

subStream := &stream.SubStream{Stream: strm, UseRTPPackets: false}
require.NoError(t, subStream.Initialize())

ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer ln.Close()

clientDone := make(chan struct{})

go func() {
defer close(clientDone)

u, perr := url.Parse("rtmp://" + ln.Addr().String() + "/stream")
require.NoError(t, perr)

c := &gortmplib.Client{URL: u}
require.NoError(t, c.Initialize(context.Background()))

r := &gortmplib.Reader{Conn: c}
require.NoError(t, r.Initialize())
}()

nconn, err := ln.Accept()
require.NoError(t, err)
defer nconn.Close()

conn := &gortmplib.ServerConn{RW: nconn}
require.NoError(t, conn.Initialize())
require.NoError(t, conn.Accept())

cl := &captureLogger{}
r := &stream.Reader{Parent: cl}

require.NoError(t, FromStream(strm.Desc, r, conn, nconn, 5*time.Second))

strm.AddReader(r)
defer strm.RemoveReader(r)

writeUnits(medias, subStream)

cl.waitFor(t, expectedWarn)

// Reader should still be alive: a permanent failure would have closed
// the channel with a real error before now.
select {
case err := <-r.Error():

Check failure on line 125 in internal/protocols/rtmp/from_stream_recovery_test.go

View workflow job for this annotation

GitHub Actions / go

shadow: declaration of "err" shadows declaration at line 83 (govet)
t.Fatalf("reader unexpectedly terminated: %v", err)
default:
}

<-clientDone // best-effort wait so test cleanup doesn't race
}

func TestFromStreamH264DTSExtractorRecovery(t *testing.T) {
medias := []*description.Media{{
Type: description.MediaTypeVideo,
Formats: []format.Format{test.FormatH264},
}}

idr := unit.PayloadH264{test.FormatH264.SPS, test.FormatH264.PPS, {0x65}}

writeUnits := func(medias []*description.Media, sub *stream.SubStream) {
// Prime the extractor with a valid IDR.
sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{
PTS: 90000,
Payload: idr,
})
// Trigger DTS error (PTS goes backwards). The fix logs a warn,
// resets the extractor and returns nil so the reader survives.
sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{
PTS: 0,
Payload: idr,
})
// Subsequent IDR re-primes the extractor.
sub.WriteUnit(medias[0], medias[0].Formats[0], &unit.Unit{
PTS: 180000,
Payload: idr,
})
}

runRecoveryScenario(t, medias, writeUnits, "H264 DTS extractor reset")
}
12 changes: 10 additions & 2 deletions internal/recorder/format_fmp4.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ func (f *formatFMP4) initialize() bool {

dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH265), u.PTS)
if err != nil {
return err
f.ri.Log(logger.Warn,
"H265 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
dtsExtractor = nil
return nil
}

var sampl fmp4.Sample
Expand Down Expand Up @@ -464,7 +468,11 @@ func (f *formatFMP4) initialize() bool {

dts, err := dtsExtractor.Extract(u.Payload.(unit.PayloadH264), u.PTS)
if err != nil {
return err
f.ri.Log(logger.Warn,
"H264 DTS extractor reset after failure: %s; "+
"waiting for next random access frame", err)
dtsExtractor = nil
return nil
}

var sampl fmp4.Sample
Expand Down
Loading
Loading