Skip to content

Commit

Permalink
op-node: Fix p2p data races (ethereum-optimism#11353)
Browse files Browse the repository at this point in the history
* Fix data races around p2p records

Fixes ethereum-optimism#11328

* Remove some constructor boilerplate

* Add data race fixes for op-node/p2p tests

* Include book locking for record deletion

* Add missing read locks

* Move locks into wrappers

* Remove ping service trace parameter from public API

* I came in search of data races and I found refactors
  • Loading branch information
anacrolix authored Aug 13, 2024
1 parent 27a1bfa commit 2e94c77
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 54 deletions.
7 changes: 5 additions & 2 deletions op-node/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,13 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
}

if conf.EnablePingService {
out.pinging = NewPingService(log,
out.pinging = NewPingService(
log,
func(ctx context.Context, peerID peer.ID) <-chan ping.Result {
return ping.Ping(ctx, h, peerID)
}, h.Network().Peers, clock.SystemClock)
},
h.Network().Peers,
)
}

out.initStaticPeers()
Expand Down
11 changes: 8 additions & 3 deletions op-node/p2p/monitor/peer_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ func TestPeriodicallyCheckNextPeer(t *testing.T) {
// Each time a step is performed, it calls Done on the wait group so we can wait for it to be performed
stepCh := make(chan struct{}, 10)
monitor.bgTasks.Add(1)
var actionErr error
actionErr := make(chan error, 1)
go monitor.background(func() error {
stepCh <- struct{}{}
return actionErr
select {
case err := <-actionErr:
return err
default:
return nil
}
})
defer monitor.Stop()
// Wait for the step ticker to be started
Expand All @@ -47,7 +52,7 @@ func TestPeriodicallyCheckNextPeer(t *testing.T) {
}

// Should continue executing periodically even after an error
actionErr = errors.New("boom")
actionErr <- errors.New("boom")
for i := 0; i < 5; i++ {
clock.AdvanceTime(checkInterval)
waitForChan(t, stepCh, fmt.Sprintf("Did not perform step %v", i))
Expand Down
17 changes: 16 additions & 1 deletion op-node/p2p/pings.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,21 @@ type PingService struct {
wg sync.WaitGroup
}

func NewPingService(log log.Logger, ping PingFn, peers PeersFn, clock clock.Clock) *PingService {
func NewPingService(
log log.Logger,
ping PingFn,
peers PeersFn,
) *PingService {
return newTracedPingService(log, ping, peers, clock.SystemClock, nil)
}

func newTracedPingService(
log log.Logger,
ping PingFn,
peers PeersFn,
clock clock.Clock,
trace func(work string),
) *PingService {
ctx, cancel := context.WithCancel(context.Background())
srv := &PingService{
ping: ping,
Expand All @@ -50,6 +64,7 @@ func NewPingService(log log.Logger, ping PingFn, peers PeersFn, clock clock.Cloc
clock: clock,
ctx: ctx,
cancel: cancel,
trace: trace,
}
srv.wg.Add(1)
go srv.pingPeersBackground()
Expand Down
6 changes: 2 additions & 4 deletions op-node/p2p/pings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ func TestPingService(t *testing.T) {
return peers
})

srv := NewPingService(log, pingFn, peersFn, fakeClock)

trace := make(chan string)
srv.trace = func(work string) {
srv := newTracedPingService(log, pingFn, peersFn, fakeClock, func(work string) {
trace <- work
}
})

// wait for ping service to get online
require.Equal(t, "started", <-trace)
Expand Down
16 changes: 9 additions & 7 deletions op-node/p2p/store/ip_ban_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"net"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-service/clock"
Expand Down Expand Up @@ -46,19 +47,16 @@ func (p ipBanUpdate) Apply(rec *ipBanRecord) {
}

type ipBanBook struct {
mu sync.RWMutex
book *recordsBook[string, *ipBanRecord]
}

func newIPBanRecord() *ipBanRecord {
return new(ipBanRecord)
}

func ipKey(ip string) ds.Key {
return ds.NewKey(ip)
}

func newIPBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*ipBanBook, error) {
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, newIPBanRecord, ipKey)
book, err := newRecordsBook[string, *ipBanRecord](ctx, logger, clock, store, ipBanCacheSize, ipBanRecordExpiration, ipBanExpirationsBase, genNew, ipKey)
if err != nil {
return nil, err
}
Expand All @@ -70,8 +68,10 @@ func (d *ipBanBook) startGC() {
}

func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, err := d.book.getRecord(ip.To16().String())
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return time.Time{}, ErrUnknownBan
}
if err != nil {
Expand All @@ -81,10 +81,12 @@ func (d *ipBanBook) GetIPBanExpiration(ip net.IP) (time.Time, error) {
}

func (d *ipBanBook) SetIPBanExpiration(ip net.IP, expirationTime time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(ip.To16().String())
}
_, err := d.book.SetRecord(ip.To16().String(), ipBanUpdate(expirationTime))
_, err := d.book.setRecord(ip.To16().String(), ipBanUpdate(expirationTime))
return err
}

Expand Down
12 changes: 9 additions & 3 deletions op-node/p2p/store/mdbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ func (m *metadataRecord) UnmarshalBinary(data []byte) error {
}

type metadataBook struct {
mu sync.RWMutex
book *recordsBook[peer.ID, *metadataRecord]
}

Expand All @@ -55,7 +57,7 @@ func newMetadataRecord() *metadataRecord {
}

func newMetadataBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*metadataBook, error) {
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, newMetadataRecord, peerIDKey)
book, err := newRecordsBook[peer.ID, *metadataRecord](ctx, logger, clock, store, mdCacheSize, mdRecordExpiration, metadataBase, genNew, peerIDKey)
if err != nil {
return nil, err
}
Expand All @@ -67,9 +69,11 @@ func (m *metadataBook) startGC() {
}

func (m *metadataBook) GetPeerMetadata(id peer.ID) (PeerMetadata, error) {
m.mu.RLock()
defer m.mu.RUnlock()
record, err := m.book.getRecord(id)
// If the record is not found, return an empty PeerMetadata
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return PeerMetadata{}, nil
}
if err != nil {
Expand All @@ -89,7 +93,9 @@ func (m *metadataBook) SetPeerMetadata(id peer.ID, md PeerMetadata) (PeerMetadat
rec := newMetadataRecord()
rec.PeerMetadata = md
rec.SetLastUpdated(m.book.clock.Now())
v, err := m.book.SetRecord(id, rec)
m.mu.Lock()
defer m.mu.Unlock()
v, err := m.book.setRecord(id, rec)
return v.PeerMetadata, err
}

Expand Down
16 changes: 9 additions & 7 deletions op-node/p2p/store/peer_ban_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/ethereum-optimism/optimism/op-service/clock"
Expand Down Expand Up @@ -46,15 +47,12 @@ func (p peerBanUpdate) Apply(rec *peerBanRecord) {
}

type peerBanBook struct {
mu sync.RWMutex
book *recordsBook[peer.ID, *peerBanRecord]
}

func newPeerBanRecord() *peerBanRecord {
return new(peerBanRecord)
}

func newPeerBanBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*peerBanBook, error) {
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, newPeerBanRecord, peerIDKey)
book, err := newRecordsBook[peer.ID, *peerBanRecord](ctx, logger, clock, store, peerBanCacheSize, peerBanRecordExpiration, peerBanExpirationsBase, genNew, peerIDKey)
if err != nil {
return nil, err
}
Expand All @@ -66,8 +64,10 @@ func (d *peerBanBook) startGC() {
}

func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
d.mu.RLock()
defer d.mu.RUnlock()
rec, err := d.book.getRecord(id)
if err == ErrUnknownRecord {
if err == errUnknownRecord {
return time.Time{}, ErrUnknownBan
}
if err != nil {
Expand All @@ -77,10 +77,12 @@ func (d *peerBanBook) GetPeerBanExpiration(id peer.ID) (time.Time, error) {
}

func (d *peerBanBook) SetPeerBanExpiration(id peer.ID, expirationTime time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()
if expirationTime == (time.Time{}) {
return d.book.deleteRecord(id)
}
_, err := d.book.SetRecord(id, peerBanUpdate(expirationTime))
_, err := d.book.setRecord(id, peerBanUpdate(expirationTime))
return err
}

Expand Down
38 changes: 20 additions & 18 deletions op-node/p2p/store/records_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ type recordDiff[V record] interface {
Apply(v V)
}

var ErrUnknownRecord = errors.New("unknown record")
var errUnknownRecord = errors.New("unknown record")

func genNew[T any]() *T {
return new(T)
}

// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
// The recordsBook can be wrapped to customize typing more.
// The recordsBook can be wrapped to customize typing and introduce synchronization.
type recordsBook[K ~string, V record] struct {
ctx context.Context
cancelFn context.CancelFunc
Expand All @@ -47,7 +51,6 @@ type recordsBook[K ~string, V record] struct {
dsBaseKey ds.Key
dsEntryKey func(K) ds.Key
recordExpiry time.Duration // pruning is disabled if this is 0
sync.RWMutex
}

func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
Expand Down Expand Up @@ -80,36 +83,34 @@ func (d *recordsBook[K, V]) startGC() {
startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
}

func (d *recordsBook[K, V]) GetRecord(key K) (V, error) {
d.RLock()
defer d.RUnlock()
rec, err := d.getRecord(key)
return rec, err
}

func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
return d.dsBaseKey.Child(d.dsEntryKey(key))
}

func (d *recordsBook[K, V]) deleteRecord(key K) error {
d.cache.Remove(key)
// If access to this isn't synchronized, removing from the cache first can result in the stored
// item being cached again before it is deleted.
err := d.store.Delete(d.ctx, d.dsKey(key))
d.cache.Remove(key)
if err == nil || errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}

// You must read lock the recordsBook before calling this, and only unlock when you have extracted
// the values you want from the value of type V. There's no way to conveniently pass an extractor
// function parameterized on V here without breaking this out into a top-level function.
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
if val, ok := d.cache.Get(key); ok {
if d.hasExpired(val) {
return v, ErrUnknownRecord
return v, errUnknownRecord
}
return val, nil
}
data, err := d.store.Get(d.ctx, d.dsKey(key))
if errors.Is(err, ds.ErrNotFound) {
return v, ErrUnknownRecord
return v, errUnknownRecord
} else if err != nil {
return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
}
Expand All @@ -118,17 +119,18 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
return v, fmt.Errorf("invalid value for key %v: %w", key, err)
}
if d.hasExpired(v) {
return v, ErrUnknownRecord
return v, errUnknownRecord
}
// This is safe with a read lock as it's self-synchronized.
d.cache.Add(key, v)
return v, nil
}

func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) (V, error) {
d.Lock()
defer d.Unlock()
// You should lock the records book before calling this, and unlock it when you copy any values out
// of the returned value.
func (d *recordsBook[K, V]) setRecord(key K, diff recordDiff[V]) (V, error) {
rec, err := d.getRecord(key)
if err == ErrUnknownRecord { // instantiate new record if it does not exist yet
if err == errUnknownRecord { // instantiate new record if it does not exist yet
rec = d.newRecord()
} else if err != nil {
return d.newRecord(), err
Expand Down
Loading

0 comments on commit 2e94c77

Please sign in to comment.