Skip to content

Commit

Permalink
Fix unit tests and enable tests in CI for ai-video (#3190)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Oct 21, 2024
1 parent b5d50db commit d5f0db7
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 259 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
pull_request:
push:
branches:
# - master
- ai-video
tags:
- "v*"
Expand Down Expand Up @@ -338,7 +337,7 @@ jobs:
destination: "build.livepeer.live/${{ github.event.repository.name }}/ai-video/stable"
parent: false
process_gcloudignore: false

# Update the latest branch manifest
- name: Upload branch manifest file to Google Cloud stable folder
id: upload-manifest-latest
Expand Down
11 changes: 6 additions & 5 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: Trigger test suite

on:
# pull_request:
# branches:
# - master
pull_request:
branches:
- master
- ai-video
push:
branches:
- master
Expand Down Expand Up @@ -94,9 +95,9 @@ jobs:
- name: Lint
uses: golangci/golangci-lint-action@v4
with:
version: v1.52.2
version: v1.61.0
skip-pkg-cache: true
args: '--disable-all --enable=gofmt --enable=vet --enable=golint --deadline=4m pm verification'
args: '--out-format=colored-line-number --disable-all --enable=gofmt --enable=govet --enable=revive --timeout=4m pm verification'

- name: Run Revive Action by building from repository
uses: docker://morphy/revive-action:v2
Expand Down
10 changes: 6 additions & 4 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1464,10 +1464,12 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
// take the port to listen to from the service URI
*cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "", n.GetServiceURI().Port())
if !*cfg.Transcoder && !*cfg.AIWorker {
if *cfg.AIModels != "" && n.OrchSecret == "" {
glog.Info("Running an orchestrator in AI External Container mode")
} else {
glog.Exit("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode")
if n.OrchSecret == "" {
if *cfg.AIModels != "" {
glog.Info("Running an orchestrator in AI External Container mode")
} else if n.OrchSecret == "" {
glog.Exit("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode")
}
}
}
} else if n.NodeType == core.TranscoderNode {
Expand Down
2 changes: 1 addition & 1 deletion common/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func IgnoreRoutines() []goleak.Option {
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).StartMediaServer", "github.com/livepeer/go-livepeer/core.(*RemoteTranscoderManager).Manage.func1",
"github.com/livepeer/go-livepeer/server.(*LivepeerServer).HandlePush.func1", "github.com/rjeczalik/notify.(*nonrecursiveTree).dispatch",
"github.com/rjeczalik/notify.(*nonrecursiveTree).internal", "github.com/livepeer/lpms/stream.NewBasicRTMPVideoStream.func1", "github.com/patrickmn/go-cache.(*janitor).Run",
"github.com/golang/glog.(*fileSink).flushDaemon", "github.com/livepeer/go-livepeer/core.(*LivepeerNode).transcodeFrames.func2",
"github.com/golang/glog.(*fileSink).flushDaemon", "github.com/livepeer/go-livepeer/core.(*LivepeerNode).transcodeFrames.func2", "github.com/ipfs/go-log/writer.(*MirrorWriter).logRoutine",
}

