Skip to content

Commit

Permalink
core,server: Gracefully notify orchestrator in case of a panic in transcoder
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Nov 9, 2021
1 parent 9c0d0b5 commit f161de2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#### Orchestrator

#### Transcoder
- \#2094 Gracefully notify orchestrator in case of a panic in transcoder (@leszko)

### Bug Fixes 🐞

Expand Down
27 changes: 25 additions & 2 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,20 @@ type LocalTranscoder struct {
workDir string
}

// UnrecoverableError wraps an error describing a transcoder failure that
// should not be treated as an ordinary, retryable transcoding error. In this
// file it is produced by recoverFromPanic when a panic is caught.
//
// The wrapped error is embedded, so Error() returns the underlying message
// unchanged.
type UnrecoverableError struct {
	error
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the underlying cause through the UnrecoverableError wrapper.
func (e UnrecoverableError) Unwrap() error {
	return e.error
}

// NewUnrecoverableError wraps err in an UnrecoverableError value.
func NewUnrecoverableError(err error) UnrecoverableError {
	wrapped := UnrecoverableError{error: err}
	return wrapped
}

var WorkDir string

func (lt *LocalTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) {
func (lt *LocalTranscoder) Transcode(md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)

// Set up in / out config
in := &ffmpeg.TranscodeOptionsIn{
Fname: md.Fname,
Expand Down Expand Up @@ -69,7 +80,9 @@ type NvidiaTranscoder struct {
session *ffmpeg.Transcoder
}

func (nv *NvidiaTranscoder) Transcode(md *SegTranscodingMetadata) (*TranscodeData, error) {
func (nv *NvidiaTranscoder) Transcode(md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)

in := &ffmpeg.TranscodeOptionsIn{
Fname: md.Fname,
Expand Down Expand Up @@ -251,3 +264,13 @@ func detectorsToTranscodeOptions(workDir string, accel ffmpeg.Acceleration, prof
}
return opts
}

// recoverFromPanic is meant to be deferred directly (defer recoverFromPanic(&err)).
// If the surrounding function is panicking, it stops the panic and stores an
// UnrecoverableError in *retErr. When the panic value is itself an error it is
// wrapped as-is; any other panic value is replaced by a generic error.
// When there is no panic, *retErr is left untouched.
func recoverFromPanic(retErr *error) {
	cause := recover()
	if cause == nil {
		return
	}

	wrapped, isErr := cause.(error)
	if !isErr {
		// Non-error panic values are not propagated; only a generic message is kept.
		wrapped = errors.New("unrecoverable transcoding failure")
	}
	*retErr = NewUnrecoverableError(wrapped)
}
31 changes: 30 additions & 1 deletion core/transcoder_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"errors"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -38,7 +39,6 @@ func TestLocalTranscoder(t *testing.T) {
t.Errorf("Wrong data %v", len(res.Segments[1].Data))
}
}

func TestNvidia_Transcoder(t *testing.T) {
dev := os.Getenv("NV_DEVICE")
if dev == "" {
Expand Down Expand Up @@ -287,3 +287,32 @@ func TestTranscoder_Formats(t *testing.T) {
// sanity check the base format wasn't overwritten (has happened before!)
assert.Equal(ffmpeg.FormatNone, ffmpeg.P144p30fps16x9.Format)
}

func TestRecoverFromPanic(t *testing.T) {
assert := assert.New(t)

f := func() (err error) {
defer recoverFromPanic(&err)
panic(struct{}{})
}

err := f()

assert.NotNil(err)
assert.Equal("unrecoverable transcoding failure", err.Error())
assert.IsType(UnrecoverableError{}, err)
}

func TestRecoverFromPanic_WithError(t *testing.T) {
assert := assert.New(t)
sampleErr := errors.New("sample error")

f := func() (err error) {
defer recoverFromPanic(&err)
panic(sampleErr)
}

err := f()

assert.Equal(NewUnrecoverableError(sampleErr), err)
}
3 changes: 3 additions & 0 deletions server/ot_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func runTranscode(n *core.LivepeerNode, orchAddr string, httpc *http.Client, not
err = errors.New("segment / profile mismatch")
}
if err != nil {
if _, ok := err.(core.UnrecoverableError); ok {
defer panic(err)
}
glog.Error("Unable to transcode ", err)
body.Write([]byte(err.Error()))
contentType = transcodingErrorMimeType
Expand Down
18 changes: 17 additions & 1 deletion server/ot_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -175,7 +176,7 @@ func TestRemoteTranscoder_FullProfiles(t *testing.T) {
assert.Nil(nil, tr.profiles)
}

func TestRemoteTranscoderError(t *testing.T) {
func TestRemoteTranscoder_Error(t *testing.T) {
httpc := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
profiles := []ffmpeg.VideoProfile{ffmpeg.P720p60fps16x9, ffmpeg.P144p30fps16x9}

Expand Down Expand Up @@ -223,4 +224,19 @@ func TestRemoteTranscoderError(t *testing.T) {
assert.Equal(2, tr.called)
assert.NotNil(body)
assert.Equal("segment / profile mismatch", string(body))

// unrecoverable error
// send the response and panic
tr.err = core.NewUnrecoverableError(errors.New("some error"))
panicked := false
defer func() {
if r := recover(); r != nil {
panicked = true
}
}()
runTranscode(node, parsedURL.Host, httpc, notify)
assert.Equal(3, tr.called)
assert.NotNil(body)
assert.Equal("some error", string(body))
assert.True(panicked)
}

0 comments on commit f161de2

Please sign in to comment.