Skip to content

Commit

Permalink
feat: implement bloom indexer for eth_getLogs (#150)
Browse files Browse the repository at this point in the history
* implement get logs bloom indexer

* fix lint

* fix bloom handler

* increase unindexedLogs batch size

* increase filter max block range

* remove unused hash in the bloom bits and return empty bitsets if bloom bits are pruned

* add test
  • Loading branch information
beer-1 authored Jan 31, 2025
1 parent 137aa1f commit 703e4d7
Show file tree
Hide file tree
Showing 16 changed files with 594 additions and 15 deletions.
6 changes: 5 additions & 1 deletion cmd/minitiad/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,13 @@ func (a *appCreator) AppCreator() servertypes.AppCreator {
panic(err)
}

evmConfig := evmconfig.GetConfig(appOpts)
if err := evmConfig.Validate(); err != nil {
panic(err)
}
app := minitiaapp.NewMinitiaApp(
logger, db, indexerDB, kvindexerDB, traceStore, true,
evmconfig.GetConfig(appOpts),
evmConfig,
appOpts,
baseappOptions...,
)
Expand Down
3 changes: 3 additions & 0 deletions indexer/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,5 +215,8 @@ func (e *EVMIndexerImpl) ListenFinalizeBlock(ctx context.Context, req abci.Reque
e.doPrune(ctx, uint64(blockHeight))
}

// trigger bloom indexing
e.doBloomIndexing(ctx, uint64(blockHeight))

return nil
}
107 changes: 107 additions & 0 deletions indexer/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package indexer

import (
"context"

"cosmossdk.io/collections"
"github.com/ethereum/go-ethereum/core/bloombits"
coretypes "github.com/ethereum/go-ethereum/core/types"

evmconfig "github.com/initia-labs/minievm/x/evm/config"
)

// doBloomIndexing triggers bloom indexing in a goroutine. If bloom indexing is already running,
// it does nothing.
func (e *EVMIndexerImpl) doBloomIndexing(ctx context.Context, height uint64) {
if running := e.bloomIndexingRunning.Swap(true); running {
return
}

go func(ctx context.Context, height uint64) {
defer e.bloomIndexingRunning.Store(false)
if err := e.bloomIndexing(ctx, height); err != nil {
e.logger.Error("failed to do bloom indexing", "err", err)
}

e.logger.Debug("bloom indexing finished", "height", height)
}(ctx, height)
}

// bloomIndexing generates the bloom index if the current section is complete.
func (e *EVMIndexerImpl) bloomIndexing(ctx context.Context, height uint64) error {
section, err := e.PeekBloomBitsNextSection(ctx)
if err != nil {
return err
}
if (height / evmconfig.SectionSize) <= section {
return nil
}

e.logger.Info("Processing new bloom indexing section", "section", section)

gen, err := bloombits.NewGenerator(uint(evmconfig.SectionSize))
if err != nil {
return err
}

for i := uint64(0); i < evmconfig.SectionSize; i++ {
header, err := e.BlockHeaderByNumber(ctx, section*evmconfig.SectionSize+i)
if err != nil {
return err
}

if err := gen.AddBloom(uint(header.Number.Uint64()-section*evmconfig.SectionSize), header.Bloom); err != nil {
return err
}
}

// write the bloom bits to the store
for i := 0; i < coretypes.BloomBitLength; i++ {
bits, err := gen.Bitset(uint(i))
if err != nil {
return err
}

if err := e.RecordBloomBits(ctx, section, uint32(i), bits); err != nil {
return err
}
}

// increment the section number; if this fails, the section will be reprocessed
if err := e.NextBloomBitsSection(ctx); err != nil {
return err
}

return nil
}

// ReadBloomBits reads the bloom bits for the given index, section and hash.
func (e *EVMIndexerImpl) ReadBloomBits(ctx context.Context, section uint64, index uint32) ([]byte, error) {
bloomBits, err := e.BloomBits.Get(ctx, collections.Join(section, index))
if err != nil {
return nil, err
}

return bloomBits, nil
}

// RecordBloomBits records the bloom bits for the given index, section and hash.
func (e *EVMIndexerImpl) RecordBloomBits(ctx context.Context, section uint64, index uint32, bloomBits []byte) error {
return e.BloomBits.Set(ctx, collections.Join(section, index), bloomBits)
}

// NextBloomBitsSection increments the section number.
func (e *EVMIndexerImpl) NextBloomBitsSection(ctx context.Context) error {
_, err := e.BloomBitsNextSection.Next(ctx)
return err
}

// PeekBloomBitsNextSection returns the next section number to be processed.
func (e *EVMIndexerImpl) PeekBloomBitsNextSection(ctx context.Context) (uint64, error) {
return e.BloomBitsNextSection.Peek(ctx)
}

