Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions gossip/c_block_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"

"github.com/Fantom-foundation/go-opera/evmcore"
"github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher"
"github.com/Fantom-foundation/go-opera/gossip/emitter"
"github.com/Fantom-foundation/go-opera/gossip/evmstore"
Expand Down Expand Up @@ -398,14 +397,11 @@ func consensusCallbackBeginBlockFn(

// Notify about new block
if feed != nil {
feed.newBlock.Send(evmcore.ChainHeadNotify{Block: evmBlock})
var logs []*types.Log
for _, r := range allReceipts {
for _, l := range r.Logs {
logs = append(logs, l)
}
logs = append(logs, r.Logs...)
}
feed.newLogs.Send(logs)
feed.notifyAboutNewBlock(evmBlock, logs)
}

now := time.Now()
Expand Down
7 changes: 5 additions & 2 deletions gossip/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,15 @@ func newTestEnv(firstEpoch idx.Epoch, validatorsNum idx.Validator, tb testing.TB
em.Start()
}

env.feed.Start(store.evm)
env.blockProcTasks.Start(1)
env.verWatcher.Start()

return env
}

func (env *testEnv) Close() {
env.feed.Stop()
env.verWatcher.Stop()
env.store.Close()
env.tflusher.Stop()
Expand All @@ -220,8 +222,6 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty

externalReceipts := make(types.Receipts, 0, len(txs))

env.txpool.AddRemotes(txs)
defer env.txpool.(*dummyTxPool).Clear()
newBlocks := make(chan evmcore.ChainHeadNotify)
chainHeadSub := env.feed.SubscribeNewBlock(newBlocks)
mu := &sync.Mutex{}
Expand All @@ -246,6 +246,9 @@ func (env *testEnv) ApplyTxs(spent time.Duration, txs ...*types.Transaction) (ty
}
}
}()
env.txpool.AddRemotes(txs)
defer env.txpool.(*dummyTxPool).Clear()

