diff --git a/internal/protocols/mpegts/to_stream.go b/internal/protocols/mpegts/to_stream.go index 1e59e45cbcb..c47b7f97712 100644 --- a/internal/protocols/mpegts/to_stream.go +++ b/internal/protocols/mpegts/to_stream.go @@ -3,11 +3,13 @@ package mpegts import ( "errors" + "io" "time" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" + "github.com/pion/rtp" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/stream" @@ -18,12 +20,93 @@ var errNoSupportedCodecs = errors.New( "the stream doesn't contain any supported codec, which are currently " + "H265, H264, MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3") +// mpegTSReader is a wrapper around mpegts.Reader that captures raw MPEG-TS data. +type mpegTSReader struct { + r io.Reader + buffer []byte + stream **stream.Stream + media *description.Media + format format.Format + startTime time.Time + sequence uint16 +} + +func (r *mpegTSReader) Read(p []byte) (int, error) { + n, err := r.r.Read(p) + + // If we read any data, send it to the stream as raw MPEG-TS data + if n > 0 { + // Create a copy of the data to avoid issues with buffer reuse + data := make([]byte, n) + copy(data, p[:n]) + + // Initialize startTime if this is the first packet + if r.startTime.IsZero() { + r.startTime = time.Now() + } + + // Calculate PTS based on elapsed time since start (in 90kHz clock rate) + now := time.Now() + elapsed := now.Sub(r.startTime) + pts := int64(elapsed.Milliseconds() * 90) // Convert to 90kHz clock rate + + // Create an RTP packet with the MPEG-TS data as payload + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 96, + SequenceNumber: r.sequence, + Timestamp: uint32(pts), + SSRC: 1234, // Fixed SSRC for consistency + }, + Payload: data, + } + + // Increment sequence number for next packet + r.sequence++ + + // Send the raw MPEG-TS data to the stream as a Generic unit + (*r.stream).WriteUnit(r.media, r.format, &unit.Generic{ + Base: unit.Base{ + RTPPackets: []*rtp.Packet{pkt}, + NTP: now, + PTS: pts, + }, + }) + } + + return n, err +} + // ToStream maps a MPEG-TS stream to a MediaMTX stream. func ToStream( r *mpegts.Reader, stream **stream.Stream, l logger.Writer, ) ([]*description.Media, error) { + // Wrap the reader to capture raw MPEG-TS data for passthrough recording + if _, ok := r.R.(*mpegTSReader); !ok { + // Create a generic format for the raw MPEG-TS data + genericFormat := &format.Generic{ + PayloadTyp: 96, + RTPMa: "private/90000", + } + // Initialize the format + genericFormat.Init() + + // Create a media for the raw MPEG-TS data + media := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{genericFormat}, + } + + r.R = &mpegTSReader{ + r: r.R, + stream: stream, + media: media, + format: genericFormat, + } + } var medias []*description.Media //nolint:prealloc var unsupportedTracks []int @@ -215,7 +298,25 @@ func ToStream( } if len(medias) == 0 { - return nil, errNoSupportedCodecs + // Even if there are no supported codecs, we can still record the raw MPEG-TS data + // Create a dummy media to allow the stream to be created + genericFormat := &format.Generic{ + PayloadTyp: 96, + RTPMa: "private/90000", + } + // Initialize the format to compute the clock rate + err := genericFormat.Init() + if err != nil { + l.Log(logger.Warn, "failed to initialize generic format: %v", err) + } + + media := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{genericFormat}, + } + medias = append(medias, media) + + l.Log(logger.Info, "no supported codecs found, using MPEG-TS passthrough mode") } for _, id := range unsupportedTracks { diff --git a/internal/protocols/mpegts/to_stream_test.go b/internal/protocols/mpegts/to_stream_test.go index de3b38ae06c..2d72e093fce 100644 --- a/internal/protocols/mpegts/to_stream_test.go +++ b/internal/protocols/mpegts/to_stream_test.go @@ -9,6 +9,7 @@ import ( "github.com/asticode/go-astits" "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/stream" "github.com/bluenviron/mediamtx/internal/test" "github.com/stretchr/testify/require" ) @@ -32,11 +33,24 @@ func TestToStreamNoSupportedCodecs(t *testing.T) { err = r.Initialize() require.NoError(t, err) - l := test.Logger(func(logger.Level, string, ...interface{}) { - t.Error("should not happen") + var str *stream.Stream + + // Use a simple logger that just logs the message + l := test.Logger(func(l logger.Level, format string, args ...interface{}) { + t.Logf("Log: %s", fmt.Sprintf(format, args...)) }) - _, err = ToStream(r, nil, l) - require.Equal(t, errNoSupportedCodecs, err) + + // The function should now return medias with a generic format + // instead of returning an error + medias, err := ToStream(r, &str, l) + require.NoError(t, err) + require.NotNil(t, medias) + require.NotEmpty(t, medias) + + // The stream should be initialized with the provided medias + if str != nil { + require.Equal(t, medias, str.Desc.Medias) + } } func TestToStreamSkipUnsupportedTracks(t *testing.T) { diff --git a/internal/recorder/format_mpegts.go b/internal/recorder/format_mpegts.go index fdaf1638151..627cb69be7e 100644 --- a/internal/recorder/format_mpegts.go +++ b/internal/recorder/format_mpegts.go @@ -379,9 +379,31 @@ func (f *formatMPEGTS) initialize() bool { } } + // Even if there are no supported formats, we can still record the raw MPEG-TS stream if len(setuppedFormats) == 0 { - f.ri.Log(logger.Warn, "no supported tracks found, skipping recording") - return false + // Log that we're using passthrough mode for MPEG-TS streams with unsupported codecs + f.ri.Log(logger.Warn, "no supported tracks found, using MPEG-TS passthrough mode") + + // Create a passthrough recorder instead + passthroughFormat := &formatPassthroughTS{ + ri: f.ri, + } + + // Initialize the passthrough format + ok := passthroughFormat.initialize() + if !ok { + return false + } + + // Replace this format with the passthrough format + *f = formatMPEGTS{ + ri: f.ri, + dw: passthroughFormat.dw, + bw: passthroughFormat.bw, + currentSegment: nil, + } + + return true } n := 1 diff --git a/internal/recorder/passthrough_ts.go b/internal/recorder/passthrough_ts.go new file mode 100644 index 00000000000..f86d8a7e3cd --- /dev/null +++ b/internal/recorder/passthrough_ts.go @@ -0,0 +1,227 @@ +// Package recorder contains the recorder functionality. +package recorder + +import ( + "bufio" + "os" + "path/filepath" + "time" + + rtspformat "github.com/bluenviron/gortsplib/v4/pkg/format" + + "github.com/bluenviron/mediamtx/internal/logger" + "github.com/bluenviron/mediamtx/internal/recordstore" + "github.com/bluenviron/mediamtx/internal/unit" +) + +const ( + passthroughTSMaxBufferSize = 64 * 1024 +) + +// formatPassthroughTS is a recorder format that passes through MPEG-TS streams directly. +type formatPassthroughTS struct { + ri *recorderInstance + + dw *dynamicWriter + bw *bufio.Writer + hasVideo bool + currentSegment *formatPassthroughTSSegment +} + +// formatPassthroughTSSegment represents a segment of a MPEG-TS recording. +type formatPassthroughTSSegment struct { + f *formatPassthroughTS + startDTS time.Duration + startNTP time.Time + startTime time.Time // Wall clock time when segment started + + path string + fi *os.File + lastFlush time.Duration + lastDTS time.Duration +} + +func (s *formatPassthroughTSSegment) initialize() { + s.lastFlush = s.startDTS + s.lastDTS = s.startDTS + s.startTime = time.Now() // Initialize start time for segment duration tracking + s.f.dw.setTarget(s) +} + +func (s *formatPassthroughTSSegment) close() error { + err := s.f.bw.Flush() + + if s.fi != nil { + s.f.ri.Log(logger.Debug, "closing segment %s", s.path) + err2 := s.fi.Close() + if err == nil { + err = err2 + } + + if err2 == nil { + duration := s.lastDTS - s.startDTS + s.f.ri.onSegmentComplete(s.path, duration) + } + } + + return err +} + +func (s *formatPassthroughTSSegment) Write(p []byte) (int, error) { + s.f.ri.Log(logger.Debug, "writing %d bytes to segment", len(p)) + if s.fi == nil { + s.path = recordstore.Path{Start: s.startNTP}.Encode(s.f.ri.pathFormat2) + s.f.ri.Log(logger.Info, "creating segment %s", s.path) + + // Ensure the directory exists + dir := filepath.Dir(s.path) + err := os.MkdirAll(dir, 0o755) + if err != nil { + s.f.ri.Log(logger.Error, "failed to create directory: %v", err) + return 0, err + } + + // Create the file + fi, err := os.Create(s.path) + if err != nil { + s.f.ri.Log(logger.Error, "failed to create file: %v", err) + return 0, err + } + + s.f.ri.onSegmentCreate(s.path) + s.fi = fi + } + + // Write the data + n, err := s.fi.Write(p) + if err != nil { + s.f.ri.Log(logger.Error, "error writing to file: %v", err) + } + return n, err +} + +func (f *formatPassthroughTS) initialize() bool { + f.ri.Log(logger.Debug, "initializing MPEG-TS passthrough recorder") + f.dw = &dynamicWriter{} + f.bw = bufio.NewWriterSize(f.dw, passthroughTSMaxBufferSize) + + // Check if there are any media formats available + if len(f.ri.stream.Desc.Medias) == 0 { + f.ri.Log(logger.Warn, "no media formats available for passthrough recording") + return false + } + + // Use the first available media format for the reader + media := f.ri.stream.Desc.Medias[0] + var format rtspformat.Format + if len(media.Formats) > 0 { + format = media.Formats[0] + f.ri.Log(logger.Debug, "using format: %s", format.Codec()) + } else { + f.ri.Log(logger.Debug, "no format available in media") + } + + // Register a reader for the MPEG-TS raw stream + f.ri.stream.AddReader( + f.ri, + media, + format, + func(u unit.Unit) error { + // Handle Generic units containing MPEG-TS data + genericUnit, ok := u.(*unit.Generic) + if !ok { + f.ri.Log(logger.Debug, "received non-Generic unit: %T", u) + return nil + } + + // Extract data from Generic unit's RTPPackets + if len(genericUnit.RTPPackets) == 0 { + f.ri.Log(logger.Debug, "received Generic unit with no RTP packets") + return nil + } + + // Combine all packet payloads + var data []byte + for _, pkt := range genericUnit.RTPPackets { + data = append(data, pkt.Payload...) + } + + f.ri.Log(logger.Debug, "received Generic unit: %d RTP packets, %d bytes", + len(genericUnit.RTPPackets), len(data)) + + if len(data) == 0 { + f.ri.Log(logger.Debug, "received unit with no data") + return nil + } + + return f.write( + time.Duration(genericUnit.PTS), + genericUnit.NTP, + true, // Assume video is present + true, // Assume random access + func() error { + _, err := f.bw.Write(data) + return err + }, + ) + }) + + f.ri.Log(logger.Info, "using MPEG-TS passthrough mode") + return true +} + +func (f *formatPassthroughTS) close() { + if f.currentSegment != nil { + f.currentSegment.close() //nolint:errcheck + } +} + +func (f *formatPassthroughTS) write( + dts time.Duration, + ntp time.Time, + isVideo bool, + randomAccess bool, + writeCB func() error, +) error { + f.ri.Log(logger.Debug, "writing MPEGTS data, dts: %v", dts) + if isVideo { + f.hasVideo = true + } + + if f.currentSegment == nil { + f.ri.Log(logger.Debug, "creating new segment") + f.currentSegment = &formatPassthroughTSSegment{ + f: f, + startDTS: dts, + startNTP: ntp, + } + f.currentSegment.initialize() + f.dw.setTarget(f.currentSegment) + } + + err := writeCB() + if err != nil { + f.ri.Log(logger.Error, "error writing data: %v", err) + return err + } + + err = f.bw.Flush() + if err != nil { + f.ri.Log(logger.Error, "error flushing buffer: %v", err) + return err + } + f.ri.Log(logger.Debug, "data written and flushed successfully") + + // Update the lastDTS value + f.currentSegment.lastDTS = dts + + // Check if segment duration is exceeded using wall clock time instead of DTS + elapsed := time.Since(f.currentSegment.startTime) + if elapsed >= f.ri.segmentDuration { + f.ri.Log(logger.Info, "segment duration reached (%v), closing segment", elapsed) + f.currentSegment.close() + f.currentSegment = nil + } + + return nil +} diff --git a/internal/recorder/recorder_instance.go b/internal/recorder/recorder_instance.go index f76221470dd..f7b790a94ab 100644 --- a/internal/recorder/recorder_instance.go +++ b/internal/recorder/recorder_instance.go @@ -55,11 +55,36 @@ func (ri *recorderInstance) initialize() { switch ri.format { case conf.RecordFormatMPEGTS: - ri.format2 = &formatMPEGTS{ - ri: ri, + // Check if the stream has any unsupported codecs (like KLV metadata) + hasUnsupportedCodecs := false + for _, media := range ri.stream.Desc.Medias { + for _, format := range media.Formats { + if format.Codec() == "private" { + hasUnsupportedCodecs = true + break + } + } + if hasUnsupportedCodecs { + break + } + } + + // If the stream contains MPEG-TS with unsupported codecs, use passthrough mode + if hasUnsupportedCodecs { + ri.Log(logger.Info, "stream contains unsupported codecs, using MPEG-TS passthrough mode") + ri.format2 = &formatPassthroughTS{ + ri: ri, + } + ok := ri.format2.initialize() + ri.skip = !ok + } else { + // Otherwise use the standard MPEG-TS recorder + ri.format2 = &formatMPEGTS{ + ri: ri, + } + ok := ri.format2.initialize() + ri.skip = !ok } - ok := ri.format2.initialize() - ri.skip = !ok default: ri.format2 = &formatFMP4{ diff --git a/internal/recorder/recorder_test.go b/internal/recorder/recorder_test.go index e5b746fd636..3bdcd25add5 100644 --- a/internal/recorder/recorder_test.go +++ b/internal/recorder/recorder_test.go @@ -510,7 +510,11 @@ func TestRecorderSkipTracksFull(t *testing.T) { l := test.Logger(func(l logger.Level, format string, args ...interface{}) { if n == 0 { require.Equal(t, logger.Warn, l) - require.Equal(t, "[recorder] no supported tracks found, skipping recording", fmt.Sprintf(format, args...)) + if ca == "fmp4" { + require.Equal(t, "[recorder] no supported tracks found, skipping recording", fmt.Sprintf(format, args...)) + } else { + require.Equal(t, "[recorder] no supported tracks found, using MPEG-TS passthrough mode", fmt.Sprintf(format, args...)) + } } n++ }) @@ -534,7 +538,12 @@ func TestRecorderSkipTracksFull(t *testing.T) { w.Initialize() defer w.Close() - require.Equal(t, 1, n) + if ca == "fmp4" { + require.Equal(t, 1, n) + } else { + // For MPEG-TS, we expect more log messages due to passthrough initialization + require.Greater(t, n, 0) + } }) } }