diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index 8c8eb113f1..63b1668989 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -20,6 +20,7 @@ import ( "context" "net" "net/http" + "sync" "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" @@ -35,11 +36,21 @@ func (network *MockNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat return nil } +// BroadcastArray - unused function +func (network *MockNetwork) BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except network.Peer, exceptMany *sync.Map) error { + return nil +} + // Relay - unused function func (network *MockNetwork) Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except network.Peer) error { return nil } +// RelayArray - unused function +func (network *MockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { + return nil +} + // Address - unused function func (network *MockNetwork) Address() (string, bool) { return "mock network", true diff --git a/data/txDupCache.go b/data/txDupCache.go index 96a426dc7b..c465748c76 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -25,6 +25,7 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-deadlock" "golang.org/x/crypto/blake2b" @@ -101,10 +102,14 @@ func (c *digestCache) Delete(d *crypto.Digest) { delete(c.prev, *d) } -// txSaltedCache is a digest cache with a rotating salt -// uses blake2b hash function +// txSaltedCache is a cache with a rotating salt +// uses blake2b hash function, and stores set of values per each entry type txSaltedCache struct { - digestCache + cur map[crypto.Digest]*sync.Map + prev map[crypto.Digest]*sync.Map + + maxSize int + mu deadlock.RWMutex curSalt [4]byte prevSalt [4]byte @@ -114,7 +119,8 @@ type txSaltedCache struct { func makeSaltedCache(size int) *txSaltedCache { return &txSaltedCache{ - digestCache: *makeDigestCache(size), + cur: map[crypto.Digest]*sync.Map{}, + maxSize: size, } } @@ -159,28 +165,23 @@ func (c *txSaltedCache) moreSalt() { func (c *txSaltedCache) Remix() { c.mu.Lock() defer c.mu.Unlock() - c.innerSwap(true) + c.innerSwap() } // innerSwap rotates cache pages and update the salt used. // locking semantic: write lock must be held -func (c *txSaltedCache) innerSwap(scheduled bool) { +func (c *txSaltedCache) innerSwap() { c.prevSalt = c.curSalt c.prev = c.cur - if scheduled { - // updating by timer, the prev size is a good estimation of a current load => preallocate - c.cur = make(map[crypto.Digest]struct{}, len(c.prev)) - } else { - // otherwise start empty - c.cur = map[crypto.Digest]struct{}{} - } + // the prev size is a good estimation of a current load + c.cur = make(map[crypto.Digest]*sync.Map, len(c.prev)) c.moreSalt() } -// innerCheck returns true if exists, and the current salted hash if does not. -// locking semantic: write lock must be held -func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { +// innerCheck returns true if exists, the salted hash if does not exist +// locking semantic: read lock must be held +func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, *sync.Map, bool) { ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -191,53 +192,59 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { d := crypto.Digest(blake2b.Sum256(toBeHashed)) - _, found := c.cur[d] + v, found := c.cur[d] if found { - return nil, true + return &d, v, true } toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...) toBeHashed = toBeHashed[:len(msg)+len(c.prevSalt)] pd := crypto.Digest(blake2b.Sum256(toBeHashed)) - _, found = c.prev[pd] + v, found = c.prev[pd] if found { - return nil, true + return &pd, v, true } - return &d, false + return &d, nil, false } -// CheckAndPut adds msg into a cache if not found -// returns a hashing key used for insertion if the message not found. -func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { +// CheckAndPut adds (msg, sender) pair into a cache. The sender is appended into values if msg is already in the cache. +// Returns a hashing key used for insertion and its associated map of values. The boolean flag `found` is false if the msg not in the cache. +func (c *txSaltedCache) CheckAndPut(msg []byte, sender network.Peer) (d *crypto.Digest, vals *sync.Map, found bool) { c.mu.RLock() - d, found := c.innerCheck(msg) + d, vals, found = c.innerCheck(msg) salt := c.curSalt c.mu.RUnlock() // fast read-only path: assuming most messages are duplicates, hash msg and check cache + // keep lock - it is needed for copying vals in defer if found { - return d, found + vals.LoadOrStore(sender, struct{}{}) + return d, vals, true } // not found: acquire write lock to add this msg hash to cache c.mu.Lock() - defer c.mu.Unlock() // salt may have changed between RUnlock() and Lock(), rehash if needed if salt != c.curSalt { - d, found = c.innerCheck(msg) + d, vals, found = c.innerCheck(msg) if found { - // already added to cache between RUnlock() and Lock(), return - return d, found + c.mu.Unlock() + vals.LoadOrStore(sender, struct{}{}) + return d, vals, true } - } else { + } else { // not found or found in cur page // Do another check to see if another copy of the transaction won the race to write it to the cache - // Only check current to save a lookup since swaps are rare and no need to re-hash - if _, found := c.cur[*d]; found { - return d, found + // Only check current to save a lookup since swap is handled in the first branch + vals, found = c.cur[*d] + if found { + c.mu.Unlock() + vals.LoadOrStore(sender, struct{}{}) + return d, vals, true } } + defer c.mu.Unlock() if len(c.cur) >= c.maxSize { - c.innerSwap(false) + c.innerSwap() ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -250,13 +257,26 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { d = &dn } - c.cur[*d] = struct{}{} - return d, false + vals = &sync.Map{} + vals.Store(sender, struct{}{}) + c.cur[*d] = vals + return d, vals, false } // DeleteByKey from the cache by using a key used for insertion func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) { - c.digestCache.Delete(d) + c.mu.Lock() + defer c.mu.Unlock() + delete(c.cur, *d) + delete(c.prev, *d) +} + +// Len returns size of a cache +func (c *txSaltedCache) Len() int { + c.mu.Lock() + defer c.mu.Unlock() + + return len(c.cur) + len(c.prev) } var saltedPool = sync.Pool{ diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index 13a36b19d1..a1be6bdc6c 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -18,6 +18,7 @@ package data import ( "context" + "encoding/binary" "fmt" "math/rand" "sync" @@ -109,7 +110,7 @@ func TestTxHandlerDigestCache(t *testing.T) { } func (c *txSaltedCache) check(msg []byte) bool { - _, found := c.innerCheck(msg) + _, _, found := c.innerCheck(msg) return found } @@ -129,7 +130,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var exist bool for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - ks[i], exist = cache.CheckAndPut(ds[i][:]) + ks[i], _, exist = cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks[i]) @@ -141,9 +142,9 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { // try to re-add, ensure not added for i := 0; i < size; i++ { - k, exist := cache.CheckAndPut(ds[i][:]) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.True(t, exist) - require.Empty(t, k) + require.NotEmpty(t, k) } require.Equal(t, size, cache.Len()) @@ -153,7 +154,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var ks2 [size]*crypto.Digest for i := 0; i < size; i++ { crypto.RandBytes(ds2[i][:]) - ks2[i], exist = cache.CheckAndPut(ds2[i][:]) + ks2[i], _, exist = cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, ks2[i]) @@ -165,7 +166,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { var d [8]byte crypto.RandBytes(d[:]) - k, exist := cache.CheckAndPut(d[:]) + k, _, exist := cache.CheckAndPut(d[:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(d[:]) @@ -211,7 +212,7 @@ func TestTxHandlerSaltedCacheScheduled(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:]) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) @@ -236,7 +237,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - k, exist := cache.CheckAndPut(ds[i][:]) + k, _, exist := cache.CheckAndPut(ds[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds[i][:]) @@ -251,7 +252,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { var ds2 [size][8]byte for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds2[i][:])) - k, exist := cache.CheckAndPut(ds2[i][:]) + k, _, exist := cache.CheckAndPut(ds2[i][:], struct{}{}) require.False(t, exist) require.NotEmpty(t, k) exist = cache.check(ds2[i][:]) @@ -279,7 +280,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { // benchmark abstractions type cachePusher interface { - push() + push(seed int) } type cacheMaker interface { @@ -288,6 +289,7 @@ type cacheMaker interface { type digestCacheMaker struct{} type saltedCacheMaker struct{} +type saltedCacheDupMaker struct{} func (m digestCacheMaker) make(size int) cachePusher { return &digestCachePusher{c: makeDigestCache(size)} @@ -298,6 +300,12 @@ func (m saltedCacheMaker) make(size int) cachePusher { return scp } +func (m saltedCacheDupMaker) make(size int) cachePusher { + scp := &saltedCacheDupPusher{c: makeSaltedCache(size)} + scp.c.Start(context.Background(), 0) + return scp +} + type digestCachePusher struct { c *digestCache } @@ -305,17 +313,29 @@ type saltedCachePusher struct { c *txSaltedCache } -func (p *digestCachePusher) push() { +type saltedCacheDupPusher struct { + c *txSaltedCache +} + +func (p *digestCachePusher) push(seed int) { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) h := crypto.Digest(blake2b.Sum256(d[:])) // digestCache does not hashes so calculate hash here p.c.CheckAndPut(&h) } -func (p *saltedCachePusher) push() { +func (p *saltedCachePusher) push(seed int) { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) - p.c.CheckAndPut(d[:]) // saltedCache hashes inside + p.c.CheckAndPut(d[:], struct{}{}) // saltedCache hashes inside +} + +func (p *saltedCacheDupPusher) push(seed int) { + var d [crypto.DigestSize]byte + var peer [8]byte + crypto.RandBytes(peer[:]) + binary.BigEndian.PutUint64(d[:], uint64(seed)) + p.c.CheckAndPut(d[:], peer) // saltedCache hashes inside } func BenchmarkDigestCaches(b *testing.B) { @@ -327,18 +347,23 @@ func BenchmarkDigestCaches(b *testing.B) { digestCacheMaker := digestCacheMaker{} saltedCacheMaker := saltedCacheMaker{} + saltedCacheDupMaker := saltedCacheDupMaker{} var benchmarks = []struct { maker cacheMaker numThreads int }{ {digestCacheMaker, 1}, {saltedCacheMaker, 1}, + {saltedCacheDupMaker, 1}, {digestCacheMaker, 4}, {saltedCacheMaker, 4}, + {saltedCacheDupMaker, 4}, {digestCacheMaker, 16}, {saltedCacheMaker, 16}, + {saltedCacheDupMaker, 16}, {digestCacheMaker, 128}, {saltedCacheMaker, 128}, + {saltedCacheDupMaker, 128}, } for _, bench := range benchmarks { b.Run(fmt.Sprintf("%T/threads=%d", bench.maker, bench.numThreads), func(b *testing.B) { @@ -365,9 +390,143 @@ func benchmarkDigestCache(b *testing.B, m cacheMaker, numThreads int) { go func() { defer wg.Done() for j := 0; j < numHashes; j++ { - p.push() + p.push(j) } }() } wg.Wait() } + +// TestTxHandlerSaltedCacheValues checks values are stored correctly +func TestTxHandlerSaltedCacheValues(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + const size = 2 + cache := makeSaltedCache(size) + cache.Start(context.Background(), 0) + require.Zero(t, cache.Len()) + + smapLenEqual := func(t *testing.T, smap *sync.Map, expectedLen int) { + t.Helper() + actualLen := 0 + smap.Range(func(_, _ interface{}) bool { + actualLen++ + return true + }) + require.Equal(t, expectedLen, actualLen) + } + + smapContains := func(t *testing.T, smap *sync.Map, key interface{}) { + t.Helper() + _, ok := smap.Load(key) + require.True(t, ok) + } + + type snd struct { + id int + } + + d, v, found := cache.innerCheck([]byte{1}) + require.False(t, found) + require.Nil(t, v) + require.NotNil(t, d) + require.NotEmpty(t, d) + + // add a value, ensure it can be found + d1, v1, found := cache.CheckAndPut([]byte{1}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d1) + require.NotEmpty(t, d1) + d, v, found = cache.innerCheck([]byte{1}) + require.True(t, found) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *d1) + smapLenEqual(t, v, 1) + require.Equal(t, v, v1) + smapContains(t, v, snd{id: 1}) + d, v, found = cache.CheckAndPut([]byte{1}, snd{id: 1}) + require.True(t, found) + require.NotNil(t, d) + require.NotEmpty(t, d) + require.Equal(t, *d, *d1) + require.Equal(t, v, v1) + require.Len(t, cache.cur, 1) + smapLenEqual(t, cache.cur[*d], 1) + require.Nil(t, cache.prev) + + // add a value with different sender + dt, vt, found := cache.CheckAndPut([]byte{1}, snd{id: 2}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Nil(t, cache.prev) + d, v, found = cache.innerCheck([]byte{1}) + require.True(t, found) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *dt) + require.Equal(t, *d, *d1) + require.Equal(t, v, vt) + smapLenEqual(t, v, 2) + smapContains(t, v, snd{id: 1}) + smapContains(t, v, snd{id: 2}) + + // add one more value to full cache.cur + d2, v2, found := cache.CheckAndPut([]byte{2}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d2) + require.NotEmpty(t, d2) + smapLenEqual(t, v2, 1) + require.Len(t, cache.cur, 2) + smapLenEqual(t, cache.cur[*d1], 2) + smapLenEqual(t, cache.cur[*d2], 1) + require.Nil(t, cache.prev) + + // adding new value would trigger cache swap + // first ensure new sender for seen message does not trigger a swap + dt, vt, found = cache.CheckAndPut([]byte{2}, snd{id: 2}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Equal(t, *d2, *dt) + smapLenEqual(t, vt, 2) + require.Len(t, cache.cur, 2) + smapLenEqual(t, cache.cur[*d1], 2) + smapLenEqual(t, cache.cur[*d2], 2) + require.Nil(t, cache.prev) + + // add a new value triggers a swap + d3, v3, found := cache.CheckAndPut([]byte{3}, snd{id: 1}) + require.False(t, found) + require.NotNil(t, d2) + require.NotEmpty(t, d2) + smapLenEqual(t, v3, 1) + require.Len(t, cache.cur, 1) + smapLenEqual(t, cache.cur[*d3], 1) + require.Len(t, cache.prev, 2) + smapLenEqual(t, cache.prev[*d1], 2) + smapLenEqual(t, cache.prev[*d2], 2) + + // add a sender into old (prev) value + dt, vt, found = cache.CheckAndPut([]byte{2}, snd{id: 3}) + require.True(t, found) + require.NotNil(t, dt) + require.NotEmpty(t, dt) + require.Equal(t, *d2, *dt) + require.Len(t, cache.cur, 1) + smapLenEqual(t, cache.cur[*d3], 1) + require.Len(t, cache.prev, 2) + smapLenEqual(t, cache.prev[*d1], 2) + smapLenEqual(t, cache.prev[*d2], 3) + d, v, found = cache.innerCheck([]byte{2}) + require.True(t, found) + require.NotNil(t, v) + require.NotNil(t, d) + require.Equal(t, *d, *dt) + require.Equal(t, *d, *d2) + smapLenEqual(t, v, 3) + require.Equal(t, vt, v) + smapContains(t, v, snd{id: 3}) +} diff --git a/data/txHandler.go b/data/txHandler.go index 74f9b07b77..1fcceb713e 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -107,6 +107,7 @@ type txBacklogMsg struct { unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup verificationErr error // The verification error generated by the verification function, if any. capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued + peers *sync.Map // peers that ever sent this message } // TxHandler handles transaction messages @@ -494,7 +495,12 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { } // We reencode here instead of using rawmsg.Data to avoid broadcasting non-canonical encodings - handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender) + err = handler.net.RelayArray( + handler.ctx, []protocol.Tag{protocol.TxnTag}, + [][]byte{reencode(verifiedTxGroup)}, false, wi.peers) + if err != nil { + logging.Base().Infof("unable to relay transaction: %v", err) + } } func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *crypto.Digest) { @@ -562,10 +568,11 @@ func (handler *TxHandler) dedupCanonical(ntx int, unverifiedTxGroup []transactio func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { var msgKey *crypto.Digest var isDup bool + var peers *sync.Map if handler.msgCache != nil { // check for duplicate messages // this helps against relaying duplicates - if msgKey, isDup = handler.msgCache.CheckAndPut(rawmsg.Data); isDup { + if msgKey, peers, isDup = handler.msgCache.CheckAndPut(rawmsg.Data, rawmsg.Sender); isDup { transactionMessagesDupRawMsg.Inc(nil) return network.OutgoingMessage{Action: network.Ignore} } @@ -647,6 +654,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net rawmsgDataHash: msgKey, unverifiedTxGroupHash: canonicalKey, capguard: capguard, + peers: peers, }: default: // if we failed here we want to increase the corresponding metric. It might suggest that we diff --git a/data/txHandler_test.go b/data/txHandler_test.go index f51290a374..dea6f9acc7 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -1007,6 +1007,28 @@ func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) { require.Equal(t, initialValue+1, currentValue) } +func makeTxn(sendIdx int, recvIdx int, addresses []basics.Address, secrets []*crypto.SignatureSecrets) ([]transactions.SignedTxn, []byte) { + tx := transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: addresses[sendIdx], + Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, + FirstValid: 0, + LastValid: basics.Round(proto.MaxTxnLife), + Note: make([]byte, 2), + GenesisID: genesisID, + GenesisHash: genesisHash, + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addresses[recvIdx], + Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, + }, + } + signedTx := tx.Sign(secrets[sendIdx]) + blob := protocol.Encode(&signedTx) + return []transactions.SignedTxn{signedTx}, blob +} + func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { partitiontest.PartitionTest(t) @@ -1043,27 +1065,7 @@ loop: } } - makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) { - tx := transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: addresses[sendIdx], - Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2}, - FirstValid: 0, - LastValid: basics.Round(proto.MaxTxnLife), - Note: make([]byte, 2), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: addresses[recvIdx], - Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)}, - }, - } - signedTx := tx.Sign(secrets[sendIdx]) - blob := protocol.Encode(&signedTx) - return []transactions.SignedTxn{signedTx}, blob - } - - stxns, blob := makeTxns(1, 2) + stxns, blob := makeTxn(1, 2, addresses, secrets) action := handler.processIncomingTxn(network.IncomingMessage{Data: blob}) require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) @@ -1097,6 +1099,93 @@ loop: require.Equal(t, 0, handler.txCanonicalCache.Len()) } +type dupMockNetwork struct { + mocks.MockNetwork + mu deadlock.Mutex + relay []*sync.Map +} + +func (network *dupMockNetwork) RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except *sync.Map) error { + network.mu.Lock() + defer network.mu.Unlock() + network.relay = append(network.relay, except) + return nil +} + +func (network *dupMockNetwork) requireRelayCount(t *testing.T, callsCount int, exceptIdx int, exceptPeersCount int) { + network.mu.Lock() + defer network.mu.Unlock() + + require.Len(t, network.relay, callsCount) + var count int + network.relay[exceptIdx].Range(func(_, _ interface{}) bool { + count++ + return true + }) + require.Equal(t, count, exceptPeersCount) +} + +// TestTxHandlerProcessIncomingRelay ensures txHandler does not relay duplicates +func TestTxHandlerProcessIncomingRelayDups(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + const numUsers = 100 + log := logging.TestingLog(t) + log.SetLevel(logging.Info) + + // prepare the accounts + addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers) + genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) + ledgerName := fmt.Sprintf("%s-mem", t.Name()) + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + cfg.EnableTxBacklogRateLimiting = false + cfg.TxIncomingFilteringFlags = 1 // txFilterRawMsg + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + require.NoError(t, err) + defer ledger.Close() + + net := dupMockNetwork{} + + tp := pools.MakeTransactionPool(ledger.Ledger, cfg, logging.Base()) + backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) + opts := TxHandlerOpts{ + tp, backlogPool, ledger, &net, "", crypto.Digest{}, cfg, + } + + handler, err := MakeTxHandler(opts) + require.NoError(t, err) + handler.Start() + defer handler.Stop() + + defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) + + _, blob := makeTxn(1, 2, addresses, secrets) + + type mockPeer struct { + id int + } + + for i := 1; i <= 3; i++ { + msg := network.IncomingMessage{Data: blob, Sender: mockPeer{i}} + handler.processIncomingTxn(msg) + } + + // wait until txn propagates in handler + require.Eventually(t, func() bool { + return tp.PendingCount() >= 1 + }, time.Second, 10*time.Millisecond) + + // there is almost no delay between tp.Remember and net.RelayArray but still wait until the goroutine finishes + handler.ctxCancel() + handler.backlogWg.Wait() + + net.requireRelayCount(t, 1, 0, 3) // one RelayArray call and 3 except peers +} + const benchTxnNum = 25_000 func BenchmarkTxHandlerDecoder(b *testing.B) { diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 02784393cc..c82b427e46 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -115,6 +115,7 @@ var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: " var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"}) var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"}) +var networkBroadcastsIgnored = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_peer_skipped_total", Description: "number of skipped peers in broadcast operations"}) // this metric is bound to algod_network_broadcasts_total: skipped = var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"}) var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"}) var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"}) @@ -173,9 +174,9 @@ const ( type GossipNode interface { Address() (string, bool) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error + BroadcastArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer, exceptMany *sync.Map) error Relay(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error - RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, except Peer) error + RelayArray(ctx context.Context, tag []protocol.Tag, data [][]byte, wait bool, exceptMany *sync.Map) error Disconnect(badnode Peer) DisconnectPeers() Ready() chan struct{} @@ -482,6 +483,7 @@ type broadcastRequest struct { tags []Tag data [][]byte except *wsPeer + exceptMany *sync.Map done chan struct{} enqueueTime time.Time ctx context.Context @@ -524,14 +526,14 @@ func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, dat dataArray[0] = data tagArray := make([]protocol.Tag, 1, 1) tagArray[0] = tag - return wn.BroadcastArray(ctx, tagArray, dataArray, wait, except) + return wn.BroadcastArray(ctx, tagArray, dataArray, wait, except, nil) } // BroadcastArray sends an array of messages. // If except is not nil then we will not send it to that neighboring Peer. // if wait is true then the call blocks until the packet has actually been sent to all neighbors. // TODO: add `priority` argument so that we don't have to guess it based on tag -func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { +func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer, exceptMany *sync.Map) error { if wn.config.DisableNetworking { return nil } @@ -544,6 +546,9 @@ func (wn *WebsocketNetwork) BroadcastArray(ctx context.Context, tags []protocol. if except != nil { request.except = except.(*wsPeer) } + if exceptMany != nil { + request.exceptMany = exceptMany + } broadcastQueue := wn.broadcastQueueBulk if highPriorityTag(tags) { @@ -592,9 +597,9 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data [] } // RelayArray relays array of messages -func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error { +func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, exceptMany *sync.Map) error { if wn.relayMessages { - return wn.BroadcastArray(ctx, tags, data, wait, except) + return wn.BroadcastArray(ctx, tags, data, wait, nil, exceptMany) } return nil } @@ -1585,13 +1590,21 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // first send to all the easy outbound peers who don't block, get them started. sentMessageCount := 0 + peersIgnored := uint64(0) for _, peer := range peers { if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } if peer == request.except { + peersIgnored++ continue } + if request.exceptMany != nil { + if _, ok := request.exceptMany.Load(Peer(peer)); ok { + peersIgnored++ + continue + } + } var ok bool if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 { // if this peer supports compressed proposals and compressed data batch is filled out, use it @@ -1618,6 +1631,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, dt := time.Since(start) networkBroadcasts.Inc(nil) + networkBroadcastsIgnored.AddUint64(peersIgnored, nil) networkBroadcastSendMicros.AddUint64(uint64(dt.Nanoseconds()/1000), nil) } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index bafc4f2b39..3203a5d99f 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -23,7 +23,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "github.com/algorand/go-algorand/internal/rapidgen" "io" "math/rand" "net" @@ -31,7 +30,6 @@ import ( "net/http/httptest" "net/url" "os" - "pgregory.net/rapid" "regexp" "runtime" "sort" @@ -41,6 +39,9 @@ import ( "testing" "time" + "github.com/algorand/go-algorand/internal/rapidgen" + "pgregory.net/rapid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -521,7 +522,7 @@ func TestWebsocketNetworkArray(t *testing.T) { tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag} data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")} - netA.BroadcastArray(context.Background(), tags, data, false, nil) + netA.BroadcastArray(context.Background(), tags, data, false, nil, nil) select { case <-counterDone: @@ -549,7 +550,7 @@ func TestWebsocketNetworkCancel(t *testing.T) { cancel() // try calling BroadcastArray - netA.BroadcastArray(ctx, tags, data, true, nil) + netA.BroadcastArray(ctx, tags, data, true, nil, nil) select { case <-counterDone: @@ -4358,3 +4359,23 @@ func TestMergePrimarySecondaryRelayAddressListsNoDedupExp(t *testing.T) { assert.ElementsMatch(t, expectedRelayAddresses, mergedRelayAddresses) }) } + +func TestInterfaceLookup(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + var m sync.Map + peer1 := &wsPeer{peerIndex: 1} + peer2 := &wsPeer{peerIndex: 1} + + var sender1 Peer = peer1 + m.Store(sender1, struct{}{}) + m.Store(peer2, struct{}{}) + + _, ok := m.Load(Peer(peer1)) + require.True(t, ok) + _, ok = m.Load(peer2) + require.True(t, ok) + _, ok = m.Load(sender1) + require.True(t, ok) +}