Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ var (
// sealToBroadcastTimer measures latency from seal+write completion to broadcast start.
// This captures event delivery delay through the TypeMux subscription channel.
sealToBroadcastTimer = metrics.NewRegisteredTimer("eth/seal2broadcast", nil)
// broadcastLoopTimer measures the time spent in each iteration of minedBroadcastLoop,
// covering both block propagation and witness hash announcement to all peers.
broadcastLoopTimer = metrics.NewRegisteredTimer("eth/broadcast_loop_duration", nil)
)

// txPool defines the methods needed from a transaction pool implementation to
Expand Down Expand Up @@ -826,8 +829,10 @@ func (h *handler) minedBroadcastLoop() {
delay := common.PrettyDuration(time.Millisecond * time.Duration(delayInMs))
log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "blockTime", ev.Block.Time(), "now", now.Unix(), "delay", delay, "delayInMs", delayInMs, "sealToBroadcast", common.PrettyDuration(sealToBcast))
}
loopStart := time.Now()
h.BroadcastBlock(ev.Block, ev.Witness, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, ev.Witness, false) // Only then announce to the rest
broadcastLoopTimer.Update(time.Since(loopStart))
}
}
}
Expand Down
48 changes: 9 additions & 39 deletions eth/protocols/wit/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type Peer struct {
resDispatch chan *response // Dispatch channel to fulfill witness requests

term chan struct{} // Termination channel to stop the broadcaster
// TODO(@pratikspatil024) - review all the instances of the lock and unlock
// and see if we can use a more efficient locking strategy
lock sync.RWMutex // Mutex protecting the internal fields
}

// NewPeer creates a new WIT peer and starts its background processes.
Expand Down Expand Up @@ -77,9 +74,6 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, logger log.Logger)

