From e9cbadbee54abe1e29393065a9c048b11ba9a215 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafa=C5=82=20Leszko?= <rafal@livepeer.org>
Date: Wed, 24 Apr 2024 14:49:05 +0200
Subject: [PATCH] Initialize round by any B/O who has the initializeRound flag
 set to true (#3029)

---
 cmd/livepeer/livepeer.go        |   1 +
 cmd/livepeer/starter/starter.go | 205 ++++++++++++++++----------------
 eth/roundinitializer.go         |  95 ++++-----------
 eth/roundinitializer_test.go    | 112 +----------------
 4 files changed, 131 insertions(+), 282 deletions(-)

diff --git a/cmd/livepeer/livepeer.go b/cmd/livepeer/livepeer.go
index d875a079f..030d89795 100755
--- a/cmd/livepeer/livepeer.go
+++ b/cmd/livepeer/livepeer.go
@@ -162,6 +162,7 @@ func parseLivepeerConfig() starter.LivepeerConfig {
 	cfg.MaxGasPrice = flag.Int("maxGasPrice", *cfg.MaxGasPrice, "Maximum gas price (priority fee + base fee) for ETH transactions in wei, 40 Gwei = 40000000000")
 	cfg.EthController = flag.String("ethController", *cfg.EthController, "Protocol smart contract address")
 	cfg.InitializeRound = flag.Bool("initializeRound", *cfg.InitializeRound, "Set to true if running as a transcoder and the node should automatically initialize new rounds")
+	cfg.InitializeRoundMaxDelay = flag.Duration("initializeRoundMaxDelay", *cfg.InitializeRoundMaxDelay, "Maximum delay to wait before initializing a round")
 	cfg.TicketEV = flag.String("ticketEV", *cfg.TicketEV, "The expected value for PM tickets")
 	cfg.MaxFaceValue = flag.String("maxFaceValue", *cfg.MaxFaceValue, "set max ticket face value in WEI")
 	// Broadcaster max acceptable ticket EV
diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go
index 2896d9543..f9296abfa 100755
--- a/cmd/livepeer/starter/starter.go
+++ b/cmd/livepeer/starter/starter.go
@@ -74,76 +74,77 @@ const (
 )
 
 type LivepeerConfig struct {
-	Network                *string
-	RtmpAddr               *string
-	CliAddr                *string
-	HttpAddr               *string
-	ServiceAddr            *string
-	OrchAddr               *string
-	VerifierURL            *string
-	EthController          *string
-	VerifierPath           *string
-	LocalVerify            *bool
-	HttpIngest             *bool
-	Orchestrator           *bool
-	Transcoder             *bool
-	Broadcaster            *bool
-	OrchSecret             *string
-	TranscodingOptions     *string
-	MaxAttempts            *int
-	SelectRandWeight       *float64
-	SelectStakeWeight      *float64
-	SelectPriceWeight      *float64
-	SelectPriceExpFactor   *float64
-	OrchPerfStatsURL       *string
-	Region                 *string
-	MaxPricePerUnit        *string
-	MinPerfScore           *float64
-	MaxSessions            *string
-	CurrentManifest        *bool
-	Nvidia                 *string
-	Netint                 *string
-	TestTranscoder         *bool
-	EthAcctAddr            *string
-	EthPassword            *string
-	EthKeystorePath        *string
-	EthOrchAddr            *string
-	EthUrl                 *string
-	TxTimeout              *time.Duration
-	MaxTxReplacements      *int
-	GasLimit               *int
-	MinGasPrice            *int64
-	MaxGasPrice            *int
-	InitializeRound        *bool
-	TicketEV               *string
-	MaxFaceValue           *string
-	MaxTicketEV            *string
-	MaxTotalEV             *string
-	DepositMultiplier      *int
-	PricePerUnit           *string
-	PixelsPerUnit          *string
-	PriceFeedAddr          *string
-	AutoAdjustPrice        *bool
-	PricePerBroadcaster    *string
-	BlockPollingInterval   *int
-	Redeemer               *bool
-	RedeemerAddr           *string
-	Reward                 *bool
-	Monitor                *bool
-	MetricsPerStream       *bool
-	MetricsExposeClientIP  *bool
-	MetadataQueueUri       *string
-	MetadataAmqpExchange   *string
-	MetadataPublishTimeout *time.Duration
-	Datadir                *string
-	Objectstore            *string
-	Recordstore            *string
-	FVfailGsBucket         *string
-	FVfailGsKey            *string
-	AuthWebhookURL         *string
-	OrchWebhookURL         *string
-	OrchBlacklist          *string
-	TestOrchAvail          *bool
+	Network                 *string
+	RtmpAddr                *string
+	CliAddr                 *string
+	HttpAddr                *string
+	ServiceAddr             *string
+	OrchAddr                *string
+	VerifierURL             *string
+	EthController           *string
+	VerifierPath            *string
+	LocalVerify             *bool
+	HttpIngest              *bool
+	Orchestrator            *bool
+	Transcoder              *bool
+	Broadcaster             *bool
+	OrchSecret              *string
+	TranscodingOptions      *string
+	MaxAttempts             *int
+	SelectRandWeight        *float64
+	SelectStakeWeight       *float64
+	SelectPriceWeight       *float64
+	SelectPriceExpFactor    *float64
+	OrchPerfStatsURL        *string
+	Region                  *string
+	MaxPricePerUnit         *string
+	MinPerfScore            *float64
+	MaxSessions             *string
+	CurrentManifest         *bool
+	Nvidia                  *string
+	Netint                  *string
+	TestTranscoder          *bool
+	EthAcctAddr             *string
+	EthPassword             *string
+	EthKeystorePath         *string
+	EthOrchAddr             *string
+	EthUrl                  *string
+	TxTimeout               *time.Duration
+	MaxTxReplacements       *int
+	GasLimit                *int
+	MinGasPrice             *int64
+	MaxGasPrice             *int
+	InitializeRound         *bool
+	InitializeRoundMaxDelay *time.Duration
+	TicketEV                *string
+	MaxFaceValue            *string
+	MaxTicketEV             *string
+	MaxTotalEV              *string
+	DepositMultiplier       *int
+	PricePerUnit            *string
+	PixelsPerUnit           *string
+	PriceFeedAddr           *string
+	AutoAdjustPrice         *bool
+	PricePerBroadcaster     *string
+	BlockPollingInterval    *int
+	Redeemer                *bool
+	RedeemerAddr            *string
+	Reward                  *bool
+	Monitor                 *bool
+	MetricsPerStream        *bool
+	MetricsExposeClientIP   *bool
+	MetadataQueueUri        *string
+	MetadataAmqpExchange    *string
+	MetadataPublishTimeout  *time.Duration
+	Datadir                 *string
+	Objectstore             *string
+	Recordstore             *string
+	FVfailGsBucket          *string
+	FVfailGsKey             *string
+	AuthWebhookURL          *string
+	OrchWebhookURL          *string
+	OrchBlacklist           *string
+	TestOrchAvail           *bool
 }
 
 // DefaultLivepeerConfig creates LivepeerConfig exactly the same as when no flags are passed to the livepeer process.
@@ -190,6 +191,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
 	defaultMaxGasPrice := 0
 	defaultEthController := ""
 	defaultInitializeRound := false
+	defaultInitializeRoundMaxDelay := 30 * time.Second
 	defaultTicketEV := "8000000000"
 	defaultMaxFaceValue := "0"
 	defaultMaxTicketEV := "3000000000000"
@@ -264,36 +266,37 @@ func DefaultLivepeerConfig() LivepeerConfig {
 		TestTranscoder:       &defaultTestTranscoder,
 
 		// Onchain:
-		EthAcctAddr:            &defaultEthAcctAddr,
-		EthPassword:            &defaultEthPassword,
-		EthKeystorePath:        &defaultEthKeystorePath,
-		EthOrchAddr:            &defaultEthOrchAddr,
-		EthUrl:                 &defaultEthUrl,
-		TxTimeout:              &defaultTxTimeout,
-		MaxTxReplacements:      &defaultMaxTxReplacements,
-		GasLimit:               &defaultGasLimit,
-		MaxGasPrice:            &defaultMaxGasPrice,
-		EthController:          &defaultEthController,
-		InitializeRound:        &defaultInitializeRound,
-		TicketEV:               &defaultTicketEV,
-		MaxFaceValue:           &defaultMaxFaceValue,
-		MaxTicketEV:            &defaultMaxTicketEV,
-		MaxTotalEV:             &defaultMaxTotalEV,
-		DepositMultiplier:      &defaultDepositMultiplier,
-		MaxPricePerUnit:        &defaultMaxPricePerUnit,
-		PixelsPerUnit:          &defaultPixelsPerUnit,
-		PriceFeedAddr:          &defaultPriceFeedAddr,
-		AutoAdjustPrice:        &defaultAutoAdjustPrice,
-		PricePerBroadcaster:    &defaultPricePerBroadcaster,
-		BlockPollingInterval:   &defaultBlockPollingInterval,
-		Redeemer:               &defaultRedeemer,
-		RedeemerAddr:           &defaultRedeemerAddr,
-		Monitor:                &defaultMonitor,
-		MetricsPerStream:       &defaultMetricsPerStream,
-		MetricsExposeClientIP:  &defaultMetricsExposeClientIP,
-		MetadataQueueUri:       &defaultMetadataQueueUri,
-		MetadataAmqpExchange:   &defaultMetadataAmqpExchange,
-		MetadataPublishTimeout: &defaultMetadataPublishTimeout,
+		EthAcctAddr:             &defaultEthAcctAddr,
+		EthPassword:             &defaultEthPassword,
+		EthKeystorePath:         &defaultEthKeystorePath,
+		EthOrchAddr:             &defaultEthOrchAddr,
+		EthUrl:                  &defaultEthUrl,
+		TxTimeout:               &defaultTxTimeout,
+		MaxTxReplacements:       &defaultMaxTxReplacements,
+		GasLimit:                &defaultGasLimit,
+		MaxGasPrice:             &defaultMaxGasPrice,
+		EthController:           &defaultEthController,
+		InitializeRound:         &defaultInitializeRound,
+		InitializeRoundMaxDelay: &defaultInitializeRoundMaxDelay,
+		TicketEV:                &defaultTicketEV,
+		MaxFaceValue:            &defaultMaxFaceValue,
+		MaxTicketEV:             &defaultMaxTicketEV,
+		MaxTotalEV:              &defaultMaxTotalEV,
+		DepositMultiplier:       &defaultDepositMultiplier,
+		MaxPricePerUnit:         &defaultMaxPricePerUnit,
+		PixelsPerUnit:           &defaultPixelsPerUnit,
+		PriceFeedAddr:           &defaultPriceFeedAddr,
+		AutoAdjustPrice:         &defaultAutoAdjustPrice,
+		PricePerBroadcaster:     &defaultPricePerBroadcaster,
+		BlockPollingInterval:    &defaultBlockPollingInterval,
+		Redeemer:                &defaultRedeemer,
+		RedeemerAddr:            &defaultRedeemerAddr,
+		Monitor:                 &defaultMonitor,
+		MetricsPerStream:        &defaultMetricsPerStream,
+		MetricsExposeClientIP:   &defaultMetricsExposeClientIP,
+		MetadataQueueUri:        &defaultMetadataQueueUri,
+		MetadataAmqpExchange:    &defaultMetadataAmqpExchange,
+		MetadataPublishTimeout:  &defaultMetadataPublishTimeout,
 
 		// Ingest:
 		HttpIngest: &defaultHttpIngest,
@@ -975,7 +978,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
 		if *cfg.InitializeRound {
 			// Start round initializer
 			// The node will only initialize rounds if it in the upcoming active set for the round
-			initializer := eth.NewRoundInitializer(n.Eth, timeWatcher)
+			initializer := eth.NewRoundInitializer(n.Eth, timeWatcher, *cfg.InitializeRoundMaxDelay)
 			go func() {
 				if err := initializer.Start(); err != nil {
 					serviceErr <- err
diff --git a/eth/roundinitializer.go b/eth/roundinitializer.go
index d3f2cbeee..a7aaff666 100644
--- a/eth/roundinitializer.go
+++ b/eth/roundinitializer.go
@@ -2,10 +2,11 @@ package eth
 
 import (
 	"math/big"
+	"math/rand"
 	"sync"
+	"time"
 
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/event"
 	"github.com/golang/glog"
 )
@@ -29,20 +30,22 @@ type timeWatcher interface {
 // This selection process is purely a client side implementation that attempts to minimize on-chain transaction collisions, but
 // collisions are still possible if initialization transactions are submitted by parties that are not using this selection process
 type RoundInitializer struct {
-	client LivepeerEthClient
-	tw     timeWatcher
-	quit   chan struct{}
+	maxDelay time.Duration
+	client   LivepeerEthClient
+	tw       timeWatcher
+	quit     chan struct{}
 
 	nextRoundStartL1Block *big.Int
 	mu                    sync.Mutex
 }
 
 // NewRoundInitializer creates a RoundInitializer instance
-func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher) *RoundInitializer {
+func NewRoundInitializer(client LivepeerEthClient, tw timeWatcher, maxDelay time.Duration) *RoundInitializer {
 	return &RoundInitializer{
-		client: client,
-		tw:     tw,
-		quit:   make(chan struct{}),
+		maxDelay: maxDelay,
+		client:   client,
+		tw:       tw,
+		quit:     make(chan struct{}),
 	}
 }
 
@@ -104,23 +107,23 @@ func (r *RoundInitializer) tryInitialize() error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	currentL1Blk := r.tw.LastSeenL1Block()
-	lastInitializedL1BlkHash := r.tw.LastInitializedL1BlockHash()
-
-	epochSeed := r.currentEpochSeed(currentL1Blk, r.nextRoundStartL1Block, lastInitializedL1BlkHash)
-
-	ok, err := r.shouldInitialize(epochSeed)
-	if err != nil {
-		return err
+	if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
+		// Round already initialized
+		return nil
 	}
 
-	// Noop if the caller should not initialize the round
-	if !ok {
-		return nil
+	if r.maxDelay > 0 {
+		randDelay := time.Duration(rand.Int63n(int64(r.maxDelay)))
+		glog.Infof("Waiting %v before attempting to initialize round", randDelay)
+		time.Sleep(randDelay)
+
+		if r.tw.LastSeenL1Block().Cmp(r.nextRoundStartL1Block) < 0 {
+			glog.Infof("Round is already initialized, not initializing")
+			return nil
+		}
 	}
 
 	currentRound := new(big.Int).Add(r.tw.LastInitializedRound(), big.NewInt(1))
-
 	glog.Infof("New round - preparing to initialize round to join active set, current round is %d", currentRound)
 
 	tx, err := r.client.InitializeRound()
@@ -136,55 +139,3 @@ func (r *RoundInitializer) tryInitialize() error {
 
 	return nil
 }
-
-func (r *RoundInitializer) shouldInitialize(epochSeed *big.Int) (bool, error) {
-	transcoders, err := r.client.TranscoderPool()
-	if err != nil {
-		return false, err
-	}
-
-	numActive := big.NewInt(int64(len(transcoders)))
-
-	// Should not initialize if the upcoming active set is empty
-	if numActive.Cmp(big.NewInt(0)) == 0 {
-		return false, nil
-	}
-
-	// Find the caller's rank in the upcoming active set
-	rank := int64(-1)
-	maxRank := numActive.Int64()
-	caller := r.client.Account().Address
-	for i := int64(0); i < maxRank; i++ {
-		if transcoders[i].Address == caller {
-			rank = i
-			break
-		}
-	}
-
-	// Should not initialize if the caller is not in the upcoming active set
-	if rank == -1 {
-		return false, nil
-	}
-
-	// Use the seed to select a position within the active set
-	selection := new(big.Int).Mod(epochSeed, numActive)
-	// Should not initialize if the selection does not match the caller's rank in the active set
-	if selection.Int64() != int64(rank) {
-		return false, nil
-	}
-
-	// If the selection matches the caller's rank the caller should initialize the round
-	return true, nil
-}
-
-// Returns the seed used to select a round initializer in the current epoch for the current round
-// This seed is not meant to be unpredictable. The only requirement for the seed is that it is calculated the same way for each
-// party running the round initializer
-func (r *RoundInitializer) currentEpochSeed(currentL1Block, roundStartL1Block *big.Int, lastInitializedL1BlkHash [32]byte) *big.Int {
-	epochNum := new(big.Int).Sub(currentL1Block, roundStartL1Block)
-	epochNum.Div(epochNum, epochL1Blocks)
-
-	// The seed for the current epoch is calculated as:
-	// keccak256(lastInitializedL1BlkHash | epochNum)
-	return crypto.Keccak256Hash(append(lastInitializedL1BlkHash[:], epochNum.Bytes()...)).Big()
-}
diff --git a/eth/roundinitializer_test.go b/eth/roundinitializer_test.go
index 25e60205a..a96cb3b2f 100644
--- a/eth/roundinitializer_test.go
+++ b/eth/roundinitializer_test.go
@@ -16,84 +16,6 @@ import (
 	"github.com/stretchr/testify/mock"
 )
 
-func TestRoundInitializer_CurrentEpochSeed(t *testing.T) {
-	initializer := NewRoundInitializer(nil, nil)
-
-	assert := assert.New(t)
-
-	// Test epochNum = 0
-	blkHash := [32]byte{123}
-
-	epochSeed := initializer.currentEpochSeed(big.NewInt(5), big.NewInt(5), blkHash)
-	// epochNum = (5 - 5) / 5 = 0
-	// epochSeed = keccak256(blkHash | 0) = 53205358842179480591542570540016728811976439286094436690881169143335261643310
-	expEpochSeed, _ := new(big.Int).SetString("53205358842179480591542570540016728811976439286094436690881169143335261643310", 10)
-	assert.Equal(expEpochSeed, epochSeed)
-
-	// Test epochNum > 0
-	epochSeed = initializer.currentEpochSeed(big.NewInt(20), big.NewInt(5), blkHash)
-	// epochNum = (20 - 5) / 5 = 3
-	// epochSeed = keccak256(blkHash | 3) = 42541119854153860846042329644941941146216657514071318786342840580076059276721
-	expEpochSeed.SetString("42541119854153860846042329644941941146216657514071318786342840580076059276721", 10)
-	assert.Equal(expEpochSeed, epochSeed)
-
-	// Test epochNum > 0 with some # of blocks into the epoch
-	epochSeed = initializer.currentEpochSeed(big.NewInt(20), big.NewInt(4), blkHash)
-	// epochNum = (20 - 4) / 5 = 3.2 -> 3
-	assert.Equal(expEpochSeed, epochSeed)
-}
-
-func TestRoundInitializer_ShouldInitialize(t *testing.T) {
-	client := &MockClient{}
-	tw := &stubTimeWatcher{}
-	initializer := NewRoundInitializer(client, tw)
-
-	assert := assert.New(t)
-
-	// Test error getting transcoders
-	expErr := errors.New("TranscoderPool error")
-	client.On("TranscoderPool").Return(nil, expErr).Once()
-
-	ok, err := initializer.shouldInitialize(nil)
-	assert.EqualError(err, expErr.Error())
-	assert.False(ok)
-
-	// Test active set is empty because no registered transcoders
-	client.On("TranscoderPool").Return([]*lpTypes.Transcoder{}, nil).Once()
-	ok, err = initializer.shouldInitialize(nil)
-	assert.Nil(err)
-	assert.False(ok)
-
-	// Test that caller is not in active set because it is not registered
-	caller := ethcommon.BytesToAddress([]byte("foo"))
-	client.On("Account").Return(accounts.Account{Address: caller})
-
-	registered := []*lpTypes.Transcoder{
-		{Address: ethcommon.BytesToAddress([]byte("jar"))},
-		{Address: ethcommon.BytesToAddress([]byte("bar"))},
-	}
-	client.On("TranscoderPool").Return(registered, nil).Once()
-
-	ok, err = initializer.shouldInitialize(nil)
-	assert.Nil(err)
-	assert.False(ok)
-
-	// Test not selected
-	registered = append(registered, &lpTypes.Transcoder{Address: caller})
-	client.On("TranscoderPool").Return(registered, nil)
-
-	seed := big.NewInt(3)
-	ok, err = initializer.shouldInitialize(seed)
-	assert.Nil(err)
-	assert.False(ok)
-
-	// Test caller selected
-	seed = big.NewInt(5)
-	ok, err = initializer.shouldInitialize(seed)
-	assert.Nil(err)
-	assert.True(ok)
-}
-
 func TestRoundInitializer_TryInitialize(t *testing.T) {
 	client := &MockClient{}
 	tw := &stubTimeWatcher{
@@ -101,45 +23,17 @@ func TestRoundInitializer_TryInitialize(t *testing.T) {
 		lastInitializedRound:     big.NewInt(100),
 		lastInitializedBlockHash: [32]byte{123},
 	}
-	initializer := NewRoundInitializer(client, tw)
+	initializer := NewRoundInitializer(client, tw, 0)
 	initializer.nextRoundStartL1Block = big.NewInt(5)
 	assert := assert.New(t)
 
-	// Test error checking should initialize
-	expErr := errors.New("shouldInitialize error")
-	client.On("TranscoderPool").Return(nil, expErr).Once()
-
-	err := initializer.tryInitialize()
-	assert.EqualError(err, expErr.Error())
-
-	// Test should not initialize
-	caller := ethcommon.BytesToAddress([]byte("foo"))
-	client.On("Account").Return(accounts.Account{Address: caller})
-
-	registered := []*lpTypes.Transcoder{
-		{Address: ethcommon.BytesToAddress([]byte("jar"))},
-	}
-	client.On("TranscoderPool").Return(registered, nil).Once()
-
-	err = initializer.tryInitialize()
-	assert.Nil(err)
-
-	// Test error when submitting initialization tx
-	registered = []*lpTypes.Transcoder{{Address: caller}}
-	client.On("TranscoderPool").Return(registered, nil)
-	expErr = errors.New("InitializeRound error")
-	client.On("InitializeRound").Return(nil, expErr).Once()
-
-	err = initializer.tryInitialize()
-	assert.EqualError(err, expErr.Error())
-
 	// Test error checking initialization tx
 	tx := &types.Transaction{}
 	client.On("InitializeRound").Return(tx, nil)
-	expErr = errors.New("CheckTx error")
+	expErr := errors.New("CheckTx error")
 	client.On("CheckTx", mock.Anything).Return(expErr).Once()
 
-	err = initializer.tryInitialize()
+	err := initializer.tryInitialize()
 	assert.EqualError(err, expErr.Error())
 
 	// Test success