Skip to content

Commit

Permalink
Send requestID and streamID through to the runner for logging (#3384)
Browse files Browse the repository at this point in the history
* Send requestID and streamID through to the runner for logging

* Fix tests
  • Loading branch information
mjh1 authored Feb 12, 2025
1 parent da84ef0 commit 390af43
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 15 deletions.
9 changes: 7 additions & 2 deletions ai/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,14 +604,19 @@ func (w *Worker) TextToSpeech(ctx context.Context, req GenTextToSpeechJSONReques
return resp.JSON200, nil
}

func (w *Worker) LiveVideoToVideo(ctx context.Context, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) {
func (w *Worker) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req GenLiveVideoToVideoJSONRequestBody) (*LiveVideoToVideoResponse, error) {
// Live video containers keep running after the initial request, so we use a background context to borrow the container.
c, err := w.borrowContainer(context.Background(), "live-video-to-video", *req.ModelId)
if err != nil {
return nil, err
}

resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req)
setHeaders := func(ctx context.Context, req *http.Request) error {
req.Header.Set("requestID", requestID)
req.Header.Set("streamID", streamID)
return nil
}
resp, err := c.Client.GenLiveVideoToVideoWithResponse(ctx, req, setHeaders)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/ai.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type AI interface {
SegmentAnything2(context.Context, worker.GenSegmentAnything2MultipartRequestBody) (*worker.MasksResponse, error)
ImageToText(context.Context, worker.GenImageToTextMultipartRequestBody) (*worker.ImageToTextResponse, error)
TextToSpeech(context.Context, worker.GenTextToSpeechJSONRequestBody) (*worker.AudioResponse, error)
LiveVideoToVideo(context.Context, worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error)
LiveVideoToVideo(context.Context, string, string, worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error)
Warm(context.Context, string, string, worker.RunnerEndpoint, worker.OptimizationFlags) error
Stop(context.Context) error
HasCapacity(string, string) bool
Expand Down
8 changes: 4 additions & 4 deletions core/ai_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ func (orch *orchestrator) TextToImage(ctx context.Context, requestID string, req
return res.Results, nil
}

func (orch *orchestrator) LiveVideoToVideo(ctx context.Context, requestID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
func (orch *orchestrator) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
// local AIWorker processes job if combined orchestrator/ai worker
if orch.node.AIWorker != nil {
workerResp, err := orch.node.LiveVideoToVideo(ctx, req)
workerResp, err := orch.node.LiveVideoToVideo(ctx, requestID, streamID, req)
if err == nil {
return orch.node.saveLocalAIWorkerResults(ctx, *workerResp, requestID, "application/json")
} else {
Expand Down Expand Up @@ -1060,8 +1060,8 @@ func (n *LivepeerNode) TextToSpeech(ctx context.Context, req worker.GenTextToSpe
return n.AIWorker.TextToSpeech(ctx, req)
}

func (n *LivepeerNode) LiveVideoToVideo(ctx context.Context, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
return n.AIWorker.LiveVideoToVideo(ctx, req)
func (n *LivepeerNode) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
return n.AIWorker.LiveVideoToVideo(ctx, requestID, streamID, req)
}

// transcodeFrames converts a series of image URLs into a video segment for the image-to-video pipeline.
Expand Down
2 changes: 1 addition & 1 deletion core/ai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func (a *stubAIWorker) TextToSpeech(ctx context.Context, req worker.GenTextToSpe
return &worker.AudioResponse{Audio: worker.MediaURL{Url: "http://example.com/audio.wav"}}, nil
}

func (a *stubAIWorker) LiveVideoToVideo(ctx context.Context, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
func (a *stubAIWorker) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
return &worker.LiveVideoToVideoResponse{}, nil
}

Expand Down
11 changes: 9 additions & 2 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
remoteAddr := getRemoteAddr(r)
ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)
requestID := string(core.RandomManifestID())
streamID := r.Header.Get("streamID")
requestID := r.Header.Get("requestID")

if requestID == "" {
requestID = string(core.RandomManifestID())
}
ctx = clog.AddVal(ctx, "request_id", requestID)
ctx = clog.AddVal(ctx, "stream_id", streamID)

var req worker.GenLiveVideoToVideoJSONRequestBody
if err := jsonDecoder[worker.GenLiveVideoToVideoJSONRequestBody](&req, r); err != nil {
Expand Down Expand Up @@ -231,7 +238,7 @@ func (h *lphttp) StartLiveVideoToVideo() http.Handler {
}

// Send request to the worker
_, err = orch.LiveVideoToVideo(ctx, requestID, workerReq)
_, err = orch.LiveVideoToVideo(ctx, requestID, streamID, workerReq)
if err != nil {
if monitor.Enabled {
monitor.AIProcessingError(err.Error(), pipeline, modelID, ethcommon.Address{}.String())
Expand Down
17 changes: 16 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,9 +1038,24 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
}
return nil, err
}
setHeaders, balUpdate, err := prepareAIPayment(ctx, sess, initPixelsToPay)
paymentHeaders, balUpdate, err := prepareAIPayment(ctx, sess, initPixelsToPay)
if err != nil {
if monitor.Enabled {
monitor.AIRequestError(err.Error(), "LiveVideoToVideo", *req.ModelId, sess.OrchestratorInfo)
}
return nil, err
}
defer completeBalanceUpdate(sess.BroadcastSession, balUpdate)

setHeaders := func(ctx context.Context, req *http.Request) error {
if err := paymentHeaders(ctx, req); err != nil {
return err
}
req.Header.Set("requestID", params.liveParams.requestID)
req.Header.Set("streamID", params.liveParams.streamID)
return nil
}

// Send request to orchestrator
resp, err := client.GenLiveVideoToVideoWithResponse(ctx, req, setHeaders)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/ai_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (a *stubAIWorker) TextToSpeech(ctx context.Context, req worker.GenTextToSpe
}
}

func (a *stubAIWorker) LiveVideoToVideo(ctx context.Context, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
func (a *stubAIWorker) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (*worker.LiveVideoToVideoResponse, error) {
a.Called++
if a.Err != nil {
return nil, a.Err
Expand Down
2 changes: 1 addition & 1 deletion server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type Orchestrator interface {
SegmentAnything2(ctx context.Context, requestID string, req worker.GenSegmentAnything2MultipartRequestBody) (interface{}, error)
ImageToText(ctx context.Context, requestID string, req worker.GenImageToTextMultipartRequestBody) (interface{}, error)
TextToSpeech(ctx context.Context, requestID string, req worker.GenTextToSpeechJSONRequestBody) (interface{}, error)
LiveVideoToVideo(ctx context.Context, requestID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error)
LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error)
}

// Balance describes methods for a session's balance maintenance
Expand Down
4 changes: 2 additions & 2 deletions server/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (r *stubOrchestrator) TextToSpeech(ctx context.Context, requestID string, r
return nil, nil
}

func (r *stubOrchestrator) LiveVideoToVideo(ctx context.Context, requestID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
func (r *stubOrchestrator) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
return nil, nil
}

Expand Down Expand Up @@ -1429,7 +1429,7 @@ func (r *mockOrchestrator) ImageToText(ctx context.Context, requestID string, re
func (r *mockOrchestrator) TextToSpeech(ctx context.Context, requestID string, req worker.GenTextToSpeechJSONRequestBody) (interface{}, error) {
return nil, nil
}
func (r *mockOrchestrator) LiveVideoToVideo(ctx context.Context, requestID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
func (r *mockOrchestrator) LiveVideoToVideo(ctx context.Context, requestID, streamID string, req worker.GenLiveVideoToVideoJSONRequestBody) (interface{}, error) {
return nil, nil
}
func (r *mockOrchestrator) CheckAICapacity(pipeline, modelID string) bool {
Expand Down

0 comments on commit 390af43

Please sign in to comment.