Skip to content

Commit

Permalink
update to transcoders having separate secrets
Browse files Browse the repository at this point in the history
  • Loading branch information
ad-astra-video committed Dec 28, 2022
1 parent c0ce835 commit 6764729
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 23 deletions.
7 changes: 5 additions & 2 deletions cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.OrchSecret != "" {
n.OrchSecret, _ = common.ReadFromFile(*cfg.OrchSecret)
}
//set up transcoder secrets
n.GetTranscoderSecrets()

var transcoderCaps []core.Capability
if *cfg.Transcoder {
Expand Down Expand Up @@ -1089,7 +1091,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
// if http addr is not provided, listen to all ifaces
// take the port to listen to from the service URI
*cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "", n.GetServiceURI().Port())
if !*cfg.Transcoder && n.OrchSecret == "" {
if !*cfg.Transcoder && len(n.OrchSecret) == 0 {
glog.Fatal("Running an orchestrator requires an -orchSecret for standalone mode or -transcoder for orchestrator+transcoder mode")
}
}
Expand Down Expand Up @@ -1178,7 +1180,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
}()

if n.NodeType == core.TranscoderNode {
if n.OrchSecret == "" {
if len(n.OrchSecret) == 0 {
glog.Fatal("Missing -orchSecret")
}
if len(orchURLs) <= 0 {
Expand Down Expand Up @@ -1412,3 +1414,4 @@ func getBroadcasterPrices(broadcasterPrices string) []BroadcasterPrice {

return pricesSet.Prices
}

81 changes: 81 additions & 0 deletions common/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type DB struct {
// prepared statements
updateOrch *sql.Stmt
selectKV *sql.Stmt
selectKVLike *sql.Stmt
updateKV *sql.Stmt
insertUnbondingLock *sql.Stmt
deleteUnbondingLock *sql.Stmt
Expand Down Expand Up @@ -197,6 +198,15 @@ func InitDB(dbPath string) (*DB, error) {
}
d.selectKV = stmt

// selectKVLike prepared statement
stmt, err = db.Prepare("SELECT key, value from kv WHERE key LIKE ?")
if err != nil {
glog.Error("Unable to prepare selectKVLike stmt ", err)
d.Close()
return nil, err
}
d.selectKVLike = stmt

// updateKV prepared statement
stmt, err = db.Prepare("INSERT OR REPLACE INTO kv(key, value, updatedAt) VALUES(?1, ?2, datetime())")
if err != nil {
Expand Down Expand Up @@ -369,6 +379,9 @@ func (db *DB) Close() {
if db.selectKV != nil {
db.selectKV.Close()
}
if db.selectKVLike != nil {
db.selectKVLike.Close()
}
if db.updateKV != nil {
db.updateKV.Close()
}
Expand Down Expand Up @@ -457,6 +470,50 @@ func (db *DB) SetChainID(id *big.Int) error {
if err := db.updateKVStore("chainID", id.String()); err != nil {
return err
}

return nil
}

func (db *DB) GetTranscoderSecret(secret string) (bool, error) {
active, err := db.selectKVStore("secret:"+secret)
if err != nil {
return false, err
}

is_active, err := strconv.ParseBool(active)
if err == nil {
return is_active, nil
} else {
return false, err
}
}

func(db *DB) GetTranscoderSecrets() (map[string]bool, error) {
var (
ks string
secrets map[string]bool
is_active bool
)
kvs, err := db.selectKVStoreLike("secret:")
if err == nil {
secrets = make(map[string]bool)
for k, v := range kvs {
ks = strings.Replace(k, "secret:", "", 1)
is_active, _ = strconv.ParseBool(v)
secrets[ks] = is_active
}
return secrets, nil
} else {
return nil, err
}
}

func (db *DB) UpdateTranscoderSecret(secret string, active bool) error {
err := db.updateKVStore("secret:"+secret, strconv.FormatBool(active))
if err != nil {
return err
}

return nil
}

Expand All @@ -473,6 +530,30 @@ func (db *DB) selectKVStore(key string) (string, error) {
return valueString, nil
}

func (db *DB) selectKVStoreLike(key string) (map[string]string, error) {
rows, err := db.selectKVLike.Query(key)
defer rows.Close()
kvs := make(map[string]string)
if err != nil {
return kvs, err
}
for rows.Next() {
var (
key string
value string
)
if err := rows.Scan(&key, &value); err != nil {
if err.Error() != "sql: no rows in result set" {
return kvs, fmt.Errorf("could not retrieve key from database: %v", err)
}
// If there is no result return no error, just zero value
return kvs, nil
}
kvs[key] = value
}
return kvs, nil
}

func (db *DB) updateKVStore(key, value string) error {
_, err := db.updateKV.Exec(key, value)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type RemoteTranscoderInfo struct {
Priority int
PPNS float64
RTR float64
Secret string
}

type StreamInfo struct {
Expand Down
21 changes: 21 additions & 0 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,24 @@ func (n *LivepeerNode) SetMaxSessions(s int) {
lpmon.MaxSessions(MaxSessions)
}
}

func (n* LivepeerNode) GetTranscoderSecrets() {
n.TranscoderManager.RTmutex.Lock()
defer n.TranscoderManager.RTmutex.Unlock()
secrets, err := n.Database.GetTranscoderSecrets()
//include OrchSecret
n.TranscoderManager.transcoderSecrets[n.OrchSecret] = true
//get other transcoder secrets
if err == nil {
for k, v := range secrets {
n.TranscoderManager.transcoderSecrets[k] = v
}
}
}

func (n* LivepeerNode) UpdateTranscoderSecret(secret string, active bool) {
n.TranscoderManager.RTmutex.Lock()
defer n.TranscoderManager.RTmutex.Unlock()
n.Database.UpdateTranscoderSecret(secret, active)
n.TranscoderManager.transcoderSecrets[secret] = active
}
70 changes: 55 additions & 15 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ func (orch *orchestrator) Address() ethcommon.Address {
return orch.address
}

func (orch *orchestrator) TranscoderSecret() string {
return orch.node.OrchSecret
func (rtm *RemoteTranscoderManager) CheckTranscoderSecret(secret string) (bool, bool) {
is_active, ok := rtm.transcoderSecrets[secret]
if ok {
return is_active, ok
} else {
return false, ok
}
}

func (orch *orchestrator) CheckCapacity(mid ManifestID) error {
Expand All @@ -104,8 +109,8 @@ func (orch *orchestrator) TranscodeSeg(ctx context.Context, md *SegTranscodingMe
return orch.node.sendToTranscodeLoop(ctx, md, seg)
}

func (orch *orchestrator) ServeTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities) {
orch.node.serveTranscoder(stream, capacity, capabilities)
func (orch *orchestrator) ServeTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities, secret string) {
orch.node.serveTranscoder(stream, capacity, capabilities, secret)
}

func (orch *orchestrator) TranscoderResults(tcID int64, res *RemoteTranscoderResult) {
Expand Down Expand Up @@ -692,8 +697,10 @@ func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Co
_ = sess.stream.Send(msg)
}
n.TranscoderManager.completeStreamSession(sessionId)

n.TranscoderManager.RTmutex.Unlock()
}

n.segmentMutex.Lock()
mid := ManifestID(sessionId)
if _, ok := n.SegmentChans[mid]; ok {
Expand All @@ -706,13 +713,13 @@ func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Co
n.segmentMutex.Unlock()
}

func (n *LivepeerNode) serveTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities) {
func (n *LivepeerNode) serveTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities, secret string) {
from := common.GetConnectionAddr(stream.Context())
coreCaps := CapabilitiesFromNetCapabilities(capabilities)
n.Capabilities.AddCapacity(coreCaps)
defer n.Capabilities.RemoveCapacity(coreCaps)
// Manage blocks while transcoder is connected
n.TranscoderManager.Manage(stream, capacity, capabilities)
n.TranscoderManager.Manage(stream, capacity, capabilities, secret)
glog.V(common.DEBUG).Infof("Closing transcoder=%s channel", from)
}

Expand All @@ -735,6 +742,7 @@ type RemoteTranscoder struct {
ppns float64
rtr float64
priority int
secret string
}

// RemoteTranscoderFatalError wraps error to indicate that error is fatal
Expand Down Expand Up @@ -824,7 +832,7 @@ func (rt *RemoteTranscoder) Transcode(logCtx context.Context, md *SegTranscoding
return chanData.TranscodeData, chanData.Err
}
}
func NewRemoteTranscoder(m *RemoteTranscoderManager, stream net.Transcoder_RegisterTranscoderServer, capacity int, caps *Capabilities) *RemoteTranscoder {
func NewRemoteTranscoder(m *RemoteTranscoderManager, stream net.Transcoder_RegisterTranscoderServer, capacity int, caps *Capabilities, secret string) *RemoteTranscoder {
t_uri := common.GetConnectionAddr(stream.Context())
pr := 1
if strings.Contains(t_uri, "127.0.0.1") {
Expand All @@ -840,13 +848,15 @@ func NewRemoteTranscoder(m *RemoteTranscoderManager, stream net.Transcoder_Regis
capabilities: caps,
ppns: 2,
priority: pr,
secret: secret,
}
}

func NewRemoteTranscoderManager() *RemoteTranscoderManager {
return &RemoteTranscoderManager{
remoteTranscoders: []*RemoteTranscoder{},
liveTranscoders: map[net.Transcoder_RegisterTranscoderServer]*RemoteTranscoder{},
transcoderSecrets: make(map[string]bool),
RTmutex: sync.Mutex{},

taskMutex: &sync.RWMutex{},
Expand Down Expand Up @@ -895,6 +905,7 @@ func (r byTranscodeTime) Less(i, j int) bool {
type RemoteTranscoderManager struct {
remoteTranscoders []*RemoteTranscoder
liveTranscoders map[net.Transcoder_RegisterTranscoderServer]*RemoteTranscoder
transcoderSecrets map[string]bool
RTmutex sync.Mutex

// For tracking tasks assigned to remote transcoders
Expand All @@ -904,6 +915,7 @@ type RemoteTranscoderManager struct {
sortMethod int
// Map for keeping track of sessions and their respective transcoders
streamSessions map[string]*RemoteTranscoder

}

func (rtm *RemoteTranscoderManager) Sort() {
Expand Down Expand Up @@ -937,16 +949,16 @@ func (rtm *RemoteTranscoderManager) RegisteredTranscodersInfo() []common.RemoteT
rtm.RTmutex.Lock()
res := make([]common.RemoteTranscoderInfo, 0, len(rtm.liveTranscoders))
for _, transcoder := range rtm.liveTranscoders {
res = append(res, common.RemoteTranscoderInfo{Address: transcoder.addr, Capacity: transcoder.capacity, Priority: transcoder.priority, RTR: transcoder.rtr, PPNS: transcoder.ppns})
res = append(res, common.RemoteTranscoderInfo{Address: transcoder.addr, Capacity: transcoder.capacity, Priority: transcoder.priority, RTR: transcoder.rtr, PPNS: transcoder.ppns, Secret: transcoder.secret})
}
rtm.RTmutex.Unlock()
return res
}

// Manage adds transcoder to list of live transcoders. Doesn't return until transcoder disconnects
func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities) {
func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities, secret string) {
from := common.GetConnectionAddr(stream.Context())
transcoder := NewRemoteTranscoder(rtm, stream, capacity, CapabilitiesFromNetCapabilities(capabilities))
transcoder := NewRemoteTranscoder(rtm, stream, capacity, CapabilitiesFromNetCapabilities(capabilities), secret)
go func() {
ctx := stream.Context()
<-ctx.Done()
Expand Down Expand Up @@ -984,6 +996,22 @@ func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTransco
}
}

func (rtm *RemoteTranscoderManager) deleteLiveTranscoder(t *RemoteTranscoder) {
rtm.RTmutex.Lock()
var totalLoad, totalCapacity, liveTranscodersNum int
delete(rtm.liveTranscoders, t.stream)

if monitor.Enabled {
totalLoad, totalCapacity, liveTranscodersNum = rtm.totalLoadAndCapacity()
}
rtm.RTmutex.Unlock()

if monitor.Enabled {
monitor.SetTranscodersNumberAndLoad(totalLoad, totalCapacity, liveTranscodersNum)
monitor.SetTranscoderStats(t.addr, int(0), int(0), float64(0), 0)
}
}

func removeFromRemoteTranscoders(rt *RemoteTranscoder, remoteTranscoders []*RemoteTranscoder) []*RemoteTranscoder {
if len(remoteTranscoders) == 0 {
// No transocerds to remove, return
Expand Down Expand Up @@ -1020,10 +1048,14 @@ func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string, caps *Cap

findCompatibleTranscoder := func(rtm *RemoteTranscoderManager) int {
for i := len(rtm.remoteTranscoders) - 1; i >= 0; i-- {
// no capabilities = default capabilities, all transcoders must support them
if caps == nil || caps.bitstring.CompatibleWith(rtm.remoteTranscoders[i].capabilities.bitstring) {
if rtm.remoteTranscoders[i].load < rtm.remoteTranscoders[i].capacity {
return i
//check if transcoder secret is active
is_active, _ := rtm.CheckTranscoderSecret(rtm.remoteTranscoders[i].secret)
if is_active == true {
// no capabilities = default capabilities, all transcoders must support them
if caps == nil || caps.bitstring.CompatibleWith(rtm.remoteTranscoders[i].capabilities.bitstring) {
if rtm.remoteTranscoders[i].load < rtm.remoteTranscoders[i].capacity {
return i
}
}
}
}
Expand Down Expand Up @@ -1091,8 +1123,16 @@ func (rtm *RemoteTranscoderManager) completeStreamSession(sessionId string) {
if monitor.Enabled {
monitor.SetTranscoderLoad(t.addr, t.load)
}
rtm.Sort()

delete(rtm.streamSessions, sessionId)

is_active, _ := rtm.CheckTranscoderSecret(t.secret)
if is_active == false {
glog.V(common.DEBUG).Infof("Transcoder %v secret %v deactivated, closing", t.addr, t.secret)
t.done()
}

rtm.Sort()
}

// Caller of this function should hold RTmutex lock
Expand Down
26 changes: 26 additions & 0 deletions server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,6 +1500,32 @@ func (s *LivepeerServer) setMaxSessionsHandler() http.Handler {
})
}

func (s *LivepeerServer) activateTranscoderSecretHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.LivepeerNode.NodeType == core.OrchestratorNode {
secret := common.RandName()
s.LivepeerNode.UpdateTranscoderSecret(secret, true)
glog.Infof("Transcoder secret activated: %v", secret)
respondOk(w, []byte(secret))
}
})
}

func (s *LivepeerServer) deactivateTranscoderSecretHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if s.LivepeerNode.NodeType == core.OrchestratorNode {
secret := r.FormValue("secret")
if secret == "" {
respond400(w, "need to set secret")
}

s.LivepeerNode.UpdateTranscoderSecret(secret, false)
glog.Infof("Transcoder secret deactivated: %v", secret)
respondOk(w, []byte(""))
}
})
}

// Helpers
func respondOk(w http.ResponseWriter, msg []byte) {
w.WriteHeader(http.StatusOK)
Expand Down
Loading

0 comments on commit 6764729

Please sign in to comment.