Skip to content

Commit

Permalink
feat: flush queued txs to mempool (#161)
Browse files Browse the repository at this point in the history
* flush queued txs if there is newly added txs from mempool or blocks

* increase test successness

* ignore sequence mismatch error at flush queued txs
  • Loading branch information
beer-1 authored Feb 11, 2025
1 parent 1149915 commit df95529
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 19 deletions.
16 changes: 15 additions & 1 deletion indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
cumulativeGasUsed := uint64(0)
ethTxs := make([]*coretypes.Transaction, 0, len(req.Txs))
receipts := make([]*coretypes.Receipt, 0, len(req.Txs))
senderNonceMap := make(map[string]uint64)
for idx, txBytes := range req.Txs {
tx, err := e.txConfig.TxDecoder()(txBytes)
if err != nil {
e.logger.Error("failed to decode tx", "err", err)
continue
}

ethTx, _, err := keeper.NewTxUtils(e.evmKeeper).ConvertCosmosTxToEthereumTx(sdkCtx, tx)
ethTx, sender, err := keeper.NewTxUtils(e.evmKeeper).ConvertCosmosTxToEthereumTx(sdkCtx, tx)
if err != nil {
e.logger.Error("failed to convert CosmosTx to EthTx", "err", err)
return err
Expand Down Expand Up @@ -82,6 +83,7 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque

txIndex++
ethTxs = append(ethTxs, ethTx)
senderNonceMap[sender.Hex()] = ethTx.Nonce()

// extract logs and contract address from tx results
ethLogs, contractAddr, err := extractLogsAndContractAddr(txStatus, txResult.Data, ethTx.To() == nil)
Expand Down Expand Up @@ -201,6 +203,18 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
}
}()

// flush queued txs to mempool
if e.flushQueuedTxs != nil {
go func() {
for senderHex, nonce := range senderNonceMap {
// try to flush queued txs from the next nonce
if err := e.flushQueuedTxs(senderHex, nonce+1); err != nil {
e.logger.Error("failed to flush queued txs", "err", err)
}
}
}()
}

// TODO - currently state changes are not supported in abci listener, so we track cosmos block hash at x/evm preblocker.
// - https://github.com/cosmos/cosmos-sdk/issues/22246
//
Expand Down
32 changes: 32 additions & 0 deletions indexer/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,38 @@ func Test_ListenFinalizeBlock_Subscribe(t *testing.T) {
done()
}

func Test_ListenFinalizeBlock_FlushQueuedTxs(t *testing.T) {
app, addrs, privKeys := tests.CreateApp(t)
indexer := app.EVMIndexer()
defer app.Close()

received := make(map[string]uint64)

wg := sync.WaitGroup{}
wg.Add(1)
indexer.RegisterFlushQueuedTxs(func(senderHex string, accSeq uint64) error {
received[senderHex] = accSeq
wg.Done()
return nil
})

tx, _ := tests.GenerateCreateERC20Tx(t, app, privKeys[0])

// load current sequence
ctx, err := app.CreateQueryContext(0, false)
require.NoError(t, err)
seq, err := app.AccountKeeper.GetSequence(ctx, addrs[0].Bytes())
require.NoError(t, err)

_, finalizeRes := tests.ExecuteTxs(t, app, tx)
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)

wg.Wait()

require.Len(t, received, 1)
require.Equal(t, uint64(seq+1), received[addrs[0].Hex()])
}

