Skip to content

Commit

Permalink
Enable parallel MediaMTX playback (#3396)
Browse files Browse the repository at this point in the history
* Enable parallel MediaMTX playback

* Fix format

* DRY

* Switch to a custom multiwriter so that errors on one writer don't affect the others
  • Loading branch information
mjh1 authored Feb 18, 2025
1 parent 39db9b6 commit 232df3a
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 43 deletions.
112 changes: 78 additions & 34 deletions server/ai_live_video.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,34 @@ func startTricklePublish(ctx context.Context, url *url.URL, params aiRequestPara
clog.Infof(ctx, "trickle pub")
}

type multiWriter struct {
ctx context.Context
writers []io.Writer
isErrLogged bool
}

func (t *multiWriter) Write(p []byte) (n int, err error) {
success := false
for _, w := range t.writers {
bytesWritten, err := w.Write(p)
if err != nil {
if !t.isErrLogged {
clog.Errorf(t.ctx, "multiWriter error %v", err)
t.isErrLogged = true
}
} else {
success = true
n = bytesWritten
}
}
if !success {
// all writes failed, return the error
return 0, err
}

return n, nil
}

func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestParams, onFistSegment func()) {
// subscribe to the outputs and send them into LPMS
subscriber := trickle.NewTrickleSubscriber(url.String())
Expand All @@ -131,15 +159,24 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
params.liveParams.stopPipeline(fmt.Errorf("error getting pipe for trickle-ffmpeg. url=%s %w", url, err))
return
}
rMediaMTX, wMediaMTX, err := os.Pipe()
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("error getting pipe for MediaMTX trickle-ffmpeg. url=%s %w", url, err))
return
}
ctx = clog.AddVal(ctx, "url", url.Redacted())
ctx = clog.AddVal(ctx, "outputRTMPURL", params.liveParams.outputRTMPURL)
ctx = clog.AddVal(ctx, "mediaMTXOutputRTMPURL", params.liveParams.mediaMTXOutputRTMPURL)

multiWriter := &multiWriter{ctx: ctx, writers: []io.Writer{w, wMediaMTX}}

// read segments from trickle subscription
go func() {
var err error
firstSegment := true

defer w.Close()
defer wMediaMTX.Close()
retries := 0
// we're trying to keep (retryPause x maxRetries) duration to fall within one output GOP length
const retryPause = 300 * time.Millisecond
Expand Down Expand Up @@ -178,7 +215,7 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
seq := trickle.GetSeq(segment)
clog.V(8).Infof(ctx, "trickle subscribe read data received seq=%d", seq)

n, err := copySegment(segment, w)
n, err := copySegment(segment, multiWriter)
if err != nil {
params.liveParams.stopPipeline(fmt.Errorf("trickle subscribe error copying: %w", err))
return
Expand All @@ -191,43 +228,50 @@ func startTrickleSubscribe(ctx context.Context, url *url.URL, params aiRequestPa
}
}()

go func() {
defer func() {
r.Close()
if rec := recover(); rec != nil {
// panicked, so shut down the stream and handle it
err, ok := rec.(error)
if !ok {
err = errors.New("unknown error")
}
clog.Errorf(ctx, "LPMS panic err=%v", err)
params.liveParams.stopPipeline(fmt.Errorf("LPMS panic %w", err))
}
}()
for {
clog.V(6).Infof(ctx, "Starting output rtmp")
if !params.inputStreamExists() {
clog.Errorf(ctx, "Stopping output rtmp stream, input stream does not exist.")
break
}
// Studio Output ffmpeg process
go ffmpegOutput(ctx, params.liveParams.outputRTMPURL, r, params)

cmd := exec.Command("ffmpeg",
"-i", "pipe:0",
"-c:a", "copy",
"-c:v", "copy",
"-f", "flv",
params.liveParams.outputRTMPURL,
)
cmd.Stdin = r
output, err := cmd.CombinedOutput()
if err != nil {
clog.Errorf(ctx, "Error sending RTMP out: %v", err)
clog.Infof(ctx, "Process output: %s", output)
return
// MediaMTX Output ffmpeg process
go ffmpegOutput(ctx, params.liveParams.mediaMTXOutputRTMPURL, rMediaMTX, params)
}

func ffmpegOutput(ctx context.Context, outputUrl string, r io.ReadCloser, params aiRequestParams) {
ctx = clog.AddVal(ctx, "rtmpOut", outputUrl)
defer func() {
r.Close()
if rec := recover(); rec != nil {
// panicked, so shut down the stream and handle it
err, ok := rec.(error)
if !ok {
err = errors.New("unknown error")
}
time.Sleep(5 * time.Second)
clog.Errorf(ctx, "LPMS panic err=%v", err)
params.liveParams.stopPipeline(fmt.Errorf("LPMS panic %w", err))
}
}()
for {
clog.V(6).Infof(ctx, "Starting output rtmp")
if !params.inputStreamExists() {
clog.Errorf(ctx, "Stopping output rtmp stream, input stream does not exist.")
break
}

cmd := exec.Command("ffmpeg",
"-i", "pipe:0",
"-c:a", "copy",
"-c:v", "copy",
"-f", "flv",
outputUrl,
)
cmd.Stdin = r
output, err := cmd.CombinedOutput()
clog.Infof(ctx, "Process output: %s", output)
if err != nil {
clog.Errorf(ctx, "Error sending RTMP out: %v", err)
return
}
time.Sleep(5 * time.Second)
}
}

func copySegment(segment *http.Response, w io.Writer) (int64, error) {
Expand Down
5 changes: 4 additions & 1 deletion server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,10 +428,12 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
}
// If auth webhook is set and returns an output URL, this will be replaced
outputURL := qp.Get("rtmpOutput")

mediaMTXOutputURL := fmt.Sprintf("rtmp://%s/aiWebrtc/%s-out", remoteHost, streamName)
if outputURL == "" {
// re-publish to ourselves for now
// Not sure if we want this to be permanent
outputURL = fmt.Sprintf("rtmp://%s/%s-out", remoteHost, streamName)
outputURL = mediaMTXOutputURL
}

// convention to avoid re-subscribing to our own streams
Expand Down Expand Up @@ -556,6 +558,7 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
liveParams: liveRequestParams{
segmentReader: ssr,
outputRTMPURL: outputURL,
mediaMTXOutputRTMPURL: mediaMTXOutputURL,
stream: streamName,
paymentProcessInterval: ls.livePaymentInterval,
requestID: requestID,
Expand Down
13 changes: 7 additions & 6 deletions server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,13 @@ func (a aiRequestParams) inputStreamExists() bool {

// For live video pipelines
type liveRequestParams struct {
segmentReader *media.SwitchableSegmentReader
outputRTMPURL string
stream string
requestID string
streamID string
pipelineID string
segmentReader *media.SwitchableSegmentReader
outputRTMPURL string
mediaMTXOutputRTMPURL string
stream string
requestID string
streamID string
pipelineID string

paymentProcessInterval time.Duration

Expand Down
2 changes: 0 additions & 2 deletions server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ type AIAuthRequest struct {

// Gateway host
GatewayHost string `json:"gateway_host"`

// TODO not sure what params we need yet
}

// Contains the configuration parameters for this AI job
Expand Down

0 comments on commit 232df3a

Please sign in to comment.