17
17
package core
18
18
19
19
import (
20
+ "context"
20
21
"errors"
21
22
"math"
22
23
"math/big"
@@ -34,6 +35,7 @@ import (
34
35
"github.com/ethereum/go-ethereum/log"
35
36
"github.com/ethereum/go-ethereum/metrics"
36
37
"github.com/ethereum/go-ethereum/params"
38
+ "github.com/google/uuid"
37
39
"golang.org/x/crypto/sha3"
38
40
)
39
41
@@ -280,6 +282,8 @@ type TxPool struct {
280
282
initDoneCh chan struct {} // is closed once the pool is initialized (for tests)
281
283
282
284
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
285
+
286
+ bundleFetcher IFetcher
283
287
}
284
288
285
289
type txpoolResetRequest struct {
@@ -344,6 +348,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
344
348
return pool
345
349
}
346
350
351
+ type IFetcher interface {
352
+ GetLatestUuidBundles (ctx context.Context , blockNum int64 ) ([]types.LatestUuidBundle , error )
353
+ }
354
+
355
+ func (pool * TxPool ) RegisterBundleFetcher (fetcher IFetcher ) {
356
+ pool .mu .Lock ()
357
+ defer pool .mu .Unlock ()
358
+
359
+ pool .bundleFetcher = fetcher
360
+ }
361
+
347
362
// loop is the transaction pool's main event loop, waiting for and reacting to
348
363
// outside blockchain events as well as for various reporting and transaction
349
364
// eviction events.
@@ -581,21 +596,79 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
581
596
return pending
582
597
}
583
598
584
- /// AllMevBundles returns all the MEV Bundles currently in the pool
585
- func (pool * TxPool ) AllMevBundles () []types.MevBundle {
586
- return pool .mevBundles
599
+ type uuidBundleKey struct {
600
+ Uuid uuid.UUID
601
+ SigningAddress common.Address
602
+ }
603
+
604
+ func (pool * TxPool ) fetchLatestCancellableBundles (ctx context.Context , blockNumber * big.Int ) (chan []types.LatestUuidBundle , chan error ) {
605
+ if pool .bundleFetcher == nil {
606
+ return nil , nil
607
+ }
608
+ errCh := make (chan error , 1 )
609
+ lubCh := make (chan []types.LatestUuidBundle , 1 )
610
+ go func (blockNum int64 ) {
611
+ lub , err := pool .bundleFetcher .GetLatestUuidBundles (ctx , blockNum )
612
+ errCh <- err
613
+ lubCh <- lub
614
+ }(blockNumber .Int64 ())
615
+ return lubCh , errCh
616
+ }
617
+
618
+ func resolveCancellableBundles (lubCh chan []types.LatestUuidBundle , errCh chan error , uuidBundles map [uuidBundleKey ][]types.MevBundle ) []types.MevBundle {
619
+ if lubCh == nil || errCh == nil {
620
+ return nil
621
+ }
622
+
623
+ if len (uuidBundles ) == 0 {
624
+ return nil
625
+ }
626
+
627
+ err := <- errCh
628
+ if err != nil {
629
+ log .Error ("could not fetch latest bundles uuid map" , "err" , err )
630
+ return nil
631
+ }
632
+
633
+ currentCancellableBundles := []types.MevBundle {}
634
+
635
+ log .Trace ("Processing uuid bundles" , "uuidBundles" , uuidBundles )
636
+
637
+ lubs := <- lubCh
638
+ for _ , lub := range lubs {
639
+ ubk := uuidBundleKey {lub .Uuid , lub .SigningAddress }
640
+ bundles , found := uuidBundles [ubk ]
641
+ if ! found {
642
+ log .Trace ("missing uuid bundle" , "ubk" , ubk )
643
+ continue
644
+ }
645
+ for _ , bundle := range bundles {
646
+ if bundle .Hash == lub .BundleHash {
647
+ log .Trace ("adding uuid bundle" , "bundle hash" , bundle .Hash .String (), "lub" , lub )
648
+ currentCancellableBundles = append (currentCancellableBundles , bundle )
649
+ break
650
+ }
651
+ }
652
+ }
653
+ return currentCancellableBundles
587
654
}
588
655
589
656
// MevBundles returns a list of bundles valid for the given blockNumber/blockTimestamp
590
657
// also prunes bundles that are outdated
591
- func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) []types.MevBundle {
658
+ // Returns regular bundles and a function resolving to current cancellable bundles
659
+ func (pool * TxPool ) MevBundles (blockNumber * big.Int , blockTimestamp uint64 ) ([]types.MevBundle , chan []types.MevBundle ) {
592
660
pool .mu .Lock ()
593
661
defer pool .mu .Unlock ()
594
662
663
+ ctx , cancel := context .WithTimeout (context .Background (), 500 * time .Millisecond )
664
+ lubCh , errCh := pool .fetchLatestCancellableBundles (ctx , blockNumber )
665
+
595
666
// returned values
596
667
var ret []types.MevBundle
597
668
// rolled over values
598
669
var bundles []types.MevBundle
670
+ // (uuid, signingAddress) -> list of bundles
671
+ var uuidBundles = make (map [uuidBundleKey ][]types.MevBundle )
599
672
600
673
for _ , bundle := range pool .mevBundles {
601
674
// Prune outdated bundles
@@ -609,14 +682,31 @@ func (pool *TxPool) MevBundles(blockNumber *big.Int, blockTimestamp uint64) []ty
609
682
continue
610
683
}
611
684
612
- // return the ones which are in time
613
- ret = append (ret , bundle )
614
685
// keep the bundles around internally until they need to be pruned
615
686
bundles = append (bundles , bundle )
687
+
688
+ // TODO: omit duplicates
689
+
690
+ // do not append to the return quite yet, check the DB for the latest bundle for that uuid
691
+ if bundle .Uuid != types .EmptyUUID {
692
+ ubk := uuidBundleKey {bundle .Uuid , bundle .SigningAddress }
693
+ uuidBundles [ubk ] = append (uuidBundles [ubk ], bundle )
694
+ continue
695
+ }
696
+
697
+ // return the ones which are in time
698
+ ret = append (ret , bundle )
616
699
}
617
700
618
701
pool .mevBundles = bundles
619
- return ret
702
+
703
+ cancellableBundlesCh := make (chan []types.MevBundle , 1 )
704
+ go func () {
705
+ cancellableBundlesCh <- resolveCancellableBundles (lubCh , errCh , uuidBundles )
706
+ cancel ()
707
+ }()
708
+
709
+ return ret , cancellableBundlesCh
620
710
}
621
711
622
712
// AddMevBundles adds a mev bundles to the pool
@@ -629,7 +719,7 @@ func (pool *TxPool) AddMevBundles(mevBundles []types.MevBundle) error {
629
719
}
630
720
631
721
// AddMevBundle adds a mev bundle to the pool
632
- func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
722
+ func (pool * TxPool ) AddMevBundle (txs types.Transactions , blockNumber * big.Int , replacementUuid uuid. UUID , signingAddress common. Address , minTimestamp , maxTimestamp uint64 , revertingTxHashes []common.Hash ) error {
633
723
bundleHasher := sha3 .NewLegacyKeccak256 ()
634
724
for _ , tx := range txs {
635
725
bundleHasher .Write (tx .Hash ().Bytes ())
@@ -642,6 +732,8 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m
642
732
pool .mevBundles = append (pool .mevBundles , types.MevBundle {
643
733
Txs : txs ,
644
734
BlockNumber : blockNumber ,
735
+ Uuid : replacementUuid ,
736
+ SigningAddress : signingAddress ,
645
737
MinTimestamp : minTimestamp ,
646
738
MaxTimestamp : maxTimestamp ,
647
739
RevertingTxHashes : revertingTxHashes ,
0 commit comments