// Check if bloom indexing is running
func (e *EVMIndexerImpl) IsBloomIndexingRunning() bool {
return e.bloomIndexingRunning.Load()
}
66 changes: 66 additions & 0 deletions indexer/bloom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package indexer_test

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
coretypes "github.com/ethereum/go-ethereum/core/types"
evmindexer "github.com/initia-labs/minievm/indexer"

"github.com/initia-labs/minievm/tests"
evmconfig "github.com/initia-labs/minievm/x/evm/config"
evmtypes "github.com/initia-labs/minievm/x/evm/types"

"github.com/stretchr/testify/require"
)

func Test_BloomIndexing(t *testing.T) {
app, addrs, privKeys := tests.CreateApp(t)
indexer := app.EVMIndexer().(*evmindexer.EVMIndexerImpl)
defer app.Close()

tx, _ := tests.GenerateCreateERC20Tx(t, app, privKeys[0])
_, finalizeRes := tests.ExecuteTxs(t, app, tx)
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)

events := finalizeRes.TxResults[0].Events
createEvent := events[len(events)-3]
require.Equal(t, evmtypes.EventTypeContractCreated, createEvent.GetType())

contractAddr, err := hexutil.Decode(createEvent.Attributes[0].Value)
require.NoError(t, err)

// mint 1_000_000 tokens to the first address
tx, _ = tests.GenerateMintERC20Tx(t, app, privKeys[0], common.BytesToAddress(contractAddr), addrs[0], new(big.Int).SetUint64(1_000_000_000_000))
_, finalizeRes = tests.ExecuteTxs(t, app, tx)
tests.CheckTxResult(t, finalizeRes.TxResults[0], true)

for i := uint64(0); i < evmconfig.SectionSize; i++ {
tests.IncreaseBlockHeight(t, app)
}

// wait for bloom indexing
for {
if indexer.IsBloomIndexingRunning() {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}

ctx, err := app.CreateQueryContext(0, false)
require.NoError(t, err)

for i := uint32(0); i < coretypes.BloomBitLength; i++ {
bloomBits, err := indexer.ReadBloomBits(ctx, 0, i)
require.NoError(t, err)
require.NotNil(t, bloomBits)
}

nextSection, err := indexer.PeekBloomBitsNextSection(ctx)
require.NoError(t, err)
require.Equal(t, uint64(1), nextSection)
}
27 changes: 21 additions & 6 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,22 @@ type EVMIndexer interface {
MempoolWrapper(mempool mempool.Mempool) mempool.Mempool
TxInMempool(hash common.Hash) *rpctypes.RPCTransaction

// bloom
ReadBloomBits(ctx context.Context, section uint64, index uint32) ([]byte, error)
PeekBloomBitsNextSection(ctx context.Context) (uint64, error)
IsBloomIndexingRunning() bool

// Stop
Stop()
}