err := env.EmitUntil(func() bool {
mu.Lock()
defer mu.Unlock()
Expand Down
13 changes: 9 additions & 4 deletions gossip/mps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,18 +575,23 @@ func TestMisbehaviourProofsWrongBlockEpoch(t *testing.T) {
require.ErrorIs(err, heavycheck.ErrUnknownEpochBVs)

goodEpochMp := copyMP(correctMp)
goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
// Get epoch number when vote will occur
epoch := env.store.FindBlockEpoch(goodEpochMp.WrongBlockVote.Block)
goodEpochMp.WrongBlockVote.Pals[0].Val.Epoch = epoch - 1
goodEpochMp.WrongBlockVote.Pals[1].Val.Epoch = epoch - 1
err = env.ApplyMPs(nextEpoch, goodEpochMp)
require.ErrorIs(err, heavycheck.ErrWrongPayloadHash)
sign(&goodEpochMp)
err = env.ApplyMPs(nextEpoch, goodEpochMp)
require.NoError(err)
require.Equal(idx.Validator(3), env.store.GetValidators().Len())
// Get epoch state, before validators penalty
epochState := env.store.GetHistoryEpochState(epoch)
require.Equal(idx.Validator(3), epochState.Validators.Len())

err = env.ApplyMPs(nextEpoch, correctMp)
require.NoError(err)
require.Equal(idx.Validator(1), env.store.GetValidators().Len())
epochState = env.store.GetHistoryEpochState(epoch + 1)
require.Equal(idx.Validator(1), epochState.Validators.Len())
require.False(env.store.GetValidators().Exists(1))
require.False(env.store.GetValidators().Exists(2))
}
Expand Down
98 changes: 91 additions & 7 deletions gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"math/rand"
"slices"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (
"github.com/Fantom-foundation/go-opera/gossip/blockproc/sealmodule"
"github.com/Fantom-foundation/go-opera/gossip/blockproc/verwatcher"
"github.com/Fantom-foundation/go-opera/gossip/emitter"
"github.com/Fantom-foundation/go-opera/gossip/evmstore"
"github.com/Fantom-foundation/go-opera/gossip/filters"
"github.com/Fantom-foundation/go-opera/gossip/gasprice"
"github.com/Fantom-foundation/go-opera/gossip/proclogger"
Expand All @@ -60,6 +62,15 @@ type ServiceFeed struct {
newEmittedEvent notify.Feed
newBlock notify.Feed
newLogs notify.Feed

incomingUpdates chan<- feedUpdate // < channel to send updates to the background feed loop
stopFeeder chan<- struct{} // < if closed, the background feed loop will stop
feederDone <-chan struct{} // < if closed, the background feed loop has stopped
}

type feedUpdate struct {
block *evmcore.EvmBlock
logs []*types.Log
}

func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription {
Expand All @@ -78,6 +89,77 @@ func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscripti
return f.scope.Track(f.newLogs.Subscribe(ch))
}

func (f *ServiceFeed) Start(store *evmstore.Store) {
incoming := make(chan feedUpdate, 1024)
f.incomingUpdates = incoming
stop := make(chan struct{})
done := make(chan struct{})
f.stopFeeder = stop
f.feederDone = done
go func() {
defer close(done)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
pending := []feedUpdate{}
for {
select {
case <-stop:
return
case update := <-incoming:
pending = append(pending, update)
// sorting could be replaced by a heap or skipped if updates
// are guaranteed to be delivered in order.
slices.SortFunc(pending, func(a, b feedUpdate) int {
return a.block.Number.Cmp(b.block.Number)
})

case <-ticker.C:
}

if len(pending) == 0 {
continue
}

height, empty, err := store.GetArchiveBlockHeight()
if err != nil {
log.Error("failed to get archive block height", "err", err)
continue
}
if empty {
continue
}
for _, update := range pending {
if update.block.Number.Uint64() > height {
break
}
f.newBlock.Send(evmcore.ChainHeadNotify{Block: update.block})
f.newLogs.Send(update.logs)
pending = pending[1:]
}
}
}()
}

func (f *ServiceFeed) notifyAboutNewBlock(
block *evmcore.EvmBlock,
logs []*types.Log,
) {
f.incomingUpdates <- feedUpdate{
block: block,
logs: logs,
}
}

func (f *ServiceFeed) Stop() {
if f.stopFeeder == nil {
return
}
close(f.stopFeeder)
f.stopFeeder = nil
<-f.feederDone
f.scope.Close()
}

type BlockProc struct {
SealerModule blockproc.SealerModule
TxListenerModule blockproc.TxListenerModule
Expand Down Expand Up @@ -131,7 +213,7 @@ type Service struct {
blockBusyFlag uint32
eventBusyFlag uint32

feed ServiceFeed
feed ServiceFeed

gpo *gasprice.Oracle

Expand Down Expand Up @@ -250,11 +332,11 @@ func newService(config Config, store *Store, blockProc BlockProc, engine lachesi
defer done()
return svc.processEvent(event)
},
SwitchEpochTo: svc.SwitchEpochTo,
BVs: svc.ProcessBlockVotes,
BR: svc.ProcessFullBlockRecord,
EV: svc.ProcessEpochVote,
ER: svc.ProcessFullEpochRecord,
SwitchEpochTo: svc.SwitchEpochTo,
BVs: svc.ProcessBlockVotes,
BR: svc.ProcessFullBlockRecord,
EV: svc.ProcessEpochVote,
ER: svc.ProcessFullEpochRecord,
},
})
if err != nil {
Expand Down Expand Up @@ -435,6 +517,8 @@ func (s *Service) Start() error {
if s.store.evm.CheckLiveStateHash(blockState.LastBlock.Idx, blockState.FinalizedStateRoot) != nil {
return errors.New("fullsync isn't possible because state root is missing")
}
// start notification feeder
s.feed.Start(s.store.evm)

// start blocks processor
s.blockProcTasks.Start(1)
Expand Down Expand Up @@ -475,7 +559,7 @@ func (s *Service) Stop() error {
s.operaDialCandidates.Close()

s.handler.Stop()
s.feed.scope.Close()
s.feed.Stop()
s.gpo.Stop()
// it's safe to stop tflusher only before locking engineMu
s.tflusher.Stop()
Expand Down