From d48cb17bfd1e8e90c7661fb468adb7ed96c5bf9b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 12:22:12 -0400 Subject: [PATCH 01/22] separate digestCacheData --- data/txDupCache.go | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index 96a426dc7b..5424f8167b 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -30,9 +30,8 @@ import ( "golang.org/x/crypto/blake2b" ) -// digestCache is a rotating cache of size N accepting crypto.Digest as a key -// and keeping up to 2*N elements in memory -type digestCache struct { +// digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key +type digestCacheData struct { cur map[crypto.Digest]struct{} prev map[crypto.Digest]struct{} @@ -40,10 +39,18 @@ type digestCache struct { mu deadlock.RWMutex } +// digestCache is a rotating cache of size N accepting crypto.Digest as a key +// and keeping up to 2*N elements in memory +type digestCache struct { + digestCacheData +} + func makeDigestCache(size int) *digestCache { c := &digestCache{ - cur: map[crypto.Digest]struct{}{}, - maxSize: size, + digestCacheData: digestCacheData{ + cur: map[crypto.Digest]struct{}{}, + maxSize: size, + }, } return c } @@ -104,7 +111,7 @@ func (c *digestCache) Delete(d *crypto.Digest) { // txSaltedCache is a digest cache with a rotating salt // uses blake2b hash function type txSaltedCache struct { - digestCache + digestCacheData curSalt [4]byte prevSalt [4]byte @@ -114,7 +121,10 @@ type txSaltedCache struct { func makeSaltedCache(size int) *txSaltedCache { return &txSaltedCache{ - digestCache: *makeDigestCache(size), + digestCacheData: digestCacheData{ + cur: map[crypto.Digest]struct{}{}, + maxSize: size, + }, } } @@ -256,7 +266,10 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { // DeleteByKey from the cache by using a key used for insertion func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) { - c.digestCache.Delete(d) + c.mu.Lock() + defer c.mu.Unlock() + delete(c.cur, *d) + delete(c.prev, *d) } var saltedPool = sync.Pool{ From 973dcb22cd3f0363691b5b1c4326a9c3655c36a9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 14:01:20 -0400 Subject: [PATCH 02/22] store senders in a cache --- data/txDupCache.go | 92 ++++++++++++++++++-------- data/txDupCache_test.go | 142 +++++++++++++++++++++++++++++++++++++--- data/txHandler.go | 2 +- 3 files changed, 199 insertions(+), 37 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index 5424f8167b..edd51e10c0 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -30,8 +30,8 @@ import ( "golang.org/x/crypto/blake2b" ) -// digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key -type digestCacheData struct { +// digestCacheBase is a base data structure for rotating size N accepting crypto.Digest as a key +type digestCacheBase struct { cur map[crypto.Digest]struct{} prev map[crypto.Digest]struct{} @@ -42,12 +42,12 @@ type digestCacheData struct { // digestCache is a rotating cache of size N accepting crypto.Digest as a key // and keeping up to 2*N elements in memory type digestCache struct { - digestCacheData + digestCacheBase } func makeDigestCache(size int) *digestCache { c := &digestCache{ - digestCacheData: digestCacheData{ + digestCacheBase: digestCacheBase{ cur: map[crypto.Digest]struct{}{}, maxSize: size, }, @@ -108,6 +108,17 @@ func (c *digestCache) Delete(d *crypto.Digest) { delete(c.prev, *d) } +type cacheValue map[interface{}]struct{} + +// digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key +type digestCacheData struct { + cur map[crypto.Digest]cacheValue + prev map[crypto.Digest]cacheValue + + maxSize int + mu deadlock.RWMutex +} + // txSaltedCache is a digest cache with a rotating salt // uses blake2b hash function type txSaltedCache struct { @@ -122,7 +133,7 @@ type txSaltedCache struct { func makeSaltedCache(size int) *txSaltedCache { return &txSaltedCache{ digestCacheData: digestCacheData{ - cur: map[crypto.Digest]struct{}{}, + cur: map[crypto.Digest]cacheValue{}, maxSize: size, }, } @@ -180,17 +191,17 @@ func (c *txSaltedCache) innerSwap(scheduled bool) { if scheduled { // updating by timer, the prev size is a good estimation of a current load => preallocate - c.cur = make(map[crypto.Digest]struct{}, len(c.prev)) + c.cur = make(map[crypto.Digest]cacheValue, len(c.prev)) } else { // otherwise start empty - c.cur = map[crypto.Digest]struct{}{} + c.cur = map[crypto.Digest]cacheValue{} } c.moreSalt() } -// innerCheck returns true if exists, and the current salted hash if does not. -// locking semantic: write lock must be held -func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { +// innerCheck returns true if exists, the salted hash if does not exist +// locking semantic: read lock must be held +func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, cacheValue, *map[crypto.Digest]cacheValue, bool) { ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -201,31 +212,34 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { d := crypto.Digest(blake2b.Sum256(toBeHashed)) - _, found := c.cur[d] + v, found := c.cur[d] if found { - return nil, true + return &d, v, &c.cur, true } toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...) toBeHashed = toBeHashed[:len(msg)+len(c.prevSalt)] pd := crypto.Digest(blake2b.Sum256(toBeHashed)) - _, found = c.prev[pd] + v, found = c.prev[pd] if found { - return nil, true + return &pd, v, &c.prev, true } - return &d, false + return &d, nil, nil, false } // CheckAndPut adds msg into a cache if not found // returns a hashing key used for insertion if the message not found. -func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { +func (c *txSaltedCache) CheckAndPut(msg []byte, sender interface{}) (*crypto.Digest, bool) { c.mu.RLock() - d, found := c.innerCheck(msg) + d, val, page, found := c.innerCheck(msg) salt := c.curSalt c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache + senderFound := false if found { - return d, found + if _, senderFound = val[sender]; senderFound { + return d, true + } } // not found: acquire write lock to add this msg hash to cache @@ -233,19 +247,37 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { defer c.mu.Unlock() // salt may have changed between RUnlock() and Lock(), rehash if needed if salt != c.curSalt { - d, found = c.innerCheck(msg) + d, val, page, found = c.innerCheck(msg) if found { - // already added to cache between RUnlock() and Lock(), return - return d, found + if _, senderFound = val[sender]; senderFound { + // already added to cache between RUnlock() and Lock(), return + return d, true + } } - } else { + } else if found && page == &c.prev { + // there is match with prev page, update the value with data possible added in between locks + val, found = c.prev[*d] + } else { // not found or found in cur page // Do another check to see if another copy of the transaction won the race to write it to the cache - // Only check current to save a lookup since swaps are rare and no need to re-hash - if _, found := c.cur[*d]; found { - return d, found + // Only check current to save a lookup since swap is handled in the first branch + val, found = c.cur[*d] + if found { + if _, senderFound = val[sender]; senderFound { + return d, true + } + page = &c.cur } } + // at this point we know that either: + // 1. the message is not in the cache + // 2. the message is in the cache but from other senders + if found && !senderFound { + val[sender] = struct{}{} + (*page)[*d] = val + return d, true + } + if len(c.cur) >= c.maxSize { c.innerSwap(false) ptr := saltedPool.Get() @@ -260,7 +292,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { d = &dn } - c.cur[*d] = struct{}{} + c.cur[*d] = cacheValue{sender: {}} return d, false } @@ -272,6 +304,14 @@ func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) { delete(c.prev, *d) } +// Len returns size of a cache +func (c *txSaltedCache) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + + return len(c.cur) + len(c.prev) +} + var saltedPool = sync.Pool{ New: func() interface{} { // 2 x MaxAvailableAppProgramLen that covers diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index 13a36b19d1..16e8e121fb 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -109,7 +109,7 @@ func TestTxHandlerDigestCache(t *testing.T) { } func (c *txSaltedCache) check(msg []byte) bool { - _, found := c.innerCheck(msg) + _, _, _, found := c.innerCheck(msg) return found } @@ -129,7 +129,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var exist bool for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - ks[i], exist = cache.CheckAndPut(ds[i][:]) + ks[i], exist = cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks[i]) @@ -141,9 +141,9 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { // try to re-add, ensure not added for i := 0; i < size; i++ { - k, exist := cache.CheckAndPut(ds[i][:]) + k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.True(t, exist) - require.Empty(t, k) + require.NotEmpty(t, k) } require.Equal(t, size, cache.Len()) @@ -153,7 +153,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var ks2 [size]*crypto.Digest for i := 0; i < size; i++ { crypto.RandBytes(ds2[i][:]) - ks2[i], exist = cache.CheckAndPut(ds2[i][:]) + ks2[i], exist = cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks2[i]) @@ -165,7 +165,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var d [8]byte crypto.RandBytes(d[:]) - k, exist := cache.CheckAndPut(d[:]) + k, exist := cache.CheckAndPut(d[:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(d[:]) @@ -211,7 +211,7 @@ func TestTxHandlerSaltedCacheScheduled(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:]) + k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) @@ -236,7 +236,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:]) + k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds[i][:]) @@ -251,7 +251,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds2 [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds2[i][:])) - k, exist := cache.CheckAndPut(ds2[i][:]) + k, exist := cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds2[i][:]) @@ -315,7 +315,7 @@ func (p *digestCachePusher) push() { func (p *saltedCachePusher) push() { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) - p.c.CheckAndPut(d[:]) // saltedCache hashes inside + p.c.CheckAndPut(d[:], struct{}{}) // saltedCache hashes inside } func BenchmarkDigestCaches(b *testing.B) { @@ -371,3 +371,125 @@ func benchmarkDigestCache(b *testing.B, m cacheMaker, numThreads int) { } wg.Wait() } + +// TestTxHandlerSaltedCacheValues checks values are stored correctly +func TestTxHandlerSaltedCacheValues(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + const size = 2 + cache := makeSaltedCache(size) + cache.Start(context.Background(), 0) + require.Zero(t, cache.Len()) + + type snd struct { + id int + } + + d, v, p, found := cache.innerCheck([]byte{1}) + require.False(t, found) + require.Nil(t, p) + require.Nil(t, v) + require.NotNil(t, d) + require.NotEmpty(t, d) + + // add a value, ensure it can be found + d1, found := cache.CheckAndPut([]byte{1}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d1) + require.NotEmpty(t, d1) + d, v, p, found = cache.innerCheck([]byte{1}) + require.True(t, found) + require.NotNil(t, p) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *d1) + require.Equal(t, p, &cache.cur) + require.Equal(t, *p, cache.cur) + require.Len(t, *p, 1) + require.Len(t, v, 1) + require.Contains(t, v, snd{id: 1}) + d, found = cache.CheckAndPut([]byte{1}, snd{id: 1}) + require.True(t, found) + require.NotNil(t, d) + require.NotEmpty(t, d) + require.Equal(t, *d, *d1) + require.Len(t, cache.cur, 1) + require.Len(t, cache.cur[*d], 1) + require.Nil(t, cache.prev) + + // add a value with different sender + dt, found := cache.CheckAndPut([]byte{1}, snd{id: 2}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Nil(t, cache.prev) + d, v, p, found = cache.innerCheck([]byte{1}) + require.True(t, found) + require.NotNil(t, p) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *dt) + require.Equal(t, *d, *d1) + require.Equal(t, p, &cache.cur) + require.Len(t, *p, 1) + require.Len(t, v, 2) + require.Contains(t, v, snd{id: 1}) + require.Contains(t, v, snd{id: 2}) + + // add one more value to full cache.cur + d2, found := cache.CheckAndPut([]byte{2}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d2) + require.NotEmpty(t, d2) + require.Len(t, cache.cur, 2) + require.Len(t, cache.cur[*d1], 2) + require.Len(t, cache.cur[*d2], 1) + require.Nil(t, cache.prev) + + // adding new value would trigger cache swap + // first ensure new sender for seen message does not trigger a swap + dt, found = cache.CheckAndPut([]byte{2}, snd{id: 2}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Equal(t, *d2, *dt) + require.Len(t, cache.cur, 2) + require.Len(t, cache.cur[*d1], 2) + require.Len(t, cache.cur[*d2], 2) + require.Nil(t, cache.prev) + + // add a new value triggers a swap + d3, found := cache.CheckAndPut([]byte{3}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d2) + require.NotEmpty(t, d2) + require.Len(t, cache.cur, 1) + require.Len(t, cache.cur[*d3], 1) + require.Len(t, cache.prev, 2) + require.Len(t, cache.prev[*d1], 2) + require.Len(t, cache.prev[*d2], 2) + + // add a sender into old (prev) value + dt, found = cache.CheckAndPut([]byte{2}, snd{id: 3}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Equal(t, *d2, *dt) + require.Len(t, cache.cur, 1) + require.Len(t, cache.cur[*d3], 1) + require.Len(t, cache.prev, 2) + require.Len(t, cache.prev[*d1], 2) + require.Len(t, cache.prev[*d2], 3) + d, v, p, found = cache.innerCheck([]byte{2}) + require.True(t, found) + require.NotNil(t, p) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *dt) + require.Equal(t, *d, *d2) + require.Equal(t, p, &cache.prev) + require.Len(t, *p, 2) + require.Len(t, v, 3) + require.Contains(t, v, snd{id: 3}) +} diff --git a/data/txHandler.go b/data/txHandler.go index 3ad2716702..8de402356f 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -567,7 +567,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates - if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data); isDup { + if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data, rawmsg.Sender); isDup { transactionMessagesDupRawMsg.Inc(nil) return network.OutgoingMessage{Action: network.Ignore} } From f07f0b92f7d2a1c81a8d29b3096a7eef00107be5 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 17:01:15 -0400 Subject: [PATCH 03/22] make RelayArray accept map of peers --- data/txDupCache.go | 36 +++++++++++++++++++----------------- data/txDupCache_test.go | 35 +++++++++++++++++++++-------------- data/txHandler.go | 23 +++++++++++++++-------- network/wsNetwork.go | 16 ++++++++-------- 4 files changed, 63 insertions(+), 47 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index edd51e10c0..b020aa9c10 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -25,6 +25,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-deadlock" "golang.org/x/crypto/blake2b" @@ -108,7 +109,7 @@ func (c *digestCache) Delete(d *crypto.Digest) { delete(c.prev, *d) } -type cacheValue map[interface{}]struct{} +type cacheValue map[network.Peer]struct{} // digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key type digestCacheData struct { @@ -229,16 +230,16 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, cacheValue, *map // CheckAndPut adds msg into a cache if not found // returns a hashing key used for insertion if the message not found. -func (c *txSaltedCache) CheckAndPut(msg []byte, sender interface{}) (*crypto.Digest, bool) { +func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Digest, cacheValue, bool) { c.mu.RLock() - d, val, page, found := c.innerCheck(msg) + d, vals, page, found := c.innerCheck(msg) salt := c.curSalt c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache senderFound := false if found { - if _, senderFound = val[sender]; senderFound { - return d, true + if _, senderFound = vals[sender]; senderFound { + return d, vals, true } } @@ -247,23 +248,23 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender interface{}) (*crypto.Dig defer c.mu.Unlock() // salt may have changed between RUnlock() and Lock(), rehash if needed if salt != c.curSalt { - d, val, page, found = c.innerCheck(msg) + d, vals, page, found = c.innerCheck(msg) if found { - if _, senderFound = val[sender]; senderFound { + if _, senderFound = vals[sender]; senderFound { // already added to cache between RUnlock() and Lock(), return - return d, true + return d, vals, true } } } else if found && page == &c.prev { // there is match with prev page, update the value with data possible added in between locks - val, found = c.prev[*d] + vals, found = c.prev[*d] } else { // not found or found in cur page // Do another check to see if another copy of the transaction won the race to write it to the cache // Only check current to save a lookup since swap is handled in the first branch - val, found = c.cur[*d] + vals, found = c.cur[*d] if found { - if _, senderFound = val[sender]; senderFound { - return d, true + if _, senderFound = vals[sender]; senderFound { + return d, vals, true } page = &c.cur } @@ -273,9 +274,9 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender interface{}) (*crypto.Dig // 1. the message is not in the cache // 2. the message is in the cache but from other senders if found && !senderFound { - val[sender] = struct{}{} - (*page)[*d] = val - return d, true + vals[sender] = struct{}{} + (*page)[*d] = vals + return d, vals, true } if len(c.cur) >= c.maxSize { @@ -292,8 +293,9 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender interface{}) (*crypto.Dig d = &dn } - c.cur[*d] = cacheValue{sender: {}} - return d, false + vals = cacheValue{sender: {}} + c.cur[*d] = vals + return d, vals, false } // DeleteByKey from the cache by using a key used for insertion diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index 16e8e121fb..ab4197e2fe 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -129,7 +129,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var exist bool for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - ks[i], exist = cache.CheckAndPut(ds[i][:], struct{}{}) + ks[i], _, exist = cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks[i]) @@ -141,7 +141,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { // try to re-add, ensure not added for i := 0; i < size; i++ { - k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.True(t, exist) require.NotEmpty(t, k) } @@ -153,7 +153,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var ks2 [size]*crypto.Digest for i := 0; i < size; i++ { crypto.RandBytes(ds2[i][:]) - ks2[i], exist = cache.CheckAndPut(ds2[i][:], struct{}{}) + ks2[i], _, exist = cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks2[i]) @@ -165,7 +165,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var d [8]byte crypto.RandBytes(d[:]) - k, exist := cache.CheckAndPut(d[:], struct{}{}) + k, _, exist := cache.CheckAndPut(d[:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(d[:]) @@ -211,7 +211,7 @@ func TestTxHandlerSaltedCacheScheduled(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) @@ -236,7 +236,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:], struct{}{}) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds[i][:]) @@ -251,7 +251,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds2 [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds2[i][:])) - k, exist := cache.CheckAndPut(ds2[i][:], struct{}{}) + k, _, exist := cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds2[i][:]) @@ -394,7 +394,7 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.NotEmpty(t, d) // add a value, ensure it can be found - d1, found := cache.CheckAndPut([]byte{1}, snd{id: 1}) + d1, v1, found := cache.CheckAndPut([]byte{1}, snd{id: 1}) require.False(t, found) require.NotNil(t, d1) require.NotEmpty(t, d1) @@ -408,18 +408,20 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, *p, cache.cur) require.Len(t, *p, 1) require.Len(t, v, 1) + require.Equal(t, v, v1) require.Contains(t, v, snd{id: 1}) - d, found = cache.CheckAndPut([]byte{1}, snd{id: 1}) + d, v, found = cache.CheckAndPut([]byte{1}, snd{id: 1}) require.True(t, found) require.NotNil(t, d) require.NotEmpty(t, d) require.Equal(t, *d, *d1) + require.Equal(t, v, v1) require.Len(t, cache.cur, 1) require.Len(t, cache.cur[*d], 1) require.Nil(t, cache.prev) // add a value with different sender - dt, found := cache.CheckAndPut([]byte{1}, snd{id: 2}) + dt, vt, found := cache.CheckAndPut([]byte{1}, snd{id: 2}) require.True(t, found) require.NotNil(t, dt) require.NotEmpty(t, dt) @@ -431,6 +433,7 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.NotNil(t, d) require.Equal(t, *d, *dt) require.Equal(t, *d, *d1) + require.Equal(t, v, vt) require.Equal(t, p, &cache.cur) require.Len(t, *p, 1) require.Len(t, v, 2) @@ -438,10 +441,11 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Contains(t, v, snd{id: 2}) // add one more value to full cache.cur - d2, found := cache.CheckAndPut([]byte{2}, snd{id: 1}) + d2, v2, found := cache.CheckAndPut([]byte{2}, snd{id: 1}) require.False(t, found) require.NotNil(t, d2) require.NotEmpty(t, d2) + require.Len(t, v2, 1) require.Len(t, cache.cur, 2) require.Len(t, cache.cur[*d1], 2) require.Len(t, cache.cur[*d2], 1) @@ -449,21 +453,23 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { // adding new value would trigger cache swap // first ensure new sender for seen message does not trigger a swap - dt, found = cache.CheckAndPut([]byte{2}, snd{id: 2}) + dt, vt, found = cache.CheckAndPut([]byte{2}, snd{id: 2}) require.True(t, found) require.NotNil(t, dt) require.NotEmpty(t, dt) require.Equal(t, *d2, *dt) + require.Len(t, vt, 2) require.Len(t, cache.cur, 2) require.Len(t, cache.cur[*d1], 2) require.Len(t, cache.cur[*d2], 2) require.Nil(t, cache.prev) // add a new value triggers a swap - d3, found := cache.CheckAndPut([]byte{3}, snd{id: 1}) + d3, v3, found := cache.CheckAndPut([]byte{3}, snd{id: 1}) require.False(t, found) require.NotNil(t, d2) require.NotEmpty(t, d2) + require.Len(t, v3, 1) require.Len(t, cache.cur, 1) require.Len(t, cache.cur[*d3], 1) require.Len(t, cache.prev, 2) @@ -471,7 +477,7 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Len(t, cache.prev[*d2], 2) // add a sender into old (prev) value - dt, found = cache.CheckAndPut([]byte{2}, snd{id: 3}) + dt, vt, found = cache.CheckAndPut([]byte{2}, snd{id: 3}) require.True(t, found) require.NotNil(t, dt) require.NotEmpty(t, dt) @@ -491,5 +497,6 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, p, &cache.prev) require.Len(t, *p, 2) require.Len(t, v, 3) + require.Equal(t, vt, v) require.Contains(t, v, snd{id: 3}) } diff --git a/data/txHandler.go b/data/txHandler.go index 8de402356f..ac8cded804 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -101,12 +101,13 @@ const ( // The txBacklogMsg structure used to track a single incoming transaction from the gossip network, type txBacklogMsg struct { - rawmsg *network.IncomingMessage // the raw message from the network - unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group - rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network - unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup - verificationErr error // The verification error generated by the verification function, if any. - capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + rawmsg *network.IncomingMessage // the raw message from the network + unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group + rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network + unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup + verificationErr error // The verification error generated by the verification function, if any. + capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + peers map[network.Peer]struct{} // peers that ever sent this message } // TxHandler handles transaction messages @@ -496,7 +497,11 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { } // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings - handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender) + handler.net.RelayArray( + handler.ctx, + []protocol.Tag{protocol.TxnTag}, + [][]byte{reencode(verifiedTxGroup)}, + false, wi.peers) } func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *crypto.Digest) { @@ -564,10 +569,11 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { var msgKey *crypto.Digest var isDup bool + var peers map[network.Peer]struct{} if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates - if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data, rawmsg.Sender); isDup { + if msgKey, peers, isDup = handler.msgCache.CheckAndPut(rawmsg.Data, rawmsg.Sender); isDup { transactionMessagesDupRawMsg.Inc(nil) return network.OutgoingMessage{Action: network.Ignore} } @@ -649,6 +655,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net rawmsgDataHash: msgKey, unverifiedTxGroupHash: canonicalKey, capguard: capguard, + peers: peers, }: default: // if we failed here we want to increase the corresponding metric. It might suggest that we diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 9196553524..2e2d925f27 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -171,9 +171,9 @@ const ( type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error + BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error + RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error Disconnect(badnode Peer) DisconnectPeers() Ready() chan struct{} @@ -476,7 +476,7 @@ const ( type broadcastRequest struct { tags []Tag data [][]byte - except *wsPeer + except map[Peer]struct{} done chan struct{} enqueueTime time.Time ctx context.Context @@ -519,14 +519,14 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat dataArray[0] = data tagArray := make([]protocol.Tag, 1, 1) tagArray[0] = tag - return wn.BroadcastArray(ctx, tagArray, dataArray, wait, except) + return wn.BroadcastArray(ctx, tagArray, dataArray, wait, map[Peer]struct{}{except: {}}) } // BroadcastArray sends an array of messages. // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. // TODO: add `priority` argument so that we don't have to guess it based on tag -func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { +func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error { if wn.config.DisableNetworking { return nil } @@ -537,7 +537,7 @@ func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol. request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx} if except != nil { - request.except = except.(*wsPeer) + request.except = except } broadcastQueue := wn.broadcastQueueBulk @@ -587,7 +587,7 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] } // RelayArray relays array of messages -func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { +func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error { if wn.relayMessages { return wn.BroadcastArray(ctx, tags, data, wait, except) } @@ -1577,7 +1577,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } - if peer == request.except { + if _, ok := request.except[peer]; ok { continue } var ok bool From 69b4e25ddc9dde248ce503acc2f82a68310be3a2 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 17:30:18 -0400 Subject: [PATCH 04/22] fix linter --- data/txHandler.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index ac8cded804..a92ad5db91 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -497,11 +497,12 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { } // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings - handler.net.RelayArray( - handler.ctx, - []protocol.Tag{protocol.TxnTag}, - [][]byte{reencode(verifiedTxGroup)}, - false, wi.peers) + err = handler.net.RelayArray( + handler.ctx, []protocol.Tag{protocol.TxnTag}, + [][]byte{reencode(verifiedTxGroup)}, false, wi.peers) + if err != nil { + logging.Base().Infof("unable to relay transaction: %v", err) + } } func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *crypto.Digest) { From 5a9506cb64955a850230f5cf1a18032730e234c6 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 17:56:50 -0400 Subject: [PATCH 05/22] Add RelayArray to mock to fix tests --- components/mocks/mockNetwork.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 8c8eb113f1..4e5cf8c275 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -35,11 +35,19 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat return nil } +func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { + return nil +} + // Relay - unused function func (network *MockNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error { return nil } +func (network *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { + return nil +} + // Address - unused function func (network *MockNetwork) Address() (string, bool) { return "mock network", true From 2cc0b75ebae5d5a824bb06760fc1e1debf01baa8 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 19:21:11 -0400 Subject: [PATCH 06/22] fix linter --- components/mocks/mockNetwork.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 4e5cf8c275..ee6839b697 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -35,6 +35,7 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat return nil } +// BroadcastArray - unused function func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { return nil } @@ -44,6 +45,7 @@ func (network *MockNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] return nil } +// RelayArray - unused function func (network *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { return nil } From 93dddb207cb7d6f50a3634654bf2620eaafa5dd4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 26 May 2023 19:42:23 -0400 Subject: [PATCH 07/22] fix data race --- data/txDupCache.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index b020aa9c10..25283e5181 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -231,17 +231,29 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, cacheValue, *map // CheckAndPut adds msg into a cache if not found // returns a hashing key used for insertion if the message not found. func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Digest, cacheValue, bool) { + // copy vals since its owned by the cache and may be concurrently modified + copy := func(in cacheValue) cacheValue { + out := make(cacheValue, len(in)) + for p := range in { + out[p] = struct{}{} + } + return out + } + c.mu.RLock() d, vals, page, found := c.innerCheck(msg) salt := c.curSalt - c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache + // keep lock - it is needed for copying vals in defer senderFound := false if found { if _, senderFound = vals[sender]; senderFound { + vals = copy(vals) + c.mu.RUnlock() return d, vals, true } } + c.mu.RUnlock() // not found: acquire write lock to add this msg hash to cache c.mu.Lock() @@ -252,6 +264,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di if found { if _, senderFound = vals[sender]; senderFound { // already added to cache between RUnlock() and Lock(), return + vals = copy(vals) return d, vals, true } } @@ -264,6 +277,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di vals, found = c.cur[*d] if found { if _, senderFound = vals[sender]; senderFound { + vals = copy(vals) return d, vals, true } page = &c.cur @@ -276,6 +290,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di if found && !senderFound { vals[sender] = struct{}{} (*page)[*d] = vals + vals = copy(vals) return d, vals, true } @@ -295,6 +310,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di vals = cacheValue{sender: {}} c.cur[*d] = vals + vals = copy(vals) return d, vals, false } From 19bb0bbc619cb3b982ab783d8978bba8e8ebd420 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 1 Jun 2023 11:42:10 -0400 Subject: [PATCH 08/22] WIP test --- node/node_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/node/node_test.go b/node/node_test.go index f584710902..bf23d15046 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -36,6 +36,7 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util" @@ -539,3 +540,43 @@ func TestOfflineOnlineClosedBitStatus(t *testing.T) { }) } } + +// TestNodeDuplicateTransactions creates a network of 4 nodes R, N1-N3, and check nodes do not receive +// duplicate transactions back +func TestNodeDuplicateTransactions(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + var allocation []bookkeeping.GenesisAllocation + + genesis := bookkeeping.Genesis{ + SchemaID: "go-test-node-genesis", + Proto: protocol.ConsensusCurrentVersion, + Network: config.Devtestnet, + FeeSink: sinkAddr.String(), + RewardsPool: poolAddr.String(), + Allocation: allocation, + } + + cfg := config.GetDefaultLocal() + + neighbors := make([]string, numAccounts) + for i := range neighbors { + neighbors[i] = "127.0.0.1:" + strconv.Itoa(10000+i) + } + + wallets := make([]string, numAccounts) + nodes := make([]*AlgorandFullNode, numAccounts) + rootDirs := make([]string, 0) + + for i := range wallets { + rootDirectory := t.TempDir() + rootDirs = append(rootDirs, rootDirectory) + + defaultConfig.NetAddress = neighbors[i] + defaultConfig.SaveToDisk(rootDirectory) + } + + p2pNode, err := network.NewWebsocketNetwork(log, cfg, phonebookAddresses, genesis.ID(), genesis.Network, node) +} From 57b76540dee4b78f3b600b5202067a7748420060 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 1 Jun 2023 20:10:59 -0400 Subject: [PATCH 09/22] Add no dups relaying test --- data/txHandler_test.go | 118 +++++++++++++++++++++++++++++++++-------- node/node_test.go | 41 -------------- 2 files changed, 97 insertions(+), 62 deletions(-) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index f51290a374..4d8898ddca 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1007,6 +1007,28 @@ func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) { require.Equal(t, initialValue+1, currentValue) } +func makeTxn(sendIdx int, recvIdx int, addresses []basics.Address, secrets []*crypto.SignatureSecrets) ([]transactions.SignedTxn, []byte) { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addresses[sendIdx], + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, + FirstValid: 0, + LastValid: basics.Round(proto.MaxTxnLife), + Note: make([]byte, 2), + GenesisID: genesisID, + GenesisHash: genesisHash, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addresses[recvIdx], + Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, + }, + } + signedTx := tx.Sign(secrets[sendIdx]) + blob := protocol.Encode(&signedTx) + return []transactions.SignedTxn{signedTx}, blob +} + func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { partitiontest.PartitionTest(t) @@ -1043,27 +1065,7 @@ loop: } } - makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) { - tx := transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: addresses[sendIdx], - Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, - FirstValid: 0, - LastValid: basics.Round(proto.MaxTxnLife), - Note: make([]byte, 2), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: addresses[recvIdx], - Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, - }, - } - signedTx := tx.Sign(secrets[sendIdx]) - blob := protocol.Encode(&signedTx) - return []transactions.SignedTxn{signedTx}, blob - } - - stxns, blob := makeTxns(1, 2) + stxns, blob := makeTxn(1, 2, addresses, secrets) action := handler.processIncomingTxn(network.IncomingMessage{Data: blob}) require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) @@ -1097,6 +1099,80 @@ loop: require.Equal(t, 0, handler.txCanonicalCache.Len()) } +type dupMockNetwork struct { + mocks.MockNetwork + relay []map[network.Peer]struct{} +} + +func (network *dupMockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { + network.relay = append(network.relay, except) + return nil +} + +// TestTxHandlerProcessIncomingRelay ensures txHandler does not relay duplicates +func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + const numUsers = 100 + log := logging.TestingLog(t) + log.SetLevel(logging.Info) + + // prepare the accounts + addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers) + genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) + ledgerName := fmt.Sprintf("%s-mem", t.Name()) + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + cfg.EnableTxBacklogRateLimiting = false + cfg.TxIncomingFilteringFlags = 1 // txFilterRawMsg + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + require.NoError(t, err) + defer ledger.Close() + + net := dupMockNetwork{} + + tp := pools.MakeTransactionPool(ledger.Ledger, cfg, logging.Base()) + backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) + opts := TxHandlerOpts{ + tp, backlogPool, ledger, &net, "", crypto.Digest{}, cfg, + } + + handler, err := MakeTxHandler(opts) + require.NoError(t, err) + handler.Start() + defer handler.Stop() + + defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) + + _, blob := makeTxn(1, 2, addresses, secrets) + + type mockPeer struct { + id int + } + + for i := 1; i <= 3; i++ { + msg := network.IncomingMessage{Data: blob, Sender: mockPeer{i}} + handler.processIncomingTxn(msg) + } + + // wait until txn propagates in handler + var timeout time.Duration + waitTime := 10 * time.Millisecond + for tp.PendingCount() < 1 { + time.Sleep(waitTime) + timeout += waitTime + if timeout >= time.Second { + require.Fail(t, "timed out waiting for txn to propagate") + } + } + // there is almost no delay between tp.Remember and net.RelayArray + require.Len(t, net.relay, 1) // one RelayArray call + require.Len(t, net.relay[0], 3) // 3 except peers +} + const benchTxnNum = 25_000 func BenchmarkTxHandlerDecoder(b *testing.B) { diff --git a/node/node_test.go b/node/node_test.go index bf23d15046..f584710902 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -36,7 +36,6 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util" @@ -540,43 +539,3 @@ func TestOfflineOnlineClosedBitStatus(t *testing.T) { }) } } - -// TestNodeDuplicateTransactions creates a network of 4 nodes R, N1-N3, and check nodes do not receive -// duplicate transactions back -func TestNodeDuplicateTransactions(t *testing.T) { - partitiontest.PartitionTest(t) - - log := logging.TestingLog(t) - - var allocation []bookkeeping.GenesisAllocation - - genesis := bookkeeping.Genesis{ - SchemaID: "go-test-node-genesis", - Proto: protocol.ConsensusCurrentVersion, - Network: config.Devtestnet, - FeeSink: sinkAddr.String(), - RewardsPool: poolAddr.String(), - Allocation: allocation, - } - - cfg := config.GetDefaultLocal() - - neighbors := make([]string, numAccounts) - for i := range neighbors { - neighbors[i] = "127.0.0.1:" + strconv.Itoa(10000+i) - } - - wallets := make([]string, numAccounts) - nodes := make([]*AlgorandFullNode, numAccounts) - rootDirs := make([]string, 0) - - for i := range wallets { - rootDirectory := t.TempDir() - rootDirs = append(rootDirs, rootDirectory) - - defaultConfig.NetAddress = neighbors[i] - defaultConfig.SaveToDisk(rootDirectory) - } - - p2pNode, err := network.NewWebsocketNetwork(log, cfg, phonebookAddresses, genesis.ID(), genesis.Network, node) -} From 676c1451aa97ae416392bda1a4a5b9d6cbe17b21 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 1 Jun 2023 20:53:15 -0400 Subject: [PATCH 10/22] switch from map + copy to a pointer to sync.Map --- components/mocks/mockNetwork.go | 5 ++-- data/txDupCache.go | 41 +++++++++++---------------------- data/txHandler.go | 16 ++++++------- data/txHandler_test.go | 13 +++++++---- network/wsNetwork.go | 21 +++++++++++------ 5 files changed, 47 insertions(+), 49 deletions(-) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index ee6839b697..dee47e0e56 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -20,6 +20,7 @@ import ( "context" "net" "net/http" + "sync" "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" @@ -36,7 +37,7 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat } // BroadcastArray - unused function -func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { +func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { return nil } @@ -46,7 +47,7 @@ func (network *MockNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] } // RelayArray - unused function -func (network *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { +func (network *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { return nil } diff --git a/data/txDupCache.go b/data/txDupCache.go index 25283e5181..e519b384b0 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -109,12 +109,10 @@ func (c *digestCache) Delete(d *crypto.Digest) { delete(c.prev, *d) } -type cacheValue map[network.Peer]struct{} - // digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key type digestCacheData struct { - cur map[crypto.Digest]cacheValue - prev map[crypto.Digest]cacheValue + cur map[crypto.Digest]*sync.Map + prev map[crypto.Digest]*sync.Map maxSize int mu deadlock.RWMutex @@ -134,7 +132,7 @@ type txSaltedCache struct { func makeSaltedCache(size int) *txSaltedCache { return &txSaltedCache{ digestCacheData: digestCacheData{ - cur: map[crypto.Digest]cacheValue{}, + cur: map[crypto.Digest]*sync.Map{}, maxSize: size, }, } @@ -192,17 +190,17 @@ func (c *txSaltedCache) innerSwap(scheduled bool) { if scheduled { // updating by timer, the prev size is a good estimation of a current load => preallocate - c.cur = make(map[crypto.Digest]cacheValue, len(c.prev)) + c.cur = make(map[crypto.Digest]*sync.Map, len(c.prev)) } else { // otherwise start empty - c.cur = map[crypto.Digest]cacheValue{} + c.cur = map[crypto.Digest]*sync.Map{} } c.moreSalt() } // innerCheck returns true if exists, the salted hash if does not exist // locking semantic: read lock must be held -func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, cacheValue, *map[crypto.Digest]cacheValue, bool) { +func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[crypto.Digest]*sync.Map, bool) { ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -230,16 +228,7 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, cacheValue, *map // CheckAndPut adds msg into a cache if not found // returns a hashing key used for insertion if the message not found. -func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Digest, cacheValue, bool) { - // copy vals since its owned by the cache and may be concurrently modified - copy := func(in cacheValue) cacheValue { - out := make(cacheValue, len(in)) - for p := range in { - out[p] = struct{}{} - } - return out - } - +func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Digest, *sync.Map, bool) { c.mu.RLock() d, vals, page, found := c.innerCheck(msg) salt := c.curSalt @@ -247,8 +236,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di // keep lock - it is needed for copying vals in defer senderFound := false if found { - if _, senderFound = vals[sender]; senderFound { - vals = copy(vals) + if _, senderFound = vals.Load(sender); senderFound { c.mu.RUnlock() return d, vals, true } @@ -262,9 +250,8 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di if salt != c.curSalt { d, vals, page, found = c.innerCheck(msg) if found { - if _, senderFound = vals[sender]; senderFound { + if _, senderFound = vals.Load(sender); senderFound { // already added to cache between RUnlock() and Lock(), return - vals = copy(vals) return d, vals, true } } @@ -276,8 +263,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di // Only check current to save a lookup since swap is handled in the first branch vals, found = c.cur[*d] if found { - if _, senderFound = vals[sender]; senderFound { - vals = copy(vals) + if _, senderFound = vals.Load(sender); senderFound { return d, vals, true } page = &c.cur @@ -288,9 +274,8 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di // 1. the message is not in the cache // 2. the message is in the cache but from other senders if found && !senderFound { - vals[sender] = struct{}{} + vals.Store(sender, struct{}{}) (*page)[*d] = vals - vals = copy(vals) return d, vals, true } @@ -308,9 +293,9 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di d = &dn } - vals = cacheValue{sender: {}} + vals = &sync.Map{} + vals.Store(sender, struct{}{}) c.cur[*d] = vals - vals = copy(vals) return d, vals, false } diff --git a/data/txHandler.go b/data/txHandler.go index a92ad5db91..2036b40d3f 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -101,13 +101,13 @@ const ( // The txBacklogMsg structure used to track a single incoming transaction from the gossip network, type txBacklogMsg struct { - rawmsg *network.IncomingMessage // the raw message from the network - unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group - rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network - unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup - verificationErr error // The verification error generated by the verification function, if any. - capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued - peers map[network.Peer]struct{} // peers that ever sent this message + rawmsg *network.IncomingMessage // the raw message from the network + unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group + rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network + unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup + verificationErr error // The verification error generated by the verification function, if any. + capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + peers *sync.Map // peers that ever sent this message } // TxHandler handles transaction messages @@ -570,7 +570,7 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { var msgKey *crypto.Digest var isDup bool - var peers map[network.Peer]struct{} + var peers *sync.Map if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 4d8898ddca..a606d4f9f1 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1101,10 +1101,10 @@ loop: type dupMockNetwork struct { mocks.MockNetwork - relay []map[network.Peer]struct{} + relay []*sync.Map } -func (network *dupMockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[network.Peer]struct{}) error { +func (network *dupMockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { network.relay = append(network.relay, except) return nil } @@ -1169,8 +1169,13 @@ func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { } } // there is almost no delay between tp.Remember and net.RelayArray - require.Len(t, net.relay, 1) // one RelayArray call - require.Len(t, net.relay[0], 3) // 3 except peers + require.Len(t, net.relay, 1) // one RelayArray call + var count int + net.relay[0].Range(func(key, value interface{}) bool { + count++ + return true + }) + require.Equal(t, count, 3) // 3 except peers } const benchTxnNum = 25_000 diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 2e2d925f27..4bef02b0d7 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -151,6 +151,11 @@ const peerShutdownDisconnectionAckDuration = 50 * time.Millisecond // Peer opaque interface for referring to a neighbor in the network type Peer interface{} +type PeerList interface { + Has() bool + Add(Peer) +} + // PeerOption allows users to specify a subset of peers to query // //msgp:ignore PeerOption @@ -171,9 +176,9 @@ const ( type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error + BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error + RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error Disconnect(badnode Peer) DisconnectPeers() Ready() chan struct{} @@ -476,7 +481,7 @@ const ( type broadcastRequest struct { tags []Tag data [][]byte - except map[Peer]struct{} + except *sync.Map done chan struct{} enqueueTime time.Time ctx context.Context @@ -519,14 +524,16 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat dataArray[0] = data tagArray := make([]protocol.Tag, 1, 1) tagArray[0] = tag - return wn.BroadcastArray(ctx, tagArray, dataArray, wait, map[Peer]struct{}{except: {}}) + exceptPeers := &sync.Map{} + exceptPeers.Store(except, struct{}{}) + return wn.BroadcastArray(ctx, tagArray, dataArray, wait, exceptPeers) } // BroadcastArray sends an array of messages. // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. // TODO: add `priority` argument so that we don't have to guess it based on tag -func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error { +func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { if wn.config.DisableNetworking { return nil } @@ -587,7 +594,7 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] } // RelayArray relays array of messages -func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except map[Peer]struct{}) error { +func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { if wn.relayMessages { return wn.BroadcastArray(ctx, tags, data, wait, except) } @@ -1577,7 +1584,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } - if _, ok := request.except[peer]; ok { + if _, ok := request.except.Load(peer); ok { continue } var ok bool From ad563e5e22137de39d1dd0aa15891a5e9d58502e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 2 Jun 2023 11:27:51 -0400 Subject: [PATCH 11/22] fix tests --- data/txDupCache_test.go | 58 ++++++++++++++++++++++++++--------------- data/txHandler_test.go | 29 +++++++++++++++------ network/wsNetwork.go | 11 +++----- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index ab4197e2fe..10a582a322 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -382,6 +382,22 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { cache.Start(context.Background(), 0) require.Zero(t, cache.Len()) + smapLenEqual := func(t *testing.T, smap *sync.Map, expectedLen int) { + t.Helper() + actualLen := 0 + smap.Range(func(_, _ interface{}) bool { + actualLen++ + return true + }) + require.Equal(t, expectedLen, actualLen) + } + + smapContains := func(t *testing.T, smap *sync.Map, key interface{}) { + t.Helper() + _, ok := smap.Load(key) + require.True(t, ok) + } + type snd struct { id int } @@ -407,9 +423,9 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, p, &cache.cur) require.Equal(t, *p, cache.cur) require.Len(t, *p, 1) - require.Len(t, v, 1) + smapLenEqual(t, v, 1) require.Equal(t, v, v1) - require.Contains(t, v, snd{id: 1}) + smapContains(t, v, snd{id: 1}) d, v, found = cache.CheckAndPut([]byte{1}, snd{id: 1}) require.True(t, found) require.NotNil(t, d) @@ -417,7 +433,7 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, *d, *d1) require.Equal(t, v, v1) require.Len(t, cache.cur, 1) - require.Len(t, cache.cur[*d], 1) + smapLenEqual(t, cache.cur[*d], 1) require.Nil(t, cache.prev) // add a value with different sender @@ -436,19 +452,19 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, v, vt) require.Equal(t, p, &cache.cur) require.Len(t, *p, 1) - require.Len(t, v, 2) - require.Contains(t, v, snd{id: 1}) - require.Contains(t, v, snd{id: 2}) + smapLenEqual(t, v, 2) + smapContains(t, v, snd{id: 1}) + smapContains(t, v, snd{id: 2}) // add one more value to full cache.cur d2, v2, found := cache.CheckAndPut([]byte{2}, snd{id: 1}) require.False(t, found) require.NotNil(t, d2) require.NotEmpty(t, d2) - require.Len(t, v2, 1) + smapLenEqual(t, v2, 1) require.Len(t, cache.cur, 2) - require.Len(t, cache.cur[*d1], 2) - require.Len(t, cache.cur[*d2], 1) + smapLenEqual(t, cache.cur[*d1], 2) + smapLenEqual(t, cache.cur[*d2], 1) require.Nil(t, cache.prev) // adding new value would trigger cache swap @@ -458,10 +474,10 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.NotNil(t, dt) require.NotEmpty(t, dt) require.Equal(t, *d2, *dt) - require.Len(t, vt, 2) + smapLenEqual(t, vt, 2) require.Len(t, cache.cur, 2) - require.Len(t, cache.cur[*d1], 2) - require.Len(t, cache.cur[*d2], 2) + smapLenEqual(t, cache.cur[*d1], 2) + smapLenEqual(t, cache.cur[*d2], 2) require.Nil(t, cache.prev) // add a new value triggers a swap @@ -469,12 +485,12 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.False(t, found) require.NotNil(t, d2) require.NotEmpty(t, d2) - require.Len(t, v3, 1) + smapLenEqual(t, v3, 1) require.Len(t, cache.cur, 1) - require.Len(t, cache.cur[*d3], 1) + smapLenEqual(t, cache.cur[*d3], 1) require.Len(t, cache.prev, 2) - require.Len(t, cache.prev[*d1], 2) - require.Len(t, cache.prev[*d2], 2) + smapLenEqual(t, cache.prev[*d1], 2) + smapLenEqual(t, cache.prev[*d2], 2) // add a sender into old (prev) value dt, vt, found = cache.CheckAndPut([]byte{2}, snd{id: 3}) @@ -483,10 +499,10 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.NotEmpty(t, dt) require.Equal(t, *d2, *dt) require.Len(t, cache.cur, 1) - require.Len(t, cache.cur[*d3], 1) + smapLenEqual(t, cache.cur[*d3], 1) require.Len(t, cache.prev, 2) - require.Len(t, cache.prev[*d1], 2) - require.Len(t, cache.prev[*d2], 3) + smapLenEqual(t, cache.prev[*d1], 2) + smapLenEqual(t, cache.prev[*d2], 3) d, v, p, found = cache.innerCheck([]byte{2}) require.True(t, found) require.NotNil(t, p) @@ -496,7 +512,7 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Equal(t, *d, *d2) require.Equal(t, p, &cache.prev) require.Len(t, *p, 2) - require.Len(t, v, 3) + smapLenEqual(t, v, 3) require.Equal(t, vt, v) - require.Contains(t, v, snd{id: 3}) + smapContains(t, v, snd{id: 3}) } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index a606d4f9f1..620c6ac9db 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1101,14 +1101,30 @@ loop: type dupMockNetwork struct { mocks.MockNetwork + mu sync.Mutex relay []*sync.Map } func (network *dupMockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { + network.mu.Lock() + defer network.mu.Unlock() network.relay = append(network.relay, except) return nil } +func (network *dupMockNetwork) requireRelayCount(t *testing.T, callsCount int, exceptIdx int, exceptPeersCount int) { + network.mu.Lock() + defer network.mu.Unlock() + + require.Len(t, network.relay, callsCount) + var count int + network.relay[exceptIdx].Range(func(_, _ interface{}) bool { + count++ + return true + }) + require.Equal(t, count, exceptPeersCount) +} + // TestTxHandlerProcessIncomingRelay ensures txHandler does not relay duplicates func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { partitiontest.PartitionTest(t) @@ -1168,14 +1184,11 @@ func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { require.Fail(t, "timed out waiting for txn to propagate") } } - // there is almost no delay between tp.Remember and net.RelayArray - require.Len(t, net.relay, 1) // one RelayArray call - var count int - net.relay[0].Range(func(key, value interface{}) bool { - count++ - return true - }) - require.Equal(t, count, 3) // 3 except peers + // there is almost no delay between tp.Remember and net.RelayArray but still wait until the goroutine finishes + handler.ctxCancel() + handler.backlogWg.Wait() + + net.requireRelayCount(t, 1, 0, 3) // one RelayArray call and 3 except peers } const benchTxnNum = 25_000 diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 4bef02b0d7..3cf78123ad 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -151,11 +151,6 @@ const peerShutdownDisconnectionAckDuration = 50 * time.Millisecond // Peer opaque interface for referring to a neighbor in the network type Peer interface{} -type PeerList interface { - Has() bool - Add(Peer) -} - // PeerOption allows users to specify a subset of peers to query // //msgp:ignore PeerOption @@ -1584,8 +1579,10 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } - if _, ok := request.except.Load(peer); ok { - continue + if request.except != nil { + if _, ok := request.except.Load(peer); ok { + continue + } } var ok bool if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 { From f8823404abd53a21678a8bc91babcf895c163646 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 2 Jun 2023 11:37:16 -0400 Subject: [PATCH 12/22] use c.prev as a alloc size guidance --- data/txDupCache.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index e519b384b0..fe6178fa6e 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -188,13 +188,8 @@ func (c *txSaltedCache) innerSwap(scheduled bool) { c.prevSalt = c.curSalt c.prev = c.cur - if scheduled { - // updating by timer, the prev size is a good estimation of a current load => preallocate - c.cur = make(map[crypto.Digest]*sync.Map, len(c.prev)) - } else { - // otherwise start empty - c.cur = map[crypto.Digest]*sync.Map{} - } + // the prev size is a good estimation of a current load + c.cur = make(map[crypto.Digest]*sync.Map, len(c.prev)) c.moreSalt() } From 440b63bde78e5e0fbc79d6222db46b101e9a8fd0 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 2 Jun 2023 11:57:40 -0400 Subject: [PATCH 13/22] fix lint --- data/txHandler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 620c6ac9db..01c2422db9 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1101,7 +1101,7 @@ loop: type dupMockNetwork struct { mocks.MockNetwork - mu sync.Mutex + mu deadlock.Mutex relay []*sync.Map } From 92304032a511c9eccf89f98a054e255fb09454e8 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 5 Jun 2023 08:34:05 -0400 Subject: [PATCH 14/22] CR fixes: remove extra data types --- data/txDupCache.go | 34 ++++++++++------------------------ 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index fe6178fa6e..3a3d37fdd2 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -31,8 +31,9 @@ import ( "golang.org/x/crypto/blake2b" ) -// digestCacheBase is a base data structure for rotating size N accepting crypto.Digest as a key -type digestCacheBase struct { +// digestCache is a rotating cache of size N accepting crypto.Digest as a key +// and keeping up to 2*N elements in memory +type digestCache struct { cur map[crypto.Digest]struct{} prev map[crypto.Digest]struct{} @@ -40,18 +41,10 @@ type digestCacheBase struct { mu deadlock.RWMutex } -// digestCache is a rotating cache of size N accepting crypto.Digest as a key -// and keeping up to 2*N elements in memory -type digestCache struct { - digestCacheBase -} - func makeDigestCache(size int) *digestCache { c := &digestCache{ - digestCacheBase: digestCacheBase{ - cur: map[crypto.Digest]struct{}{}, - maxSize: size, - }, + cur: map[crypto.Digest]struct{}{}, + maxSize: size, } return c } @@ -109,19 +102,14 @@ func (c *digestCache) Delete(d *crypto.Digest) { delete(c.prev, *d) } -// digestCacheData is a base data structure for rotating size N accepting crypto.Digest as a key -type digestCacheData struct { +// txSaltedCache is a cache with a rotating salt +// uses blake2b hash function, and stores set of values per each entry +type txSaltedCache struct { cur map[crypto.Digest]*sync.Map prev map[crypto.Digest]*sync.Map maxSize int mu deadlock.RWMutex -} - -// txSaltedCache is a digest cache with a rotating salt -// uses blake2b hash function -type txSaltedCache struct { - digestCacheData curSalt [4]byte prevSalt [4]byte @@ -131,10 +119,8 @@ type txSaltedCache struct { func makeSaltedCache(size int) *txSaltedCache { return &txSaltedCache{ - digestCacheData: digestCacheData{ - cur: map[crypto.Digest]*sync.Map{}, - maxSize: size, - }, + cur: map[crypto.Digest]*sync.Map{}, + maxSize: size, } } From 4b6e9ef02d3f19bb6e7247b1ed7513416337426d Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 6 Jun 2023 11:01:47 -0400 Subject: [PATCH 15/22] CR fixes --- data/txDupCache.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index 3a3d37fdd2..4347dba021 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -165,12 +165,12 @@ func (c *txSaltedCache) moreSalt() { func (c *txSaltedCache) Remix() { c.mu.Lock() defer c.mu.Unlock() - c.innerSwap(true) + c.innerSwap() } // innerSwap rotates cache pages and update the salt used. // locking semantic: write lock must be held -func (c *txSaltedCache) innerSwap(scheduled bool) { +func (c *txSaltedCache) innerSwap() { c.prevSalt = c.curSalt c.prev = c.cur @@ -207,9 +207,9 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[ return &d, nil, nil, false } -// CheckAndPut adds msg into a cache if not found -// returns a hashing key used for insertion if the message not found. -func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Digest, *sync.Map, bool) { +// CheckAndPut adds (msg, sender) pair into a cache. The sender is appended into values if msg is already in the cache. +// Returns a hashing key used for insertion and its associated map of values. The boolean flag `found` is false if the msg not in the cache. +func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) { c.mu.RLock() d, vals, page, found := c.innerCheck(msg) salt := c.curSalt @@ -252,16 +252,16 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (*crypto.Di } // at this point we know that either: - // 1. the message is not in the cache + // 1. the message is not in the cache so do not check senderFound again // 2. the message is in the cache but from other senders - if found && !senderFound { + if found { vals.Store(sender, struct{}{}) (*page)[*d] = vals return d, vals, true } if len(c.cur) >= c.maxSize { - c.innerSwap(false) + c.innerSwap() ptr := saltedPool.Get() defer saltedPool.Put(ptr) From 366b7a1fcba79d2f404d01ce9eec1129ddf2d91a Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 6 Jun 2023 11:05:17 -0400 Subject: [PATCH 16/22] CR feedback: use eventually --- data/txHandler_test.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 01c2422db9..dea6f9acc7 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1175,15 +1175,10 @@ func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { } // wait until txn propagates in handler - var timeout time.Duration - waitTime := 10 * time.Millisecond - for tp.PendingCount() < 1 { - time.Sleep(waitTime) - timeout += waitTime - if timeout >= time.Second { - require.Fail(t, "timed out waiting for txn to propagate") - } - } + require.Eventually(t, func() bool { + return tp.PendingCount() >= 1 + }, time.Second, 10*time.Millisecond) + // there is almost no delay between tp.Remember and net.RelayArray but still wait until the goroutine finishes handler.ctxCancel() handler.backlogWg.Wait() From 6b27c17af58daf4d7dbef8fcb8710668ca8827a9 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Jun 2023 11:41:50 -0400 Subject: [PATCH 17/22] duplicates + unique senders benchmark --- data/txDupCache_test.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index 10a582a322..7be67cbf94 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -18,6 +18,7 @@ package data import ( "context" + "encoding/binary" "fmt" "math/rand" "sync" @@ -279,7 +280,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { // benchmark abstractions type cachePusher interface { - push() + push(seed int) } type cacheMaker interface { @@ -288,6 +289,7 @@ type cacheMaker interface { type digestCacheMaker struct{} type saltedCacheMaker struct{} +type saltedCacheDupMaker struct{} func (m digestCacheMaker) make(size int) cachePusher { return &digestCachePusher{c: makeDigestCache(size)} @@ -298,6 +300,12 @@ func (m saltedCacheMaker) make(size int) cachePusher { return scp } +func (m saltedCacheDupMaker) make(size int) cachePusher { + scp := &saltedCacheDupPusher{c: makeSaltedCache(size)} + scp.c.Start(context.Background(), 0) + return scp +} + type digestCachePusher struct { c *digestCache } @@ -305,19 +313,31 @@ type saltedCachePusher struct { c *txSaltedCache } -func (p *digestCachePusher) push() { +type saltedCacheDupPusher struct { + c *txSaltedCache +} + +func (p *digestCachePusher) push(seed int) { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) h := crypto.Digest(blake2b.Sum256(d[:])) // digestCache does not hashes so calculate hash here p.c.CheckAndPut(&h) } -func (p *saltedCachePusher) push() { +func (p *saltedCachePusher) push(seed int) { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) p.c.CheckAndPut(d[:], struct{}{}) // saltedCache hashes inside } +func (p *saltedCacheDupPusher) push(seed int) { + var d [crypto.DigestSize]byte + var peer [8]byte + crypto.RandBytes(peer[:]) + binary.BigEndian.PutUint64(d[:], uint64(seed)) + p.c.CheckAndPut(d[:], peer) // saltedCache hashes inside +} + func BenchmarkDigestCaches(b *testing.B) { deadlockDisable := deadlock.Opts.Disable deadlock.Opts.Disable = true @@ -327,18 +347,23 @@ func BenchmarkDigestCaches(b *testing.B) { digestCacheMaker := digestCacheMaker{} saltedCacheMaker := saltedCacheMaker{} + saltedCacheDupMaker := saltedCacheDupMaker{} var benchmarks = []struct { maker cacheMaker numThreads int }{ {digestCacheMaker, 1}, {saltedCacheMaker, 1}, + {saltedCacheDupMaker, 1}, {digestCacheMaker, 4}, {saltedCacheMaker, 4}, + {saltedCacheDupMaker, 4}, {digestCacheMaker, 16}, {saltedCacheMaker, 16}, + {saltedCacheDupMaker, 16}, {digestCacheMaker, 128}, {saltedCacheMaker, 128}, + {saltedCacheDupMaker, 128}, } for _, bench := range benchmarks { b.Run(fmt.Sprintf("%T/threads=%d", bench.maker, bench.numThreads), func(b *testing.B) { @@ -365,7 +390,7 @@ func benchmarkDigestCache(b *testing.B, m cacheMaker, numThreads int) { go func() { defer wg.Done() for j := 0; j < numHashes; j++ { - p.push() + p.push(j) } }() } From 1685b49613001670b5107a4798dd17e80e0c5cf5 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Jun 2023 12:08:00 -0400 Subject: [PATCH 18/22] refactor locks --- data/txDupCache.go | 40 +++++++++++++++------------------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index 4347dba021..a015c42666 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -211,54 +211,44 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[ // Returns a hashing key used for insertion and its associated map of values. The boolean flag `found` is false if the msg not in the cache. func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) { c.mu.RLock() - d, vals, page, found := c.innerCheck(msg) + d, vals, _, found = c.innerCheck(msg) salt := c.curSalt + c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache // keep lock - it is needed for copying vals in defer senderFound := false if found { - if _, senderFound = vals.Load(sender); senderFound { - c.mu.RUnlock() - return d, vals, true + if _, senderFound = vals.Load(sender); !senderFound { + vals.Store(sender, struct{}{}) } + return d, vals, true } - c.mu.RUnlock() // not found: acquire write lock to add this msg hash to cache c.mu.Lock() - defer c.mu.Unlock() // salt may have changed between RUnlock() and Lock(), rehash if needed if salt != c.curSalt { - d, vals, page, found = c.innerCheck(msg) + d, vals, _, found = c.innerCheck(msg) if found { - if _, senderFound = vals.Load(sender); senderFound { - // already added to cache between RUnlock() and Lock(), return - return d, vals, true + c.mu.Unlock() + if _, senderFound = vals.Load(sender); !senderFound { + vals.Store(sender, struct{}{}) } + return d, vals, true } - } else if found && page == &c.prev { - // there is match with prev page, update the value with data possible added in between locks - vals, found = c.prev[*d] } else { // not found or found in cur page // Do another check to see if another copy of the transaction won the race to write it to the cache // Only check current to save a lookup since swap is handled in the first branch vals, found = c.cur[*d] if found { - if _, senderFound = vals.Load(sender); senderFound { - return d, vals, true + c.mu.Unlock() + if _, senderFound = vals.Load(sender); !senderFound { + vals.Store(sender, struct{}{}) } - page = &c.cur + return d, vals, true } } - - // at this point we know that either: - // 1. the message is not in the cache so do not check senderFound again - // 2. the message is in the cache but from other senders - if found { - vals.Store(sender, struct{}{}) - (*page)[*d] = vals - return d, vals, true - } + defer c.mu.Unlock() if len(c.cur) >= c.maxSize { c.innerSwap() From bf3bad5ee468a1e863b494dc7d5e99b8c7e09c23 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Jun 2023 12:10:54 -0400 Subject: [PATCH 19/22] remove page from innerCheck --- data/txDupCache.go | 12 ++++++------ data/txDupCache_test.go | 21 +++++---------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index a015c42666..75c40db076 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -181,7 +181,7 @@ func (c *txSaltedCache) innerSwap() { // innerCheck returns true if exists, the salted hash if does not exist // locking semantic: read lock must be held -func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[crypto.Digest]*sync.Map, bool) { +func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, bool) { ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -194,7 +194,7 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[ v, found := c.cur[d] if found { - return &d, v, &c.cur, true + return &d, v, true } toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...) @@ -202,16 +202,16 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, *map[ pd := crypto.Digest(blake2b.Sum256(toBeHashed)) v, found = c.prev[pd] if found { - return &pd, v, &c.prev, true + return &pd, v, true } - return &d, nil, nil, false + return &d, nil, false } // CheckAndPut adds (msg, sender) pair into a cache. The sender is appended into values if msg is already in the cache. // Returns a hashing key used for insertion and its associated map of values. The boolean flag `found` is false if the msg not in the cache. func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) { c.mu.RLock() - d, vals, _, found = c.innerCheck(msg) + d, vals, found = c.innerCheck(msg) salt := c.curSalt c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache @@ -228,7 +228,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto. c.mu.Lock() // salt may have changed between RUnlock() and Lock(), rehash if needed if salt != c.curSalt { - d, vals, _, found = c.innerCheck(msg) + d, vals, found = c.innerCheck(msg) if found { c.mu.Unlock() if _, senderFound = vals.Load(sender); !senderFound { diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index 7be67cbf94..a1be6bdc6c 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -110,7 +110,7 @@ func TestTxHandlerDigestCache(t *testing.T) { } func (c *txSaltedCache) check(msg []byte) bool { - _, _, _, found := c.innerCheck(msg) + _, _, found := c.innerCheck(msg) return found } @@ -427,9 +427,8 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { id int } - d, v, p, found := cache.innerCheck([]byte{1}) + d, v, found := cache.innerCheck([]byte{1}) require.False(t, found) - require.Nil(t, p) require.Nil(t, v) require.NotNil(t, d) require.NotEmpty(t, d) @@ -439,15 +438,11 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.False(t, found) require.NotNil(t, d1) require.NotEmpty(t, d1) - d, v, p, found = cache.innerCheck([]byte{1}) + d, v, found = cache.innerCheck([]byte{1}) require.True(t, found) - require.NotNil(t, p) require.NotNil(t, v) require.NotNil(t, d) require.Equal(t, *d, *d1) - require.Equal(t, p, &cache.cur) - require.Equal(t, *p, cache.cur) - require.Len(t, *p, 1) smapLenEqual(t, v, 1) require.Equal(t, v, v1) smapContains(t, v, snd{id: 1}) @@ -467,16 +462,13 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.NotNil(t, dt) require.NotEmpty(t, dt) require.Nil(t, cache.prev) - d, v, p, found = cache.innerCheck([]byte{1}) + d, v, found = cache.innerCheck([]byte{1}) require.True(t, found) - require.NotNil(t, p) require.NotNil(t, v) require.NotNil(t, d) require.Equal(t, *d, *dt) require.Equal(t, *d, *d1) require.Equal(t, v, vt) - require.Equal(t, p, &cache.cur) - require.Len(t, *p, 1) smapLenEqual(t, v, 2) smapContains(t, v, snd{id: 1}) smapContains(t, v, snd{id: 2}) @@ -528,15 +520,12 @@ func TestTxHandlerSaltedCacheValues(t *testing.T) { require.Len(t, cache.prev, 2) smapLenEqual(t, cache.prev[*d1], 2) smapLenEqual(t, cache.prev[*d2], 3) - d, v, p, found = cache.innerCheck([]byte{2}) + d, v, found = cache.innerCheck([]byte{2}) require.True(t, found) - require.NotNil(t, p) require.NotNil(t, v) require.NotNil(t, d) require.Equal(t, *d, *dt) require.Equal(t, *d, *d2) - require.Equal(t, p, &cache.prev) - require.Len(t, *p, 2) smapLenEqual(t, v, 3) require.Equal(t, vt, v) smapContains(t, v, snd{id: 3}) From 5129bd084f340c83fac3011f5990379170fd6e6e Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Jun 2023 19:02:03 -0400 Subject: [PATCH 20/22] use LoadOrStore --- data/txDupCache.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/data/txDupCache.go b/data/txDupCache.go index 75c40db076..c465748c76 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -216,11 +216,8 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto. c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache // keep lock - it is needed for copying vals in defer - senderFound := false if found { - if _, senderFound = vals.Load(sender); !senderFound { - vals.Store(sender, struct{}{}) - } + vals.LoadOrStore(sender, struct{}{}) return d, vals, true } @@ -231,9 +228,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto. d, vals, found = c.innerCheck(msg) if found { c.mu.Unlock() - if _, senderFound = vals.Load(sender); !senderFound { - vals.Store(sender, struct{}{}) - } + vals.LoadOrStore(sender, struct{}{}) return d, vals, true } } else { // not found or found in cur page @@ -242,9 +237,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto. vals, found = c.cur[*d] if found { c.mu.Unlock() - if _, senderFound = vals.Load(sender); !senderFound { - vals.Store(sender, struct{}{}) - } + vals.LoadOrStore(sender, struct{}{}) return d, vals, true } } From e462b90ac676d1759efe9b066742c4198a41fb1b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Fri, 9 Jun 2023 19:17:05 -0400 Subject: [PATCH 21/22] add ignore metric --- network/wsNetwork.go | 6 +++++- network/wsNetwork_test.go | 25 +++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 5128b64913..02b43480cc 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -115,6 +115,7 @@ var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: " var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) +var networkBroadcastsIgnored = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_ignored_total", Description: "number of broadcasts ignored"}) var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) @@ -1587,12 +1588,14 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // first send to all the easy outbound peers who don't block, get them started. sentMessageCount := 0 + peersIgnored := uint64(0) for _, peer := range peers { if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } if request.except != nil { - if _, ok := request.except.Load(peer); ok { + if _, ok := request.except.Load(Peer(peer)); ok { + peersIgnored++ continue } } @@ -1622,6 +1625,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, dt := time.Since(start) networkBroadcasts.Inc(nil) + networkBroadcastsIgnored.AddUint64(peersIgnored, nil) networkBroadcastSendMicros.AddUint64(uint64(dt.Nanoseconds()/1000), nil) } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index bafc4f2b39..81f6139bb3 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -23,7 +23,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "github.com/algorand/go-algorand/internal/rapidgen" "io" "math/rand" "net" @@ -31,7 +30,6 @@ import ( "net/http/httptest" "net/url" "os" - "pgregory.net/rapid" "regexp" "runtime" "sort" @@ -41,6 +39,9 @@ import ( "testing" "time" + "github.com/algorand/go-algorand/internal/rapidgen" + "pgregory.net/rapid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -4358,3 +4359,23 @@ func TestMergePrimarySecondaryRelayAddressListsNoDedupExp(t *testing.T) { assert.ElementsMatch(t, expectedRelayAddresses, mergedRelayAddresses) }) } + +func TestInterfaceLookup(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + var m sync.Map + peer1 := &wsPeer{peerIndex: 1} + peer2 := &wsPeer{peerIndex: 1} + + var sender1 Peer = peer1 + m.Store(sender1, struct{}{}) + m.Store(peer2, struct{}{}) + + _, ok := m.Load(Peer(peer1)) + require.True(t, ok) + _, ok = m.Load(peer2) + require.True(t, ok) + _, ok = m.Load(sender1) + require.True(t, ok) +} From 2fa404663d3ba8aab695874c3bdbc84bd344c3db Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 13 Jun 2023 17:30:22 -0400 Subject: [PATCH 22/22] Add except and exceptMany to reduce allocations --- components/mocks/mockNetwork.go | 2 +- network/wsNetwork.go | 32 +++++++++++++++++++------------- network/wsNetwork_test.go | 4 ++-- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index dee47e0e56..63b1668989 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -37,7 +37,7 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat } // BroadcastArray - unused function -func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { +func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except network.Peer, exceptMany *sync.Map) error { return nil } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 02b43480cc..c82b427e46 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -115,7 +115,7 @@ var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: " var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) -var networkBroadcastsIgnored = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_ignored_total", Description: "number of broadcasts ignored"}) +var networkBroadcastsIgnored = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_peer_skipped_total", Description: "number of skipped peers in broadcast operations"}) // this metric is bound to algod_network_broadcasts_total: skipped = var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) @@ -174,9 +174,9 @@ const ( type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error + BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer, exceptMany *sync.Map) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error + RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, exceptMany *sync.Map) error Disconnect(badnode Peer) DisconnectPeers() Ready() chan struct{} @@ -482,7 +482,8 @@ const ( type broadcastRequest struct { tags []Tag data [][]byte - except *sync.Map + except *wsPeer + exceptMany *sync.Map done chan struct{} enqueueTime time.Time ctx context.Context @@ -525,16 +526,14 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat dataArray[0] = data tagArray := make([]protocol.Tag, 1, 1) tagArray[0] = tag - exceptPeers := &sync.Map{} - exceptPeers.Store(except, struct{}{}) - return wn.BroadcastArray(ctx, tagArray, dataArray, wait, exceptPeers) + return wn.BroadcastArray(ctx, tagArray, dataArray, wait, except, nil) } // BroadcastArray sends an array of messages. // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. // TODO: add `priority` argument so that we don't have to guess it based on tag -func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { +func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer, exceptMany *sync.Map) error { if wn.config.DisableNetworking { return nil } @@ -545,7 +544,10 @@ func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol. request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx} if except != nil { - request.except = except + request.except = except.(*wsPeer) + } + if exceptMany != nil { + request.exceptMany = exceptMany } broadcastQueue := wn.broadcastQueueBulk @@ -595,9 +597,9 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] } // RelayArray relays array of messages -func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { +func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, exceptMany *sync.Map) error { if wn.relayMessages { - return wn.BroadcastArray(ctx, tags, data, wait, except) + return wn.BroadcastArray(ctx, tags, data, wait, nil, exceptMany) } return nil } @@ -1593,8 +1595,12 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } - if request.except != nil { - if _, ok := request.except.Load(Peer(peer)); ok { + if peer == request.except { + peersIgnored++ + continue + } + if request.exceptMany != nil { + if _, ok := request.exceptMany.Load(Peer(peer)); ok { peersIgnored++ continue } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 81f6139bb3..3203a5d99f 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -522,7 +522,7 @@ func TestWebsocketNetworkArray(t *testing.T) { tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag} data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")} - netA.BroadcastArray(context.Background(), tags, data, false, nil) + netA.BroadcastArray(context.Background(), tags, data, false, nil, nil) select { case <-counterDone: @@ -550,7 +550,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { cancel() // try calling BroadcastArray - netA.BroadcastArray(ctx, tags, data, true, nil) + netA.BroadcastArray(ctx, tags, data, true, nil, nil) select { case <-counterDone: