Skip to content

Commit

Permalink
Add Orchestrator's latency metrics to Kafka events (#3391)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Feb 14, 2025
1 parent 1c1c280 commit 4fb4712
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
6 changes: 4 additions & 2 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/big"
"net/url"
"sync"
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/livepeer/go-livepeer/net"
Expand Down Expand Up @@ -58,8 +59,9 @@ const (
)

type OrchestratorLocalInfo struct {
URL *url.URL `json:"Url"`
Score float32
URL *url.URL `json:"Url"`
Score float32
Latency *time.Duration
}

// Combines the broadcaster's (B) local metadata about an orchestrator (O) with the info received from that orchestrator.
Expand Down
28 changes: 24 additions & 4 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"math"
"math/rand"
"net/url"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/livepeer/go-livepeer/clog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/monitor"
Expand Down Expand Up @@ -108,14 +110,21 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
getOrchInfo := func(ctx context.Context, od common.OrchestratorDescriptor, infoCh chan common.OrchestratorDescriptor, errCh chan error) {
start := time.Now()
info, err := serverGetOrchInfo(ctx, o.bcast, od.LocalInfo.URL, caps.ToNetCapabilities())
clog.V(common.DEBUG).Infof(ctx, "Received GetOrchInfo RPC Response from uri=%v, latency=%v", od.LocalInfo.URL, time.Since(start))
latency := time.Since(start)
clog.V(common.DEBUG).Infof(ctx, "Received GetOrchInfo RPC Response from uri=%v, latency=%v", od.LocalInfo.URL, latency)
if err == nil && !isBlacklisted(info) && isCompatible(info) {
od.RemoteInfo = info
infoCh <- od
infoCh <- common.OrchestratorDescriptor{
LocalInfo: &common.OrchestratorLocalInfo{
URL: od.LocalInfo.URL,
Score: od.LocalInfo.Score,
Latency: &latency,
},
RemoteInfo: info,
}
return
}
clog.V(common.DEBUG).Infof(ctx, "Discovery unsuccessful for orchestrator %s, err=%q", od.LocalInfo.URL.String(), err)
if err != nil && !errors.Is(err, context.Canceled) {
clog.V(common.DEBUG).Infof(ctx, "err=%q", err)
if monitor.Enabled {
monitor.LogDiscoveryError(ctx, od.LocalInfo.URL.String(), err.Error())
}
Expand Down Expand Up @@ -179,6 +188,17 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
}
}

if monitor.Enabled && len(ods) > 0 {
var discoveryResults []map[string]string
for _, o := range ods {
discoveryResults = append(discoveryResults, map[string]string{
"address": hexutil.Encode(o.RemoteInfo.Address),
"url": o.RemoteInfo.Transcoder,
"latency_ms": strconv.FormatInt(o.LocalInfo.Latency.Milliseconds(), 10),
})
}
monitor.SendQueueEventAsync("discovery_results", discoveryResults)
}
clog.Infof(ctx, "Done fetching orch info numOrch=%d responses=%d/%d timedOut=%t",
len(ods), nbResp, len(linfos), timedOut)
return ods, nil
Expand Down
17 changes: 13 additions & 4 deletions discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

for _, addr := range addresses {
uri, _ := url.ParseRequestURI(addr)
assert.Contains(infos, common.OrchestratorLocalInfo{URL: uri})
assert.Contains(removeLatency(infos), common.OrchestratorLocalInfo{URL: uri, Latency: nil})
}

// assert that list is not refreshed if lastRequest is more than 1 min ago and hash is the same
Expand All @@ -1158,7 +1158,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

for _, addr := range addresses {
uri, _ := url.ParseRequestURI(addr)
assert.Contains(infos, common.OrchestratorLocalInfo{URL: uri})
assert.Contains(removeLatency(infos), common.OrchestratorLocalInfo{URL: uri, Latency: nil})
}

// mock a change in webhook addresses
Expand All @@ -1181,7 +1181,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

for _, addr := range addresses {
uri, _ := url.ParseRequestURI(addr)
assert.NotContains(infos, common.OrchestratorLocalInfo{URL: uri})
assert.NotContains(removeLatency(infos), common.OrchestratorLocalInfo{URL: uri, Latency: nil})
}

// assert that list is refreshed if lastRequest is longer than 1 min ago and hash is not the same
Expand All @@ -1201,7 +1201,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

for _, addr := range addresses {
uri, _ := url.ParseRequestURI(addr)
assert.Contains(infos, common.OrchestratorLocalInfo{URL: uri})
assert.Contains(removeLatency(infos), common.OrchestratorLocalInfo{URL: uri, Latency: nil})
}
}

Expand Down Expand Up @@ -1649,3 +1649,12 @@ func TestSetGetOrchestratorTimeout(t *testing.T) {
assert.Equal(poolCache.discoveryTimeout, 1000*time.Millisecond)

}

// removeLatency returns a copy of infos in which every entry has its Latency
// field cleared, so entries can be compared against expected values without
// depending on the measured latency. The input slice is not modified, and the
// result is nil when infos is empty.
func removeLatency(infos []common.OrchestratorLocalInfo) []common.OrchestratorLocalInfo {
	var stripped []common.OrchestratorLocalInfo
	for idx := range infos {
		// Work on a copy so the caller's element keeps its Latency value.
		entry := infos[idx]
		entry.Latency = nil
		stripped = append(stripped, entry)
	}
	return stripped
}

0 comments on commit 4fb4712

Please sign in to comment.