Skip to content

Commit

Permalink
[ENG-2343] Stream attempts metric
Browse files Browse the repository at this point in the history
  • Loading branch information
pwilczynskiclearcode committed Jan 10, 2025
1 parent b026313 commit 9d4f9fe
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
13 changes: 13 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ type (
mAIResultSaveFailed *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
mAIFirstSegmentDelay *stats.Int64Measure
mAILiveAttempts *stats.Int64Measure

lock sync.Mutex
emergeTimes map[uint64]map[uint64]time.Time // nonce:seqNo
Expand Down Expand Up @@ -377,6 +378,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
census.mAIFirstSegmentDelay = stats.Int64("ai_first_segment_delay_ms", "Delay of the first live AI segment being processed", "ms")
census.mAILiveAttempts = stats.Int64("ai_live_attempts", "AI Live stream attempted", "tot")

glog.Infof("Compiler: %s Arch %s OS %s Go version %s", runtime.Compiler, runtime.GOARCH, runtime.GOOS, runtime.Version())
glog.Infof("Livepeer version: %s", version)
Expand Down Expand Up @@ -991,6 +993,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: baseTagsWithOrchInfo,
Aggregation: view.Distribution(0, .10, .20, .50, .100, .150, .200, .500, .1000, .5000, 10.000),
},
{
Name: "ai_live_attempt",
Measure: census.mAILiveAttempts,
Description: "AI Live stream attempted",
TagKeys: baseTags,
Aggregation: view.Count(),
},
}

// Register the views
Expand Down Expand Up @@ -1992,6 +2001,10 @@ func AIFirstSegmentDelay(delayMs int64, orchInfo *lpnet.OrchestratorInfo) {
}
}

func AILiveVideoAttempt() {
stats.Record(census.ctx, census.mAILiveAttempts.M(1))

Check warning on line 2005 in monitor/census.go

View check run for this annotation

Codecov / codecov/patch

monitor/census.go#L2004-L2005

Added lines #L2004 - L2005 were not covered by tests
}

// Convert wei to gwei
func wei2gwei(wei *big.Int) float64 {
gwei, _ := new(big.Float).Quo(new(big.Float).SetInt(wei), big.NewFloat(float64(gweiConversionFactor))).Float64()
Expand Down
3 changes: 3 additions & 0 deletions server/ai_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func aiHttpHandle[I any](h *lphttp, decoderFunc func(*I, *http.Request) error) h

func (h *lphttp) StartLiveVideoToVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if monitor.Enabled {
monitor.AILiveVideoAttempt()
}

Check warning on line 100 in server/ai_http.go

View check run for this annotation

Codecov / codecov/patch

server/ai_http.go#L98-L100

Added lines #L98 - L100 were not covered by tests

remoteAddr := getRemoteAddr(r)
ctx := clog.AddVal(r.Context(), clog.ClientIP, remoteAddr)
Expand Down
5 changes: 4 additions & 1 deletion server/ai_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,9 +1079,12 @@ func submitLiveVideoToVideo(ctx context.Context, params aiRequestParams, sess *A
startControlPublish(control, params)
startTricklePublish(ctx, pub, params, sess)
startTrickleSubscribe(ctx, sub, params, func() {
delayMs := time.Since(startTime).Milliseconds()

Check warning on line 1082 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1082

Added line #L1082 was not covered by tests
if monitor.Enabled {
monitor.AIFirstSegmentDelay(time.Since(startTime).Milliseconds(), sess.OrchestratorInfo)
monitor.AIFirstSegmentDelay(delayMs, sess.OrchestratorInfo)

Check warning on line 1084 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1084

Added line #L1084 was not covered by tests
}
clog.V(common.VERBOSE).Infof(ctx, "First Segment delay=%dms streamID=%s", delayMs, params.liveParams.streamID)

Check warning on line 1086 in server/ai_process.go

View check run for this annotation

Codecov / codecov/patch

server/ai_process.go#L1086

Added line #L1086 was not covered by tests

})
startEventsSubscribe(ctx, events, params, sess)
return resp, nil
Expand Down

0 comments on commit 9d4f9fe

Please sign in to comment.