diff --git a/eth/handler.go b/eth/handler.go index d59ca2fcf8..65dbe7eb3a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -73,6 +73,9 @@ var ( // sealToBroadcastTimer measures latency from seal+write completion to broadcast start. // This captures event delivery delay through the TypeMux subscription channel. sealToBroadcastTimer = metrics.NewRegisteredTimer("eth/seal2broadcast", nil) + // broadcastLoopTimer measures the time spent in each iteration of minedBroadcastLoop, + // covering both block propagation and witness hash announcement to all peers. + broadcastLoopTimer = metrics.NewRegisteredTimer("eth/broadcast_loop_duration", nil) ) // txPool defines the methods needed from a transaction pool implementation to @@ -826,8 +829,10 @@ func (h *handler) minedBroadcastLoop() { delay := common.PrettyDuration(time.Millisecond * time.Duration(delayInMs)) log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "blockTime", ev.Block.Time(), "now", now.Unix(), "delay", delay, "delayInMs", delayInMs, "sealToBroadcast", common.PrettyDuration(sealToBcast)) } + loopStart := time.Now() h.BroadcastBlock(ev.Block, ev.Witness, true) // First propagate block to peers h.BroadcastBlock(ev.Block, ev.Witness, false) // Only then announce to the rest + broadcastLoopTimer.Update(time.Since(loopStart)) } } } diff --git a/eth/protocols/wit/peer.go b/eth/protocols/wit/peer.go index a5a7ce6f3d..6008ad1dfd 100644 --- a/eth/protocols/wit/peer.go +++ b/eth/protocols/wit/peer.go @@ -44,9 +44,6 @@ type Peer struct { resDispatch chan *response // Dispatch channel to fulfill witness requests term chan struct{} // Termination channel to stop the broadcaster - // TODO(@pratikspatil024) - review all the instances of the lock and unlock - // and see if we can use a more efficient locking strategy - lock sync.RWMutex // Mutex protecting the internal fields } // NewPeer creates a new WIT peer and starts its background processes. @@ -77,9 +74,6 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, logger log.Logger) // sendWitness sends witness to the peer func (p *Peer) sendNewWitness(witness *stateless.Witness) error { - p.lock.Lock() - defer p.lock.Unlock() - p.knownWitnesses.Add(witness.Header().Hash()) return p2p.Send(p.rw, NewWitnessMsg, &NewWitnessPacket{ @@ -96,9 +90,6 @@ func (p *Peer) sendNewWitnessHashes(packet *NewWitnessHashesPacket) error { // witness will be sent in the background to avoid blocking the caller. If the // queue is full, the witness will be dropped. func (p *Peer) AsyncSendNewWitness(witness *stateless.Witness) { - p.lock.RLock() - defer p.lock.RUnlock() - log.Debug("AsyncSendNewWitness", "hash", witness.Header().Hash(), "peer", p.id) // Queue the witness for broadcast @@ -113,9 +104,6 @@ func (p *Peer) AsyncSendNewWitness(witness *stateless.Witness) { // AsyncSendNewWitnessHash queues witness hash for broadcast to the peer. func (p *Peer) AsyncSendNewWitnessHash(hash common.Hash, number uint64) { - p.lock.RLock() - defer p.lock.RUnlock() - // Queue the witness hashes for broadcast select { case p.queuedWitnessAnns <- &NewWitnessHashesPacket{ @@ -130,9 +118,6 @@ func (p *Peer) AsyncSendNewWitnessHash(hash common.Hash, number uint64) { // RequestWitness sends a request to the peer for witnesses by witness pages. func (p *Peer) RequestWitness(witnessPages []WitnessPageRequest, sink chan *Response) (*Request, error) { - p.lock.Lock() - defer p.lock.Unlock() - log.Debug("Requesting witnesses", "peer", p.id, "count", len(witnessPages)) id := rand.Uint64() @@ -156,9 +141,6 @@ func (p *Peer) RequestWitness(witnessPages []WitnessPageRequest, sink chan *Resp // RequestWitnessMetadata sends a request to the peer for witness metadata (page count only). func (p *Peer) RequestWitnessMetadata(hashes []common.Hash, sink chan *Response) (*Request, error) { - p.lock.Lock() - defer p.lock.Unlock() - log.Debug("Requesting witness metadata", "peer", p.id, "count", len(hashes)) id := rand.Uint64() @@ -204,45 +186,30 @@ func (p *Peer) Log() log.Logger { // KnownWitnesses retrieves the set of witness hashes known to be known by this peer. func (p *Peer) KnownWitnesses() *KnownCache { - p.lock.RLock() - defer p.lock.RUnlock() return p.knownWitnesses } // AddKnownWitnesses adds a witness hash to the set of known witness hashes. func (p *Peer) AddKnownWitness(hash common.Hash) { - p.lock.Lock() - defer p.lock.Unlock() p.knownWitnesses.Add(hash) } // KnownWitnessesCount returns the number of known witness. func (p *Peer) KnownWitnessesCount() int { - p.lock.RLock() - defer p.lock.RUnlock() return p.knownWitnesses.Cardinality() } // KnownWitnessesContains checks if a witness is known to be known by this peer. func (p *Peer) KnownWitnessesContains(witness *stateless.Witness) bool { - p.lock.RLock() - defer p.lock.RUnlock() return p.knownWitnesses.Contains(witness.Header().Hash()) } func (p *Peer) KnownWitnessContainsHash(hash common.Hash) bool { - p.lock.RLock() - defer p.lock.RUnlock() return p.knownWitnesses.hashes.Contains(hash) } // ReplyWitness is the response to GetWitness func (p *Peer) ReplyWitness(requestID uint64, response *WitnessPacketResponse) error { - p.lock.Lock() - defer p.lock.Unlock() - log.Debug("After lock") - - // Send the response return p2p.Send(p.rw, MsgWitness, &WitnessPacketRLPPacket{ RequestId: requestID, WitnessPacketResponse: *response, @@ -251,18 +218,18 @@ func (p *Peer) ReplyWitness(requestID uint64, response *WitnessPacketResponse) e // ReplyWitnessMetadata is the response to GetWitnessMetadata func (p *Peer) ReplyWitnessMetadata(requestID uint64, metadata []WitnessMetadataResponse) error { - p.lock.Lock() - defer p.lock.Unlock() - - // Send the response return p2p.Send(p.rw, WitnessMetadataMsg, &WitnessMetadataPacket{ RequestId: requestID, Metadata: metadata, }) } -// KnownCache is a cache for known witness, identified by the hash of the parent witness block. +// KnownCache is a thread-safe cache for known witness hashes, identified by the +// hash of the parent witness block. The internal mutex guards the Pop+Add +// eviction sequence in Add(); individual reads use the underlying thread-safe +// mapset and do not need external synchronization. type KnownCache struct { + mu sync.Mutex hashes mapset.Set[common.Hash] max int } @@ -275,8 +242,11 @@ func newKnownCache(max int) *KnownCache { } } -// Add adds a witness to the set. +// Add adds a witness to the set, evicting old entries if at capacity. func (k *KnownCache) Add(hash common.Hash) { + k.mu.Lock() + defer k.mu.Unlock() + for k.hashes.Cardinality() > max(0, k.max-1) { k.hashes.Pop() } diff --git a/eth/protocols/wit/peer_test.go b/eth/protocols/wit/peer_test.go index 1a0d9e2363..cd6f62859b 100644 --- a/eth/protocols/wit/peer_test.go +++ b/eth/protocols/wit/peer_test.go @@ -3,6 +3,7 @@ package wit import ( "crypto/rand" "math/big" + "sync" "testing" "time" @@ -509,3 +510,38 @@ func TestReplyWitnessMetadata(t *testing.T) { app.Close() }) } + +// TestKnownCacheConcurrency verifies that KnownCache is safe for concurrent access +// without external locking. Run with -race to detect data races. +func TestKnownCacheConcurrency(t *testing.T) { + cache := newKnownCache(100) + + var wg sync.WaitGroup + + for i := 0; i < 100; i++ { + wg.Add(1) + + go func(n int) { + defer wg.Done() + + h := common.Hash{byte(n)} + cache.Add(h) + cache.Contains(h) + cache.Cardinality() + }(i) + } + + wg.Wait() + assert.LessOrEqual(t, cache.Cardinality(), 100) +} + +// TestKnownCacheEviction verifies that the cache evicts entries when at capacity. +func TestKnownCacheEviction(t *testing.T) { + cache := newKnownCache(5) + + for i := 0; i < 10; i++ { + cache.Add(common.Hash{byte(i)}) + } + + assert.LessOrEqual(t, cache.Cardinality(), 5, "Cache should not exceed max capacity") +}