Skip to content

Commit

Permalink
feat: introduce filter max block range and make unindexedLogs to do b…
Browse files Browse the repository at this point in the history
…atch processing (#149)

* introduce filter max block range and make unindexedLogs do batch processing

* update range check
  • Loading branch information
beer-1 committed Jan 28, 2025
1 parent 9d2df76 commit 8fb5b44
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 12 deletions.
7 changes: 7 additions & 0 deletions jsonrpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func NewJSONRPCBackend(
if cfg.LogCacheSize == 0 {
cfg.LogCacheSize = config.DefaultLogCacheSize
}
if cfg.FilterMaxBlockRange == 0 {
cfg.FilterMaxBlockRange = config.DefaultFilterMaxBlockRange
}

queuedTxHashes := new(sync.Map)
queuedTxs, err := lrucache.NewWithEvict(cfg.QueuedTransactionCap, func(_ string, txCache txQueueItem) {
Expand Down Expand Up @@ -227,3 +230,7 @@ func (b *JSONRPCBackend) releaseAccMut(senderHex string, accMut *AccMut) {
// FilterTimeout returns the configured duration for which installed filters
// stay active (see cfg.FilterTimeout).
func (b *JSONRPCBackend) FilterTimeout() time.Duration {
return b.cfg.FilterTimeout
}

// FilterMaxBlockRange returns the configured maximum number of blocks a single
// filter query is allowed to span (see cfg.FilterMaxBlockRange).
func (b *JSONRPCBackend) FilterMaxBlockRange() int {
return b.cfg.FilterMaxBlockRange
}
13 changes: 12 additions & 1 deletion jsonrpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
DefaultFeeHistoryMaxBlocks = 1024
// DefaultFilterTimeout is the default filter timeout, how long filters stay active.
DefaultFilterTimeout = 5 * time.Minute
// DefaultFilterMaxBlockRange is the default maximum number of blocks that can be queried in a filter.
DefaultFilterMaxBlockRange = 500
// DefaultLogCacheSize is the maximum number of cached blocks.
DefaultLogCacheSize = 32
)
Expand Down Expand Up @@ -67,6 +69,7 @@ const (
flagJSONRPCFeeHistoryMaxBlocks = "json-rpc.fee-history-max-blocks"
flagJSONRPCFilterTimeout = "json-rpc.filter-timeout"
flagJSONRPCLogCacheSize = "json-rpc.log-cache-size"
flagJSONRPCFilterMaxBlockRange = "json-rpc.filter-max-block-range"
)

// JSONRPCConfig defines configuration for the EVM RPC server.
Expand Down Expand Up @@ -102,6 +105,8 @@ type JSONRPCConfig struct {
FeeHistoryMaxBlocks int `mapstructure:"fee-history-max-blocks"`
// FilterTimeout is a duration how long filters stay active (default: 5min)
FilterTimeout time.Duration `mapstructure:"filter-timeout"`
// FilterMaxBlockRange is the maximum number of blocks that can be queried in a filter.
FilterMaxBlockRange int `mapstructure:"filter-max-block-range"`
// LogCacheSize is the maximum number of cached blocks.
LogCacheSize int `mapstructure:"log-cache-size"`
}
Expand Down Expand Up @@ -129,7 +134,8 @@ func DefaultJSONRPCConfig() JSONRPCConfig {
FeeHistoryMaxHeaders: DefaultFeeHistoryMaxHeaders,
FeeHistoryMaxBlocks: DefaultFeeHistoryMaxBlocks,

FilterTimeout: DefaultFilterTimeout,
FilterTimeout: DefaultFilterTimeout,
FilterMaxBlockRange: DefaultFilterMaxBlockRange,

LogCacheSize: DefaultLogCacheSize,
}
Expand All @@ -152,6 +158,7 @@ func AddConfigFlags(startCmd *cobra.Command) {
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxHeaders, DefaultFeeHistoryMaxHeaders, "Maximum number of headers used to lookup the fee history")
startCmd.Flags().Int(flagJSONRPCFeeHistoryMaxBlocks, DefaultFeeHistoryMaxBlocks, "Maximum number of blocks used to lookup the fee history")
startCmd.Flags().Duration(flagJSONRPCFilterTimeout, DefaultFilterTimeout, "Duration how long filters stay active")
startCmd.Flags().Int(flagJSONRPCFilterMaxBlockRange, DefaultFilterMaxBlockRange, "Maximum number of blocks that can be queried in a filter")
startCmd.Flags().Int(flagJSONRPCLogCacheSize, DefaultLogCacheSize, "Maximum number of cached blocks for the log filter")
}

Expand All @@ -173,6 +180,7 @@ func GetConfig(appOpts servertypes.AppOptions) JSONRPCConfig {
FeeHistoryMaxHeaders: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxHeaders)),
FeeHistoryMaxBlocks: cast.ToInt(appOpts.Get(flagJSONRPCFeeHistoryMaxBlocks)),
FilterTimeout: cast.ToDuration(appOpts.Get(flagJSONRPCFilterTimeout)),
FilterMaxBlockRange: cast.ToInt(appOpts.Get(flagJSONRPCFilterMaxBlockRange)),
LogCacheSize: cast.ToInt(appOpts.Get(flagJSONRPCLogCacheSize)),
}
}
Expand Down Expand Up @@ -233,6 +241,9 @@ fee-history-max-blocks = {{ .JSONRPCConfig.FeeHistoryMaxBlocks }}
# FilterTimeout is a duration how long filters stay active (default: 5min)
filter-timeout = "{{ .JSONRPCConfig.FilterTimeout }}"
# FilterMaxBlockRange is the maximum number of blocks that can be queried in a filter.
filter-max-block-range = {{ .JSONRPCConfig.FilterMaxBlockRange }}
# LogCacheSize is the maximum number of cached blocks for the log filter.
log-cache-size = {{ .JSONRPCConfig.LogCacheSize }}
`
7 changes: 7 additions & 0 deletions jsonrpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package filters
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -374,6 +375,9 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit ethfilters.FilterCriteri
if begin > 0 && end > 0 && begin > end {
return nil, errInvalidBlockRange
}
if maxRange := api.backend.FilterMaxBlockRange(); end-begin+1 > int64(maxRange) {
return nil, fmt.Errorf("block range greater than %d", maxRange)
}
// Construct the range filter
filter = newRangeFilter(api.logger, api.backend, begin, end, crit.Addresses, crit.Topics)
}
Expand Down Expand Up @@ -426,6 +430,9 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*coretype
if f.crit.ToBlock != nil {
end = f.crit.ToBlock.Int64()
}
if maxRange := api.backend.FilterMaxBlockRange(); end-begin+1 > int64(maxRange) {
return nil, fmt.Errorf("block range greater than %d", maxRange)
}
// Construct the range filter
bloomFilter = newRangeFilter(api.logger, api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
}
Expand Down
79 changes: 68 additions & 11 deletions jsonrpc/namespaces/eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"slices"

"cosmossdk.io/log"
"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/common"
coretypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -147,7 +148,7 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *coretypes.Log, chan
}()

// Gather all non indexed ones
if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil {
if err := f.unindexedLogs(ctx, logChan); err != nil {
errChan <- err
return
}
Expand All @@ -160,27 +161,83 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *coretypes.Log, chan

// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *coretypes.Log) error {
for ; f.begin <= int64(end); f.begin++ {
header, err := f.backend.GetHeaderByNumber(rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return err
}
found, err := f.blockLogs(header)
if err != nil {
return err
func (f *Filter) unindexedLogs(ctx context.Context, logChan chan *coretypes.Log) error {
const batchSize = 100

g, innerCtx := errgroup.WithContext(ctx)
diff := f.end - f.begin + 1
batchNum := diff / batchSize
if diff%batchSize != 0 {
batchNum++
}

logsArray := make([][]*coretypes.Log, batchNum)
for i := int64(0); i < batchNum; i++ {

// make local copy of i for goroutine
idx := i
begin := f.begin + i*batchSize
end := begin + batchSize - 1
if end > f.end {
end = f.end
}
for _, log := range found {

// fetch logs in parallel
g.Go(func() error {
logs, err := f.searchLogs(innerCtx, begin, end)
if err != nil {
return err
}

logsArray[idx] = logs
return nil
})
}

// wait for all goroutines to finish
err := g.Wait()
if err != nil {
return err
}

// send logs to channel in order
for _, logs := range logsArray {
for _, log := range logs {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
}
}

return nil
}

// searchLogs collects the logs matching the filter criteria for every block in
// the inclusive range [begin, end], checking for context cancellation between
// blocks. A nil header with a nil error terminates the scan early with the
// logs gathered so far.
func (f *Filter) searchLogs(ctx context.Context, begin, end int64) ([]*coretypes.Log, error) {
logs := make([]*coretypes.Log, 0)
for ; begin <= end; begin++ {
// bail out promptly if the caller (or a sibling batch) was cancelled
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

header, err := f.backend.GetHeaderByNumber(rpc.BlockNumber(begin))
if header == nil || err != nil {
return nil, err
}
found, err := f.blockLogs(header)
if err != nil {
return nil, err
}

logs = append(logs, found...)
}

return logs, nil
}

// blockLogs returns the logs matching the filter criteria within a single block.
func (f *Filter) blockLogs(header *coretypes.Header) ([]*coretypes.Log, error) {
if bloomFilter(header.Bloom, f.addresses, f.topics) {
Expand Down

0 comments on commit 8fb5b44

Please sign in to comment.