func Test_ListenFinalizeBlock_ContractCreation(t *testing.T) {
app, _, privKeys := tests.CreateApp(t)
indexer := app.EVMIndexer()
Expand Down
16 changes: 16 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type EVMIndexer interface {
MempoolWrapper(mempool mempool.Mempool) mempool.Mempool
TxInMempool(hash common.Hash) *rpctypes.RPCTransaction

// register flush queued txs function to
RegisterFlushQueuedTxs(f FlushQueuedTxs)

// bloom
ReadBloomBits(ctx context.Context, section uint64, index uint32) ([]byte, error)
PeekBloomBitsNextSection(ctx context.Context) (uint64, error)
Expand All @@ -63,6 +66,9 @@ type EVMIndexer interface {
Stop()
}

// FlushQueuedTxs is a function to flush queued transactions.
type FlushQueuedTxs = func(senderHex string, accSeq uint64) error

// EVMIndexerImpl implements EVMIndexer.
type EVMIndexerImpl struct {
enabled bool
Expand Down Expand Up @@ -102,6 +108,9 @@ type EVMIndexerImpl struct {

// txPendingMap is a map to store tx hashes in pending state.
txPendingMap *ttlcache.Cache[common.Hash, *rpctypes.RPCTransaction]

// flushQueuedTxs is a function to flush queued transactions to mempool.
flushQueuedTxs FlushQueuedTxs
}

func NewEVMIndexer(
Expand Down Expand Up @@ -163,6 +172,8 @@ func NewEVMIndexer(
// pending tx lifetime is 1 minute in indexer
ttlcache.WithTTL[common.Hash, *rpctypes.RPCTransaction](time.Minute),
),

flushQueuedTxs: nil,
}

schema, err := sb.Build()
Expand All @@ -189,6 +200,11 @@ func (e *EVMIndexerImpl) Subscribe() (chan *coretypes.Header, chan []*coretypes.
return blockChan, logsChan, pendingChan
}

// RegisterFlushQueuedTxs registers a function to flush queued transactions.
func (e *EVMIndexerImpl) RegisterFlushQueuedTxs(f FlushQueuedTxs) {
e.flushQueuedTxs = f
}

// blockEvents is a struct to emit block events.
type blockEvents struct {
header *coretypes.Header
Expand Down
14 changes: 13 additions & 1 deletion indexer/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@ func (m *MempoolWrapper) CountTx() int {
// Insert implements mempool.Mempool.
func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error {
txUtils := evmkeeper.NewTxUtils(m.indexer.evmKeeper)
ethTx, _, err := txUtils.ConvertCosmosTxToEthereumTx(ctx, tx)
ethTx, sender, err := txUtils.ConvertCosmosTxToEthereumTx(ctx, tx)
if err != nil {
m.indexer.logger.Error("failed to convert CosmosTx to EthTx", "err", err)
return err
}

if ethTx != nil {
ethTxHash := ethTx.Hash()
senderHex := sender.Hex()
nonce := ethTx.Nonce()

rpcTx := rpctypes.NewRPCTransaction(ethTx, common.Hash{}, 0, 0, ethTx.ChainId())

m.indexer.logger.Debug("inserting tx into mempool", "pending len", m.indexer.txPendingMap.Len(), "ethTxHash", ethTxHash)
Expand All @@ -62,6 +65,15 @@ func (m *MempoolWrapper) Insert(ctx context.Context, tx sdk.Tx) error {
pendingChan <- rpcTx
}
}()

if m.indexer.flushQueuedTxs != nil {
go func() {
// try to flush queued txs from the next nonce
if err := m.indexer.flushQueuedTxs(senderHex, nonce+1); err != nil {
m.indexer.logger.Error("failed to flush queued txs", "err", err)
}
}()
}
}

return m.mempool.Insert(ctx, tx)
Expand Down
34 changes: 34 additions & 0 deletions indexer/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,40 @@ func Test_Mempool_Subscribe(t *testing.T) {
done()
}

func Test_Mempool_TriggerFlushQueuedTxs(t *testing.T) {
app, addrs, privKeys := tests.CreateApp(t)
indexer := app.EVMIndexer()
defer app.Close()

received := make(map[string]uint64)

wg := sync.WaitGroup{}
wg.Add(1)
indexer.RegisterFlushQueuedTxs(func(senderHex string, accSeq uint64) error {
received[senderHex] = accSeq
wg.Done()
return nil
})

tx, _ := tests.GenerateCreateERC20Tx(t, app, privKeys[0])

noopMempool := &mempool.NoOpMempool{}
mempool := indexer.MempoolWrapper(noopMempool)

// insert tx into mempool
ctx, err := app.CreateQueryContext(0, false)
require.NoError(t, err)
seq, err := app.AccountKeeper.GetSequence(ctx, addrs[0].Bytes())
require.NoError(t, err)
err = mempool.Insert(ctx, tx)
require.NoError(t, err)

wg.Wait()

require.Len(t, received, 1)
require.Equal(t, uint64(seq+1), received[addrs[0].Hex()])
}

func consumeBlockLogsChan(blockCh <-chan *coretypes.Header, logChan <-chan []*coretypes.Log, duration int) {
timer := time.NewTimer(time.Second * time.Duration(duration))

Expand Down
11 changes: 11 additions & 0 deletions indexer/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ func Test_PruneIndexer(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, evmTx)

// wait for pruning
for {
time.Sleep(100 * time.Millisecond)

if indexer.IsPruningRunning() {
continue
} else {
break
}
}

// mint 1_000_000 tokens to the first address
tx, evmTxHash2 := tests.GenerateMintERC20Tx(t, app, privKeys[0], common.BytesToAddress(contractAddr), addrs[0], new(big.Int).SetUint64(1_000_000_000_000))
finalizeReq, finalizeRes := tests.ExecuteTxs(t, app, tx)
Expand Down
3 changes: 3 additions & 0 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func NewJSONRPCBackend(
// Start the bloom bits servicing goroutines
b.startBloomHandlers(evmconfig.SectionSize)

// Register flush queued txs function
b.app.EVMIndexer().RegisterFlushQueuedTxs(b.FlushQueuedTxs)

return b, nil
}

Expand Down
59 changes: 42 additions & 17 deletions jsonrpc/backend/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ import (
"context"
"errors"
"fmt"
"strings"

"cosmossdk.io/collections"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
coretypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"

authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
rpctypes "github.com/initia-labs/minievm/jsonrpc/types"
"github.com/initia-labs/minievm/x/evm/keeper"
"github.com/initia-labs/minievm/x/evm/types"
Expand Down Expand Up @@ -75,10 +77,6 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {

senderHex := common.BytesToAddress(sender.Bytes()).Hex()

// hold mutex for each sender
accMut := b.acquireAccMut(senderHex)
defer b.releaseAccMut(senderHex, accMut)

checkCtx := b.app.GetContextForCheckTx(nil)
if acc := b.app.AccountKeeper.GetAccount(checkCtx, sender); acc != nil {
accSeq = acc.GetSequence()
Expand All @@ -93,27 +91,54 @@ func (b *JSONRPCBackend) SendTx(tx *coretypes.Transaction) error {

txHash := tx.Hash()
b.queuedTxHashes.Store(txHash, cacheKey)

// hold mutex for each sender
accMut := b.acquireAccMut(senderHex)
_ = b.queuedTxs.Add(cacheKey, txQueueItem{hash: txHash, bytes: txBytes, body: tx, sender: senderHex})
b.releaseAccMut(senderHex, accMut)

// check if there are queued txs which can be sent
if err = b.FlushQueuedTxs(senderHex, accSeq); err != nil {
return err
}

return nil
}

// flush the queued transactions for the given sender only if the sequence matches
func (b *JSONRPCBackend) FlushQueuedTxs(senderHex string, accSeq uint64) error {
// hold mutex for each sender
accMut := b.acquireAccMut(senderHex)
defer b.releaseAccMut(senderHex, accMut)

for {
cacheKey := fmt.Sprintf("%s-%d", senderHex, accSeq)
if txQueueItem, ok := b.queuedTxs.Get(cacheKey); ok {
_ = b.queuedTxs.Remove(cacheKey)

b.logger.Debug("broadcast queued tx", "sender", senderHex, "txSeq", accSeq)
res, err := b.clientCtx.BroadcastTxSync(txQueueItem.bytes)
if err != nil {
return err
}
if res.Code != 0 {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
}
} else {
txQueueItem, ok := b.queuedTxs.Get(cacheKey)
if !ok {
break
}

b.logger.Debug("broadcast queued tx", "sender", senderHex, "txSeq", accSeq)

// increase the sequence number for the next lookup
accSeq++

// remove the tx from the queue
_ = b.queuedTxs.Remove(cacheKey)

// broadcast the tx
res, err := b.clientCtx.BroadcastTxSync(txQueueItem.bytes)
if err != nil && strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
// ignore wrong sequence error
continue
} else if err != nil {
return err
}

// ignore wrong sequence error
if res.Code != 0 && res.Code != sdkerrors.ErrWrongSequence.ABCICode() {
return sdkerrors.ErrInvalidRequest.Wrapf("tx failed with code: %d: raw_log: %s", res.Code, res.RawLog)
}
}

return nil
Expand Down
1 change: 1 addition & 0 deletions x/evm/keeper/txutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (u *TxUtils) ConvertEthereumTxToCosmosTx(ctx context.Context, ethTx *corety
if err != nil {
return nil, err
}

// sig bytes
v, r, s := ethTx.RawSignatureValues()
sigBytes := make([]byte, 65)
Expand Down

0 comments on commit df95529

Please sign in to comment.