// EVMIndexerImpl implements EVMIndexer.
type EVMIndexerImpl struct {
enabled bool
retainHeight uint64
pruningRunning *atomic.Bool
enabled bool
retainHeight uint64

pruningRunning *atomic.Bool
bloomIndexingRunning *atomic.Bool

db dbm.DB
logger log.Logger
Expand All @@ -85,6 +92,10 @@ type EVMIndexerImpl struct {
TxHashToCosmosTxHash collections.Map[[]byte, []byte]
CosmosTxHashToTxHash collections.Map[[]byte, []byte]

// bloom
BloomBits collections.Map[collections.Pair[uint64, uint32], []byte]
BloomBitsNextSection collections.Sequence

blockChans []chan *coretypes.Header
logsChans []chan []*coretypes.Log
pendingChans []chan *rpctypes.RPCTransaction
Expand Down Expand Up @@ -119,9 +130,11 @@ func NewEVMIndexer(

logger.Info("EVM Indexer", "enable", !cfg.IndexerDisable)
indexer := &EVMIndexerImpl{
enabled: !cfg.IndexerDisable,
retainHeight: cfg.IndexerRetainHeight,
pruningRunning: &atomic.Bool{},
enabled: !cfg.IndexerDisable,
retainHeight: cfg.IndexerRetainHeight,

pruningRunning: &atomic.Bool{},
bloomIndexingRunning: &atomic.Bool{},

db: db,
store: store,
Expand All @@ -138,6 +151,8 @@ func NewEVMIndexer(
BlockHashToNumberMap: collections.NewMap(sb, prefixBlockHashToNumber, "block_hash_to_number", collections.BytesKey, collections.Uint64Value),
TxHashToCosmosTxHash: collections.NewMap(sb, prefixTxHashToCosmosTxHash, "tx_hash_to_cosmos_tx_hash", collections.BytesKey, collections.BytesValue),
CosmosTxHashToTxHash: collections.NewMap(sb, prefixCosmosTxHashToTxHash, "cosmos_tx_hash_to_tx_hash", collections.BytesKey, collections.BytesValue),
BloomBits: collections.NewMap(sb, prefixBloomBits, "bloom_bits", collections.PairKeyCodec(collections.Uint64Key, collections.Uint32Key), collections.BytesValue),
BloomBitsNextSection: collections.NewSequence(sb, prefixBloomBitsNextSection, "bloom_bits_next_section"),

blockChans: nil,
logsChans: nil,
Expand Down
4 changes: 4 additions & 0 deletions indexer/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ var (
// cosmos indexes
prefixTxHashToCosmosTxHash = collections.Prefix([]byte{0, 0, 3, 1})
prefixCosmosTxHashToTxHash = collections.Prefix([]byte{0, 0, 3, 2})

// bloom filter indexes
prefixBloomBits = collections.Prefix([]byte{0, 0, 5, 1})
prefixBloomBitsNextSection = collections.Prefix([]byte{0, 0, 5, 2})
)
10 changes: 10 additions & 0 deletions indexer/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"cosmossdk.io/collections"
storetypes "cosmossdk.io/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
evmconfig "github.com/initia-labs/minievm/x/evm/config"
"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -51,6 +52,9 @@ func (e *EVMIndexerImpl) prune(ctx context.Context, curHeight uint64) error {
g.Go(func() error {
return e.pruneTxs(ctx, minHeight)
})
g.Go(func() error {
return e.pruneBloomBits(ctx, minHeight)
})

if err := g.Wait(); err != nil {
return err
Expand Down Expand Up @@ -134,6 +138,12 @@ func (e *EVMIndexerImpl) pruneTxs(ctx context.Context, minHeight uint64) error {
return nil
}

// pruneBloomBits removes old bloom bits from the indexer.
func (e *EVMIndexerImpl) pruneBloomBits(ctx context.Context, minHeight uint64) error {
section := minHeight/evmconfig.SectionSize - 1
return e.BloomBits.Clear(ctx, collections.NewPrefixedPairRange[uint64, uint32](section))
}

//////////////////////// TESTING INTERFACE ////////////////////////

// Set custom retain height
Expand Down
45 changes: 45 additions & 0 deletions indexer/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

evmindexer "github.com/initia-labs/minievm/indexer"
"github.com/initia-labs/minievm/tests"
evmconfig "github.com/initia-labs/minievm/x/evm/config"
evmtypes "github.com/initia-labs/minievm/x/evm/types"
)

Expand Down Expand Up @@ -107,3 +108,47 @@ func Test_PruneIndexer(t *testing.T) {
_, err = indexer.CosmosTxHashByTxHash(ctx, evmTxHash)
require.ErrorIs(t, err, collections.ErrNotFound)
}

func Test_PruneIndexer_BloomBits(t *testing.T) {
app, _, _ := tests.CreateApp(t)
indexer := app.EVMIndexer().(*evmindexer.EVMIndexerImpl)
defer app.Close()

// set retain height to 1, only last block is indexed
indexer.SetRetainHeight(1)

for i := uint64(0); i < evmconfig.SectionSize; i++ {
tests.IncreaseBlockHeight(t, app)
}

// wait for bloom indexing
for {
if indexer.IsBloomIndexingRunning() {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}

// wait for pruning
for {
if indexer.IsPruningRunning() {
time.Sleep(100 * time.Millisecond)
} else {
break
}
}

// clear cache
indexer.ClearCache()

// check the bloom bits are pruned
ctx, err := app.CreateQueryContext(0, false)
require.NoError(t, err)

err = indexer.BloomBits.Walk(ctx, nil, func(key collections.Pair[uint64, uint32], value []byte) (bool, error) {
require.Fail(t, "bloom bits should be pruned")
return true, nil
})
require.NoError(t, err)
}
9 changes: 9 additions & 0 deletions indexer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"math/big"

"cosmossdk.io/collections"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -11,6 +12,14 @@ import (

// BlockHeaderByNumber implements EVMIndexer.
func (e *EVMIndexerImpl) BlockHeaderByNumber(ctx context.Context, blockNumber uint64) (*coretypes.Header, error) {
if blockNumber == 0 {
// this is a special case for genesis block
return &coretypes.Header{
Number: big.NewInt(0),
Bloom: coretypes.Bloom{},
}, nil
}

blockHeader, err := e.BlockHeaderMap.Get(ctx, blockNumber)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 703e4d7

Please sign in to comment.