Skip to content

Commit

Permalink
Make processingRetryTimeout configurable (#3387)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Feb 12, 2025
1 parent 66ab0f9 commit da84ef0
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/livepeer/livepeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
cfg.AIModelsDir = flag.String("aiModelsDir", *cfg.AIModelsDir, "Set directory where AI model weights are stored")
cfg.AIRunnerImage = flag.String("aiRunnerImage", *cfg.AIRunnerImage, "[Deprecated] Specify the base Docker image for the AI runner. Example: livepeer/ai-runner:0.0.1. Use -aiRunnerImageOverrides instead.")
cfg.AIRunnerImageOverrides = flag.String("aiRunnerImageOverrides", *cfg.AIRunnerImageOverrides, `Specify overrides for the Docker images used by the AI runner. Example: '{"default": "livepeer/ai-runner:v1.0", "batch": {"text-to-speech": "livepeer/ai-runner:text-to-speech-v1.0"}, "live": {"another-pipeline": "livepeer/ai-runner:another-pipeline-v1.0"}}'`)
cfg.AIProcessingRetryTimeout = flag.Duration("aiProcessingRetryTimeout", *cfg.AIProcessingRetryTimeout, "Timeout for retrying to initiate AI processing request")

// Live AI:
cfg.MediaMTXApiPassword = flag.String("mediaMTXApiPassword", "", "HTTP basic auth password for MediaMTX API requests")
Expand Down
22 changes: 13 additions & 9 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ type LivepeerConfig struct {
TestOrchAvail *bool
AIRunnerImage *string
AIRunnerImageOverrides *string
AIProcessingRetryTimeout *time.Duration
KafkaBootstrapServers *string
KafkaUsername *string
KafkaPassword *string
Expand Down Expand Up @@ -215,6 +216,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultAIModels := ""
defaultAIModelsDir := ""
defaultAIRunnerImage := "livepeer/ai-runner:latest"
defaultAIProcessingRetryTimeout := 2 * time.Second
defaultAIRunnerImageOverrides := ""
defaultLiveAIAuthWebhookURL := ""
defaultLivePaymentInterval := 5 * time.Second
Expand Down Expand Up @@ -320,15 +322,16 @@ func DefaultLivepeerConfig() LivepeerConfig {
TestTranscoder: &defaultTestTranscoder,

// AI:
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,
AIServiceRegistry: &defaultAIServiceRegistry,
AIWorker: &defaultAIWorker,
AIModels: &defaultAIModels,
AIModelsDir: &defaultAIModelsDir,
AIRunnerImage: &defaultAIRunnerImage,
AIProcessingRetryTimeout: &defaultAIProcessingRetryTimeout,
AIRunnerImageOverrides: &defaultAIRunnerImageOverrides,
LiveAIAuthWebhookURL: &defaultLiveAIAuthWebhookURL,
LivePaymentInterval: &defaultLivePaymentInterval,
GatewayHost: &defaultGatewayHost,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
Expand Down Expand Up @@ -513,6 +516,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if err != nil {
glog.Errorf("Error creating livepeer node: %v", err)
}
n.AIProcesssingRetryTimeout = *cfg.AIProcessingRetryTimeout

if *cfg.OrchSecret != "" {
n.OrchSecret, _ = common.ReadFromFile(*cfg.OrchSecret)
Expand Down
5 changes: 3 additions & 2 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ type LivepeerNode struct {
Database *common.DB

// AI worker public fields
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AIWorker AI
AIWorkerManager *RemoteAIWorkerManager
AIProcesssingRetryTimeout time.Duration

// Transcoder public fields
SegmentChans map[ManifestID]SegmentChan
Expand Down
3 changes: 2 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/livepeer/lpms/stream"
)

const processingRetryTimeout = 2 * time.Second
const defaultTextToImageModelID = "stabilityai/sdxl-turbo"
const defaultImageToImageModelID = "stabilityai/sdxl-turbo"
const defaultImageToVideoModelID = "stabilityai/stable-video-diffusion-img2vid-xt"
Expand Down Expand Up @@ -1464,13 +1463,15 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
}
capName := cap.String()
ctx = clog.AddVal(ctx, "capability", capName)
ctx = clog.AddVal(ctx, "model_id", modelID)

clog.V(common.VERBOSE).Infof(ctx, "Received AI request model_id=%s", modelID)
start := time.Now()
// Arguments of a deferred call are evaluated when the defer statement
// executes, not when the surrounding function returns. Calling
// time.Since(start) directly in the deferred call's argument list would
// therefore always log a near-zero duration. Wrapping the log call in a
// closure postpones the measurement until the function actually exits.
// logCtx snapshots ctx at this point so the log line carries the same
// context values as before, even if ctx is reassigned further down.
logCtx := ctx
defer func() {
	clog.Infof(logCtx, "Processed AI request model_id=%v took=%v", modelID, time.Since(start))
}()

var resp interface{}

processingRetryTimeout := params.node.AIProcesssingRetryTimeout
cctx, cancel := context.WithTimeout(ctx, processingRetryTimeout)
defer cancel()

Expand Down
6 changes: 3 additions & 3 deletions server/ai_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ type AISessionSelector struct {
os drivers.OSSession
}

func NewAISessionSelector(cap core.Capability, modelID string, node *core.LivepeerNode, ttl time.Duration) (*AISessionSelector, error) {
func NewAISessionSelector(ctx context.Context, cap core.Capability, modelID string, node *core.LivepeerNode, ttl time.Duration) (*AISessionSelector, error) {
var stakeRdr stakeReader
if node.Eth != nil {
stakeRdr = &storeStakeReader{store: node.Database}
Expand Down Expand Up @@ -193,7 +193,7 @@ func NewAISessionSelector(cap core.Capability, modelID string, node *core.Livepe
os: drivers.NodeStorage.NewSession(strconv.Itoa(int(cap)) + "_" + modelID),
}

if err := sel.Refresh(context.Background()); err != nil {
if err := sel.Refresh(ctx); err != nil {
return nil, err
}

Expand Down Expand Up @@ -405,7 +405,7 @@ func (c *AISessionManager) getSelector(ctx context.Context, cap core.Capability,
if !ok {
// Create the selector
var err error
sel, err = NewAISessionSelector(cap, modelID, c.node, c.ttl)
sel, err = NewAISessionSelector(ctx, cap, modelID, c.node, c.ttl)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit da84ef0

Please sign in to comment.