res := make([]goleak.Option, 0, len(funcs2ignore))
Expand Down
90 changes: 0 additions & 90 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ import (
"sort"
"strconv"
"strings"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/glog"
"github.com/jaypipes/ghw"
"github.com/jaypipes/ghw/pkg/gpu"
"github.com/jaypipes/ghw/pkg/pci"
Expand Down Expand Up @@ -66,7 +64,6 @@ const priceScalingFactor = int64(1000)

var (
ErrParseBigInt = fmt.Errorf("failed to parse big integer")
ErrProfile = fmt.Errorf("failed to parse profile")

ErrChromaFormat = fmt.Errorf("unknown VideoProfile ChromaFormat")
ErrFormatProto = fmt.Errorf("unknown VideoProfile format for protobufs")
Expand Down Expand Up @@ -100,93 +97,6 @@ func ParseBigInt(num string) (*big.Int, error) {
}
}

func WaitUntil(waitTime time.Duration, condition func() bool) {
start := time.Now()
for time.Since(start) < waitTime {
if condition() == false {
time.Sleep(100 * time.Millisecond)
continue
}
break
}
}

func WaitAssert(t *testing.T, waitTime time.Duration, condition func() bool, msg string) {
start := time.Now()
for time.Since(start) < waitTime {
if condition() == false {
time.Sleep(100 * time.Millisecond)
continue
}
break
}

if condition() == false {
t.Errorf(msg)
}
}

func Retry(attempts int, sleep time.Duration, fn func() error) error {
if err := fn(); err != nil {
if attempts--; attempts > 0 {
time.Sleep(sleep)
return Retry(attempts, 2*sleep, fn)
}
return err
}

return nil
}

func TxDataToVideoProfile(txData string) ([]ffmpeg.VideoProfile, error) {
profiles := make([]ffmpeg.VideoProfile, 0)

if len(txData) == 0 {
return profiles, nil
}
if len(txData) < VideoProfileIDSize {
return nil, ErrProfile
}

for i := 0; i+VideoProfileIDSize <= len(txData); i += VideoProfileIDSize {
txp := txData[i : i+VideoProfileIDSize]

p, ok := ffmpeg.VideoProfileLookup[VideoProfileNameLookup[txp]]
if !ok {
glog.Errorf("Cannot find video profile for job: %v", txp)
return nil, ErrProfile // monitor to see if this is too aggressive
}
profiles = append(profiles, p)
}

return profiles, nil
}

func BytesToVideoProfile(txData []byte) ([]ffmpeg.VideoProfile, error) {
profiles := make([]ffmpeg.VideoProfile, 0)

if len(txData) == 0 {
return profiles, nil
}
if len(txData) < VideoProfileIDBytes {
return nil, ErrProfile
}

for i := 0; i+VideoProfileIDBytes <= len(txData); i += VideoProfileIDBytes {
var txp [VideoProfileIDBytes]byte
copy(txp[:], txData[i:i+VideoProfileIDBytes])

p, ok := ffmpeg.VideoProfileLookup[VideoProfileByteLookup[txp]]
if !ok {
glog.Errorf("Cannot find video profile for job: %v", txp)
return nil, ErrProfile // monitor to see if this is too aggressive
}
profiles = append(profiles, p)
}

return profiles, nil
}

func FFmpegProfiletoNetProfile(ffmpegProfiles []ffmpeg.VideoProfile) ([]*net.VideoProfile, error) {
profiles := make([]*net.VideoProfile, 0, len(ffmpegProfiles))
for _, profile := range ffmpegProfiles {
Expand Down
60 changes: 0 additions & 60 deletions common/util_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package common

import (
"encoding/hex"
"fmt"
"math"
"math/big"
Expand All @@ -19,45 +18,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestTxDataToVideoProfile(t *testing.T) {
if res, err := TxDataToVideoProfile(""); err != nil && len(res) != 0 {
t.Error("Unexpected return on empty input")
}
if _, err := TxDataToVideoProfile("abc"); err != ErrProfile {
t.Error("Unexpected return on too-short input", err)
}
if _, err := TxDataToVideoProfile("abcdefghijk"); err != ErrProfile {
t.Error("Unexpected return on invalid input", err)
}
res, err := TxDataToVideoProfile("93c717e7c0a6517a")
if err != nil || res[1] != ffmpeg.P240p30fps16x9 || res[0] != ffmpeg.P360p30fps16x9 {
t.Error("Unexpected profile! ", err, res)
}
}

func TestVideoProfileBytes(t *testing.T) {
if len(VideoProfileByteLookup) != len(VideoProfileNameLookup) {
t.Error("Video profile byte map was not created correctly")
}
if res, err := BytesToVideoProfile(nil); err != nil && len(res) != 0 {
t.Error("Unexpected return on empty input")
}
if res, err := BytesToVideoProfile([]byte{}); err != nil && len(res) != 0 {
t.Error("Unexpected return on empty input")
}
if _, err := BytesToVideoProfile([]byte("abc")); err != ErrProfile {
t.Error("Unexpected return on too-short input", err)
}
if _, err := BytesToVideoProfile([]byte("abcdefghijk")); err != ErrProfile {
t.Error("Unexpected return on invalid input", err)
}
b, _ := hex.DecodeString("93c717e7c0a6517a")
res, err := BytesToVideoProfile(b)
if err != nil || res[1] != ffmpeg.P240p30fps16x9 || res[0] != ffmpeg.P360p30fps16x9 {
t.Error("Unexpected profile! ", err, res)
}
}

func TestFFmpegProfiletoNetProfile(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -158,26 +118,6 @@ func TestFFmpegProfiletoNetProfile(t *testing.T) {
assert.Nil(fullProfiles)
}

func TestProfilesToHex(t *testing.T) {
assert := assert.New(t)
// Sanity checking against an existing eth impl that we know works
compare := func(profiles []ffmpeg.VideoProfile) {
pCopy := make([]ffmpeg.VideoProfile, len(profiles))
copy(pCopy, profiles)
b1, err := hex.DecodeString(ProfilesToHex(profiles))
assert.Nil(err, "Error hex encoding/decoding")
b2, err := BytesToVideoProfile(b1)
assert.Nil(err, "Error converting back to profile")
assert.Equal(pCopy, b2)
}
// XXX double check which one is wrong! ethcommon method produces "0" zero string
// compare(nil)
// compare([]ffmpeg.VideoProfile{})
compare([]ffmpeg.VideoProfile{ffmpeg.P240p30fps16x9})
compare([]ffmpeg.VideoProfile{ffmpeg.P240p30fps16x9, ffmpeg.P360p30fps16x9})
compare([]ffmpeg.VideoProfile{ffmpeg.P360p30fps16x9, ffmpeg.P240p30fps16x9})
}

func TestVideoProfile_FormatMimeType(t *testing.T) {
inp := []ffmpeg.Format{ffmpeg.FormatNone, ffmpeg.FormatMPEGTS, ffmpeg.FormatMP4}
exp := []string{"video/mp2t", "video/mp2t", "video/mp4"}
Expand Down
36 changes: 20 additions & 16 deletions core/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,17 +490,19 @@ func (c *Capabilities) ToNetCapabilities() *net.Capabilities {
for capability, capacity := range c.capacities {
netCaps.Capacities[uint32(capability)] = uint32(capacity)
}
for capability, constraints := range c.constraints.perCapability {
models := make(map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &net.Capabilities_CapabilityConstraints_ModelConstraint{
Warm: modelConstraint.Warm,
Capacity: uint32(modelConstraint.Capacity),
if c.constraints.perCapability != nil {
for capability, constraints := range c.constraints.perCapability {
models := make(map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &net.Capabilities_CapabilityConstraints_ModelConstraint{
Warm: modelConstraint.Warm,
Capacity: uint32(modelConstraint.Capacity),
}
}
}

netCaps.Constraints.PerCapability[uint32(capability)] = &net.Capabilities_CapabilityConstraints{
Models: models,
netCaps.Constraints.PerCapability[uint32(capability)] = &net.Capabilities_CapabilityConstraints{
Models: models,
}
}
}
return netCaps
Expand Down Expand Up @@ -533,14 +535,16 @@ func CapabilitiesFromNetCapabilities(caps *net.Capabilities) *Capabilities {
}
}

for capabilityInt, constraints := range caps.Constraints.PerCapability {
models := make(map[string]*ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: int(modelConstraint.Capacity)}
}
if caps.Constraints != nil && caps.Constraints.PerCapability != nil {
for capabilityInt, constraints := range caps.Constraints.PerCapability {
models := make(map[string]*ModelConstraint)
for modelID, modelConstraint := range constraints.Models {
models[modelID] = &ModelConstraint{Warm: modelConstraint.Warm, Capacity: int(modelConstraint.Capacity)}
}

coreCaps.constraints.perCapability[Capability(capabilityInt)] = &CapabilityConstraints{
Models: models,
coreCaps.constraints.perCapability[Capability(capabilityInt)] = &CapabilityConstraints{
Models: models,
}
}
}

Expand Down
Loading

0 comments on commit d5f0db7

Please sign in to comment.