diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index 72486c55e..330c000e9 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" - "github.com/Fantom-foundation/go-opera/evmcore" "github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher" "github.com/Fantom-foundation/go-opera/gossip/emitter" "github.com/Fantom-foundation/go-opera/gossip/evmstore" @@ -398,14 +397,11 @@ func consensusCallbackBeginBlockFn( // Notify about new block if feed != nil { - feed.newBlock.Send(evmcore.ChainHeadNotify{Block: evmBlock}) var logs []*types.Log for _, r := range allReceipts { - for _, l := range r.Logs { - logs = append(logs, l) - } + logs = append(logs, r.Logs...) } - feed.newLogs.Send(logs) + feed.notifyAboutNewBlock(evmBlock, logs) } now := time.Now() diff --git a/gossip/common_test.go b/gossip/common_test.go index 567050384..5e12d3839 100644 --- a/gossip/common_test.go +++ b/gossip/common_test.go @@ -197,6 +197,7 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator, tb testing.TB em.Start() } + env.feed.Start(store.evm) env.blockProcTasks.Start(1) env.verWatcher.Start() @@ -204,6 +205,7 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator, tb testing.TB } func (env *testEnv) Close() { + env.feed.Stop() env.verWatcher.Stop() env.store.Close() env.tflusher.Stop() @@ -220,8 +222,6 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty externalReceipts := make(types.Receipts, 0, len(txs)) - env.txpool.AddRemotes(txs) - defer env.txpool.(*dummyTxPool).Clear() newBlocks := make(chan evmcore.ChainHeadNotify) chainHeadSub := env.feed.SubscribeNewBlock(newBlocks) mu := &sync.Mutex{} @@ -246,6 +246,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty } } }() + env.txpool.AddRemotes(txs) + defer env.txpool.(*dummyTxPool).Clear() + err := env.EmitUntil(func() bool { mu.Lock() defer mu.Unlock() diff --git a/gossip/mps_test.go b/gossip/mps_test.go index d7c6ff9ba..4b7fd7156 100644 --- a/gossip/mps_test.go +++ b/gossip/mps_test.go @@ -575,18 +575,23 @@ func TestMisbehaviourProofsWrongBlockEpoch(t *testing.T) { require.ErrorIs(err, heavycheck.ErrUnknownEpochBVs) goodEpochMp := copyMP(correctMp) - goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block) - goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block) + // Get epoch number when vote will occur + epoch := env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block) + goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = epoch - 1 + goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = epoch - 1 err = env.ApplyMPs(nextEpoch, goodEpochMp) require.ErrorIs(err, heavycheck.ErrWrongPayloadHash) sign(&goodEpochMp) err = env.ApplyMPs(nextEpoch, goodEpochMp) require.NoError(err) - require.Equal(idx.Validator(3), env.store.GetValidators().Len()) + // Get epoch state, before validators penalty + epochState := env.store.GetHistoryEpochState(epoch) + require.Equal(idx.Validator(3), epochState.Validators.Len()) err = env.ApplyMPs(nextEpoch, correctMp) require.NoError(err) - require.Equal(idx.Validator(1), env.store.GetValidators().Len()) + epochState = env.store.GetHistoryEpochState(epoch + 1) + require.Equal(idx.Validator(1), epochState.Validators.Len()) require.False(env.store.GetValidators().Exists(1)) require.False(env.store.GetValidators().Exists(2)) } diff --git a/gossip/service.go b/gossip/service.go index 3d49c3cd9..c74a86644 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "math/rand" + "slices" "sync" "sync/atomic" "time" @@ -41,6 +42,7 @@ import ( "github.com/Fantom-foundation/go-opera/gossip/blockproc/sealmodule" "github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher" "github.com/Fantom-foundation/go-opera/gossip/emitter" + "github.com/Fantom-foundation/go-opera/gossip/evmstore" "github.com/Fantom-foundation/go-opera/gossip/filters" "github.com/Fantom-foundation/go-opera/gossip/gasprice" "github.com/Fantom-foundation/go-opera/gossip/proclogger" @@ -60,6 +62,15 @@ type ServiceFeed struct { newEmittedEvent notify.Feed newBlock notify.Feed newLogs notify.Feed + + incomingUpdates chan<- feedUpdate // < channel to send updates to the background feed loop + stopFeeder chan<- struct{} // < if closed, the background feed loop will stop + feederDone <-chan struct{} // < if closed, the background feed loop has stopped +} + +type feedUpdate struct { + block *evmcore.EvmBlock + logs []*types.Log } func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription { @@ -78,6 +89,77 @@ func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscripti return f.scope.Track(f.newLogs.Subscribe(ch)) } +func (f *ServiceFeed) Start(store *evmstore.Store) { + incoming := make(chan feedUpdate, 1024) + f.incomingUpdates = incoming + stop := make(chan struct{}) + done := make(chan struct{}) + f.stopFeeder = stop + f.feederDone = done + go func() { + defer close(done) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + pending := []feedUpdate{} + for { + select { + case <-stop: + return + case update := <-incoming: + pending = append(pending, update) + // sorting could be replaced by a heap or skipped if updates + // are guaranteed to be delivered in order. + slices.SortFunc(pending, func(a, b feedUpdate) int { + return a.block.Number.Cmp(b.block.Number) + }) + + case <-ticker.C: + } + + if len(pending) == 0 { + continue + } + + height, empty, err := store.GetArchiveBlockHeight() + if err != nil { + log.Error("failed to get archive block height", "err", err) + continue + } + if empty { + continue + } + for _, update := range pending { + if update.block.Number.Uint64() > height { + break + } + f.newBlock.Send(evmcore.ChainHeadNotify{Block: update.block}) + f.newLogs.Send(update.logs) + pending = pending[1:] + } + } + }() +} + +func (f *ServiceFeed) notifyAboutNewBlock( + block *evmcore.EvmBlock, + logs []*types.Log, +) { + f.incomingUpdates <- feedUpdate{ + block: block, + logs: logs, + } +} + +func (f *ServiceFeed) Stop() { + if f.stopFeeder == nil { + return + } + close(f.stopFeeder) + f.stopFeeder = nil + <-f.feederDone + f.scope.Close() +} + type BlockProc struct { SealerModule blockproc.SealerModule TxListenerModule blockproc.TxListenerModule @@ -131,7 +213,7 @@ type Service struct { blockBusyFlag uint32 eventBusyFlag uint32 - feed ServiceFeed + feed ServiceFeed gpo *gasprice.Oracle @@ -250,11 +332,11 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi defer done() return svc.processEvent(event) }, - SwitchEpochTo: svc.SwitchEpochTo, - BVs: svc.ProcessBlockVotes, - BR: svc.ProcessFullBlockRecord, - EV: svc.ProcessEpochVote, - ER: svc.ProcessFullEpochRecord, + SwitchEpochTo: svc.SwitchEpochTo, + BVs: svc.ProcessBlockVotes, + BR: svc.ProcessFullBlockRecord, + EV: svc.ProcessEpochVote, + ER: svc.ProcessFullEpochRecord, }, }) if err != nil { @@ -435,6 +517,8 @@ func (s *Service) Start() error { if s.store.evm.CheckLiveStateHash(blockState.LastBlock.Idx, blockState.FinalizedStateRoot) != nil { return errors.New("fullsync isn't possible because state root is missing") } + // start notification feeder + s.feed.Start(s.store.evm) // start blocks processor s.blockProcTasks.Start(1) @@ -475,7 +559,7 @@ func (s *Service) Stop() error { s.operaDialCandidates.Close() s.handler.Stop() - s.feed.scope.Close() + s.feed.Stop() s.gpo.Stop() // it's safe to stop tflusher only before locking engineMu s.tflusher.Stop()