Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion internal/protocols/mpegts/to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package mpegts

import (
"errors"
"io"
"time"

"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/pion/rtp"

"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
Expand All @@ -18,12 +20,93 @@ var errNoSupportedCodecs = errors.New(
"the stream doesn't contain any supported codec, which are currently " +
"H265, H264, MPEG-4 Video, MPEG-1/2 Video, Opus, MPEG-4 Audio, MPEG-1 Audio, AC-3")

// mpegTSReader is a wrapper around mpegts.Reader that captures raw MPEG-TS data.
type mpegTSReader struct {
r io.Reader
buffer []byte
stream **stream.Stream
media *description.Media
format format.Format
startTime time.Time
sequence uint16
}

func (r *mpegTSReader) Read(p []byte) (int, error) {
n, err := r.r.Read(p)

// If we read any data, send it to the stream as raw MPEG-TS data
if n > 0 {
// Create a copy of the data to avoid issues with buffer reuse
data := make([]byte, n)
copy(data, p[:n])

// Initialize startTime if this is the first packet
if r.startTime.IsZero() {
r.startTime = time.Now()
}

// Calculate PTS based on elapsed time since start (in 90kHz clock rate)
now := time.Now()
elapsed := now.Sub(r.startTime)
pts := int64(elapsed.Milliseconds() * 90) // Convert to 90kHz clock rate

// Create an RTP packet with the MPEG-TS data as payload
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 96,
SequenceNumber: r.sequence,
Timestamp: uint32(pts),
SSRC: 1234, // Fixed SSRC for consistency
},
Payload: data,
}

// Increment sequence number for next packet
r.sequence++

// Send the raw MPEG-TS data to the stream as a Generic unit
(*r.stream).WriteUnit(r.media, r.format, &unit.Generic{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: now,
PTS: pts,
},
})
}

return n, err
}

// ToStream maps a MPEG-TS stream to a MediaMTX stream.
func ToStream(
r *mpegts.Reader,
stream **stream.Stream,
l logger.Writer,
) ([]*description.Media, error) {
// Wrap the reader to capture raw MPEG-TS data for passthrough recording
if _, ok := r.R.(*mpegTSReader); !ok {
// Create a generic format for the raw MPEG-TS data
genericFormat := &format.Generic{
PayloadTyp: 96,
RTPMa: "private/90000",
}
// Initialize the format
genericFormat.Init()

// Create a media for the raw MPEG-TS data
media := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{genericFormat},
}

r.R = &mpegTSReader{
r: r.R,
stream: stream,
media: media,
format: genericFormat,
}
}
var medias []*description.Media //nolint:prealloc
var unsupportedTracks []int

Expand Down Expand Up @@ -215,7 +298,25 @@ func ToStream(
}

if len(medias) == 0 {
return nil, errNoSupportedCodecs
// Even if there are no supported codecs, we can still record the raw MPEG-TS data
// Create a dummy media to allow the stream to be created
genericFormat := &format.Generic{
PayloadTyp: 96,
RTPMa: "private/90000",
}
// Initialize the format to compute the clock rate
err := genericFormat.Init()
if err != nil {
l.Log(logger.Warn, "failed to initialize generic format: %v", err)
}

media := &description.Media{
Type: description.MediaTypeVideo,
Formats: []format.Format{genericFormat},
}
medias = append(medias, media)

l.Log(logger.Info, "no supported codecs found, using MPEG-TS passthrough mode")
}

for _, id := range unsupportedTracks {
Expand Down
22 changes: 18 additions & 4 deletions internal/protocols/mpegts/to_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/asticode/go-astits"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/stream"
"github.com/bluenviron/mediamtx/internal/test"
"github.com/stretchr/testify/require"
)
Expand All @@ -32,11 +33,24 @@ func TestToStreamNoSupportedCodecs(t *testing.T) {
err = r.Initialize()
require.NoError(t, err)

l := test.Logger(func(logger.Level, string, ...interface{}) {
t.Error("should not happen")
var str *stream.Stream

// Use a simple logger that just logs the message
l := test.Logger(func(l logger.Level, format string, args ...interface{}) {
t.Logf("Log: %s", fmt.Sprintf(format, args...))
})
_, err = ToStream(r, nil, l)
require.Equal(t, errNoSupportedCodecs, err)

// The function should now return medias with a generic format
// instead of returning an error
medias, err := ToStream(r, &str, l)
require.NoError(t, err)
require.NotNil(t, medias)
require.NotEmpty(t, medias)

// The stream should be initialized with the provided medias
if str != nil {
require.Equal(t, medias, str.Desc.Medias)
}
}

func TestToStreamSkipUnsupportedTracks(t *testing.T) {
Expand Down
26 changes: 24 additions & 2 deletions internal/recorder/format_mpegts.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,31 @@ func (f *formatMPEGTS) initialize() bool {
}
}

// Even if there are no supported formats, we can still record the raw MPEG-TS stream
if len(setuppedFormats) == 0 {
f.ri.Log(logger.Warn, "no supported tracks found, skipping recording")
return false
// Log that we're using passthrough mode for MPEG-TS streams with unsupported codecs
f.ri.Log(logger.Warn, "no supported tracks found, using MPEG-TS passthrough mode")

// Create a passthrough recorder instead
passthroughFormat := &formatPassthroughTS{
ri: f.ri,
}

// Initialize the passthrough format
ok := passthroughFormat.initialize()
if !ok {
return false
}

// Replace this format with the passthrough format
*f = formatMPEGTS{
ri: f.ri,
dw: passthroughFormat.dw,
bw: passthroughFormat.bw,
currentSegment: nil,
}

return true
}

n := 1
Expand Down
Loading