// sendNewWitness sends a witness to the peer
func (p *Peer) sendNewWitness(witness *stateless.Witness) error {
p.lock.Lock()
defer p.lock.Unlock()

p.knownWitnesses.Add(witness.Header().Hash())

return p2p.Send(p.rw, NewWitnessMsg, &NewWitnessPacket{
Expand All @@ -96,9 +90,6 @@ func (p *Peer) sendNewWitnessHashes(packet *NewWitnessHashesPacket) error {
// witness will be sent in the background to avoid blocking the caller. If the
// queue is full, the witness will be dropped.
func (p *Peer) AsyncSendNewWitness(witness *stateless.Witness) {
p.lock.RLock()
defer p.lock.RUnlock()

log.Debug("AsyncSendNewWitness", "hash", witness.Header().Hash(), "peer", p.id)

// Queue the witness for broadcast
Expand All @@ -113,9 +104,6 @@ func (p *Peer) AsyncSendNewWitness(witness *stateless.Witness) {

// AsyncSendNewWitnessHash queues witness hash for broadcast to the peer.
func (p *Peer) AsyncSendNewWitnessHash(hash common.Hash, number uint64) {
p.lock.RLock()
defer p.lock.RUnlock()

// Queue the witness hashes for broadcast
select {
case p.queuedWitnessAnns <- &NewWitnessHashesPacket{
Expand All @@ -130,9 +118,6 @@ func (p *Peer) AsyncSendNewWitnessHash(hash common.Hash, number uint64) {

// RequestWitness sends a request to the peer for witnesses by witness pages.
func (p *Peer) RequestWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) {
p.lock.Lock()
defer p.lock.Unlock()

log.Debug("Requesting witnesses", "peer", p.id, "count", len(witnessPages))
id := rand.Uint64()

Expand All @@ -156,9 +141,6 @@ func (p *Peer) RequestWitness(witnessPages []WitnessPageRequest, sink chan *Resp

// RequestWitnessMetadata sends a request to the peer for witness metadata (page count only).
func (p *Peer) RequestWitnessMetadata(hashes []common.Hash, sink chan *Response) (*Request, error) {
p.lock.Lock()
defer p.lock.Unlock()

log.Debug("Requesting witness metadata", "peer", p.id, "count", len(hashes))
id := rand.Uint64()

Expand Down Expand Up @@ -204,45 +186,30 @@ func (p *Peer) Log() log.Logger {

// KnownWitnesses returns the cache of witness hashes this peer is known to have.
func (p *Peer) KnownWitnesses() *KnownCache {
p.lock.RLock()
defer p.lock.RUnlock()
return p.knownWitnesses
}

// AddKnownWitness adds a witness hash to the set of known witness hashes.
func (p *Peer) AddKnownWitness(hash common.Hash) {
p.lock.Lock()
defer p.lock.Unlock()
p.knownWitnesses.Add(hash)
}

// KnownWitnessesCount returns the number of known witnesses.
func (p *Peer) KnownWitnessesCount() int {
p.lock.RLock()
defer p.lock.RUnlock()
return p.knownWitnesses.Cardinality()
}

// KnownWitnessesContains reports whether the witness's header hash is already
// known to this peer.
func (p *Peer) KnownWitnessesContains(witness *stateless.Witness) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.knownWitnesses.Contains(witness.Header().Hash())
}

// KnownWitnessContainsHash reports whether the given hash is in this peer's
// known-witness set.
func (p *Peer) KnownWitnessContainsHash(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.knownWitnesses.hashes.Contains(hash)
}

// ReplyWitness is the response to GetWitness
func (p *Peer) ReplyWitness(requestID uint64, response *WitnessPacketResponse) error {
p.lock.Lock()
defer p.lock.Unlock()
log.Debug("After lock")

// Send the response
return p2p.Send(p.rw, MsgWitness, &WitnessPacketRLPPacket{
RequestId: requestID,
WitnessPacketResponse: *response,
Expand All @@ -251,18 +218,18 @@ func (p *Peer) ReplyWitness(requestID uint64, response *WitnessPacketResponse) e

// ReplyWitnessMetadata is the response to GetWitnessMetadata. The requestID
// echoes the id from the originating request so the requester can match the
// reply to its pending request.
func (p *Peer) ReplyWitnessMetadata(requestID uint64, metadata []WitnessMetadataResponse) error {
p.lock.Lock()
defer p.lock.Unlock()

// Send the response
return p2p.Send(p.rw, WitnessMetadataMsg, &WitnessMetadataPacket{
RequestId: requestID,
Metadata: metadata,
})
}

// KnownCache is a thread-safe cache for known witness hashes, identified by the
// hash of the parent witness block. The internal mutex guards the Pop+Add
// eviction sequence in Add(); individual reads use the underlying thread-safe
// mapset and do not need external synchronization.
type KnownCache struct {
mu sync.Mutex // serializes the eviction-then-insert sequence in Add
hashes mapset.Set[common.Hash] // thread-safe set of known witness hashes
max int // capacity bound enforced by Add's eviction loop
}
Expand All @@ -275,8 +242,11 @@ func newKnownCache(max int) *KnownCache {
}
}

// Add adds a witness to the set.
// Add adds a witness to the set, evicting old entries if at capacity.
func (k *KnownCache) Add(hash common.Hash) {
k.mu.Lock()
defer k.mu.Unlock()

for k.hashes.Cardinality() > max(0, k.max-1) {
k.hashes.Pop()
}
Expand Down
36 changes: 36 additions & 0 deletions eth/protocols/wit/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wit
import (
"crypto/rand"
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -509,3 +510,38 @@ func TestReplyWitnessMetadata(t *testing.T) {
app.Close()
})
}

// TestKnownCacheConcurrency verifies that KnownCache is safe for concurrent access
// without external locking. Run with -race to detect data races.
func TestKnownCacheConcurrency(t *testing.T) {
	const workers = 100

	cache := newKnownCache(workers)

	var wg sync.WaitGroup
	wg.Add(workers)

	for n := 0; n < workers; n++ {
		go func(idx int) {
			defer wg.Done()

			hash := common.Hash{byte(idx)}
			cache.Add(hash)
			cache.Contains(hash)
			cache.Cardinality()
		}(n)
	}

	wg.Wait()
	assert.LessOrEqual(t, cache.Cardinality(), workers)
}

// TestKnownCacheEviction verifies that the cache evicts entries when at capacity.
func TestKnownCacheEviction(t *testing.T) {
	const capacity = 5

	cache := newKnownCache(capacity)
	for n := byte(0); n < 2*capacity; n++ {
		cache.Add(common.Hash{n})
	}

	assert.LessOrEqual(t, cache.Cardinality(), capacity, "Cache should not exceed max capacity")
}
Loading