Skip to content

Allow nitro nodes to forward requests based on block number to archive nodes #451

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions arbitrum/apibackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type APIBackend struct {

dbForAPICalls ethdb.Database

fallbackClient types.FallbackClient
sync SyncProgressBackend
fallbackClient types.FallbackClient
archiveClientsManager *archiveFallbackClientsManager
sync SyncProgressBackend
}

type errorFilteredFallbackClient struct {
Expand Down Expand Up @@ -79,11 +80,11 @@ func (c *timeoutFallbackClient) CallContext(ctxIn context.Context, result interf
return c.impl.CallContext(ctx, result, method, args...)
}

func CreateFallbackClient(fallbackClientUrl string, fallbackClientTimeout time.Duration) (types.FallbackClient, error) {
func CreateFallbackClient(fallbackClientUrl string, fallbackClientTimeout time.Duration, isArchiveNode bool) (types.FallbackClient, error) {
if fallbackClientUrl == "" {
return nil, nil
}
if strings.HasPrefix(fallbackClientUrl, "error:") {
if !isArchiveNode && strings.HasPrefix(fallbackClientUrl, "error:") {
fields := strings.Split(fallbackClientUrl, ":")[1:]
errNumber, convErr := strconv.ParseInt(fields[0], 0, 0)
if convErr == nil {
Expand Down Expand Up @@ -124,8 +125,8 @@ type SyncProgressBackend interface {
BlockMetadataByNumber(ctx context.Context, blockNum uint64) (common.BlockMetadata, error)
}

func createRegisterAPIBackend(backend *Backend, filterConfig filters.Config, fallbackClientUrl string, fallbackClientTimeout time.Duration) (*filters.FilterSystem, error) {
fallbackClient, err := CreateFallbackClient(fallbackClientUrl, fallbackClientTimeout)
func createRegisterAPIBackend(backend *Backend, filterConfig filters.Config, fallbackClientUrl string, fallbackClientTimeout time.Duration, archiveRedirects []BlockRedirectConfig) (*filters.FilterSystem, error) {
fallbackClient, err := CreateFallbackClient(fallbackClientUrl, fallbackClientTimeout, false)
if err != nil {
return nil, err
}
Expand All @@ -135,10 +136,18 @@ func createRegisterAPIBackend(backend *Backend, filterConfig filters.Config, fal
if tag != 0 || len(backend.chainDb.WasmTargets()) > 1 {
dbForAPICalls = rawdb.WrapDatabaseWithWasm(backend.chainDb, wasmStore, 0, []ethdb.WasmTarget{rawdb.LocalTarget()})
}
var archiveClientsManager *archiveFallbackClientsManager
if len(archiveRedirects) != 0 {
archiveClientsManager, err = newArchiveFallbackClientsManager(archiveRedirects)
if err != nil {
return nil, err
}
}
backend.apiBackend = &APIBackend{
b: backend,
dbForAPICalls: dbForAPICalls,
fallbackClient: fallbackClient,
b: backend,
dbForAPICalls: dbForAPICalls,
fallbackClient: fallbackClient,
archiveClientsManager: archiveClientsManager,
}
filterSystem := filters.NewFilterSystem(backend.apiBackend, filterConfig)
backend.stack.RegisterAPIs(backend.apiBackend.GetAPIs(filterSystem))
Expand Down Expand Up @@ -500,7 +509,7 @@ func (a *APIBackend) BlockMetadataByNumber(ctx context.Context, blockNum uint64)
return a.sync.BlockMetadataByNumber(ctx, blockNum)
}

func StateAndHeaderFromHeader(ctx context.Context, chainDb ethdb.Database, bc *core.BlockChain, maxRecreateStateDepth int64, header *types.Header, err error) (*state.StateDB, *types.Header, error) {
func StateAndHeaderFromHeader(ctx context.Context, chainDb ethdb.Database, bc *core.BlockChain, maxRecreateStateDepth int64, header *types.Header, err error, archiveClientsManager *archiveFallbackClientsManager) (*state.StateDB, *types.Header, error) {
if err != nil {
return nil, header, err
}
Expand All @@ -510,6 +519,9 @@ func StateAndHeaderFromHeader(ctx context.Context, chainDb ethdb.Database, bc *c
if !bc.Config().IsArbitrumNitro(header.Number) {
return nil, header, types.ErrUseFallback
}
if archiveClientsManager != nil && header.Number.Uint64() <= archiveClientsManager.lastAvailableBlock() {
return nil, header, &types.ErrUseArchiveFallback{BlockNum: header.Number.Uint64()}
}
stateFor := func(db state.Database, snapshots *snapshot.Tree) func(header *types.Header) (*state.StateDB, StateReleaseFunc, error) {
return func(header *types.Header) (*state.StateDB, StateReleaseFunc, error) {
if header.Root != (common.Hash{}) {
Expand Down Expand Up @@ -583,7 +595,7 @@ func StateAndHeaderFromHeader(ctx context.Context, chainDb ethdb.Database, bc *c

func (a *APIBackend) StateAndHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*state.StateDB, *types.Header, error) {
header, err := a.HeaderByNumber(ctx, number)
return StateAndHeaderFromHeader(ctx, a.ChainDb(), a.b.arb.BlockChain(), a.b.config.MaxRecreateStateDepth, header, err)
return StateAndHeaderFromHeader(ctx, a.ChainDb(), a.b.arb.BlockChain(), a.b.config.MaxRecreateStateDepth, header, err, a.archiveClientsManager)
}

func (a *APIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) (*state.StateDB, *types.Header, error) {
Expand All @@ -594,7 +606,7 @@ func (a *APIBackend) StateAndHeaderByNumberOrHash(ctx context.Context, blockNrOr
if ishash && header != nil && header.Number.Cmp(bc.CurrentBlock().Number) > 0 && bc.GetCanonicalHash(header.Number.Uint64()) != hash {
return nil, nil, errors.New("requested block ahead of current block and the hash is not currently canonical")
}
return StateAndHeaderFromHeader(ctx, a.ChainDb(), a.b.arb.BlockChain(), a.b.config.MaxRecreateStateDepth, header, err)
return StateAndHeaderFromHeader(ctx, a.ChainDb(), a.b.arb.BlockChain(), a.b.config.MaxRecreateStateDepth, header, err, a.archiveClientsManager)
}

func (a *APIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, checkLive bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) {
Expand Down Expand Up @@ -730,3 +742,7 @@ func (a *APIBackend) Pending() (*types.Block, types.Receipts, *state.StateDB) {
func (a *APIBackend) FallbackClient() types.FallbackClient {
return a.fallbackClient
}

func (a *APIBackend) ArchiveFallbackClient(blockNum uint64) types.FallbackClient {
return a.archiveClientsManager.fallbackClient(blockNum)
}
53 changes: 53 additions & 0 deletions arbitrum/archivefallbackclients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package arbitrum

import (
"sort"

"github.com/ethereum/go-ethereum/core/types"
)

type lastBlockAndClient struct {
lastBlock uint64
client types.FallbackClient
}

type archiveFallbackClientsManager struct {
lastBlockAndClients []*lastBlockAndClient
}

func newArchiveFallbackClientsManager(archiveRedirects []BlockRedirectConfig) (*archiveFallbackClientsManager, error) {
manager := &archiveFallbackClientsManager{}
for _, archiveConfig := range archiveRedirects {
fallbackClient, err := CreateFallbackClient(archiveConfig.URL, archiveConfig.Timeout, true)
if err != nil {
return nil, err
}
if fallbackClient == nil {
continue
}
manager.lastBlockAndClients = append(manager.lastBlockAndClients, &lastBlockAndClient{
lastBlock: archiveConfig.LastBlock,
client: fallbackClient,
})
}
if len(manager.lastBlockAndClients) == 0 {
return nil, nil
}
sort.Slice(manager.lastBlockAndClients, func(i, j int) bool {
return manager.lastBlockAndClients[i].lastBlock < manager.lastBlockAndClients[j].lastBlock
})
return manager, nil
}

func (a *archiveFallbackClientsManager) lastAvailableBlock() uint64 {
return a.lastBlockAndClients[len(a.lastBlockAndClients)-1].lastBlock
}

func (a *archiveFallbackClientsManager) fallbackClient(blockNum uint64) types.FallbackClient {
for _, lastBlockAndClient := range a.lastBlockAndClients {
if blockNum <= lastBlockAndClient.lastBlock {
return lastBlockAndClient.client
}
}
Comment on lines +47 to +51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we have it randomly select within clients that have the same last block? Otherwise it'll always send to the same one.

return nil
}
2 changes: 1 addition & 1 deletion arbitrum/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewBackend(stack *node.Node, config *Config, chainDb ethdb.Database, publis
}

backend.bloomIndexer.Start(backend.arb.BlockChain())
filterSystem, err := createRegisterAPIBackend(backend, filterConfig, config.ClassicRedirect, config.ClassicRedirectTimeout)
filterSystem, err := createRegisterAPIBackend(backend, filterConfig, config.ClassicRedirect, config.ClassicRedirectTimeout, config.BlockRedirects)
if err != nil {
return nil, nil, err
}
Expand Down
25 changes: 25 additions & 0 deletions arbitrum/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package arbitrum

import (
"encoding/json"
"fmt"
"time"

"github.com/ethereum/go-ethereum/eth/ethconfig"
Expand Down Expand Up @@ -39,6 +41,27 @@ type Config struct {
MaxRecreateStateDepth int64 `koanf:"max-recreate-state-depth"`

AllowMethod []string `koanf:"allow-method"`

BlockRedirects []BlockRedirectConfig `koanf:"block-redirects"`
BlockRedirectsList string `koanf:"block-redirects-list"`
}

type BlockRedirectConfig struct {
URL string `koanf:"url"`
Timeout time.Duration `koanf:"timeout"`
LastBlock uint64 `koanf:"last-block"`
}

func (c *Config) Validate() error {
// BlockRedirectsList command line option overrides directly supplied BlockRedirects array of BlockRedirectConfig in the conf file
if c.BlockRedirectsList != "default" {
var blockRedirects []BlockRedirectConfig
if err := json.Unmarshal([]byte(c.BlockRedirectsList), &blockRedirects); err != nil {
return fmt.Errorf("failed to parse rpc block-redirects-list string: %w", err)
}
c.BlockRedirects = blockRedirects
}
return nil
}

type ArbDebugConfig struct {
Expand All @@ -63,6 +86,7 @@ func ConfigAddOptions(prefix string, f *flag.FlagSet) {
arbDebug := DefaultConfig.ArbDebug
f.Uint64(prefix+".arbdebug.block-range-bound", arbDebug.BlockRangeBound, "bounds the number of blocks arbdebug calls may return")
f.Uint64(prefix+".arbdebug.timeout-queue-bound", arbDebug.TimeoutQueueBound, "bounds the length of timeout queues arbdebug calls may return")
f.String(prefix+".block-redirects-list", DefaultConfig.BlockRedirectsList, "array of node configs to redirect block requests given as a json string. time duration should be supplied in number indicating nanoseconds")
}

const (
Expand All @@ -89,4 +113,5 @@ var DefaultConfig = Config{
BlockRangeBound: 256,
TimeoutQueueBound: 512,
},
BlockRedirectsList: "default",
}
9 changes: 9 additions & 0 deletions core/types/arb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -41,6 +42,14 @@ func (f fallbackError) Error() string { return fallbackErrorMsg }

var ErrUseFallback = fallbackError{}

type ErrUseArchiveFallback struct {
BlockNum uint64
}

func (e *ErrUseArchiveFallback) Error() string {
return fmt.Sprintf("use archive fallback client for block %d", e.BlockNum)
}

type FallbackClient interface {
CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
Expand Down
4 changes: 4 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,7 @@ func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Blo
func (b *EthAPIBackend) FallbackClient() types.FallbackClient {
return nil
}

func (b *EthAPIBackend) ArchiveFallbackClient(_ uint64) types.FallbackClient {
return nil
}
10 changes: 7 additions & 3 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,14 @@ var (
)

func fallbackClientFor(b Backend, err error) types.FallbackClient {
if !errors.Is(err, types.ErrUseFallback) {
return nil
if errors.Is(err, types.ErrUseFallback) {
return b.FallbackClient()
}
var archiveErr *types.ErrUseArchiveFallback
if errors.As(err, &archiveErr) {
return b.ArchiveFallbackClient(archiveErr.BlockNum)
}
return b.FallbackClient()
return nil
}

// EthereumAPI provides an API to access Ethereum related information.
Expand Down
4 changes: 4 additions & 0 deletions internal/ethapi/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ func (b testBackend) FallbackClient() types.FallbackClient {
return nil
}

func (b testBackend) ArchiveFallbackClient(_ uint64) types.FallbackClient {
return nil
}

func (b testBackend) SyncProgressMap(ctx context.Context) map[string]interface{} {
return map[string]interface{}{}
}
Expand Down
1 change: 1 addition & 0 deletions internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
// both full and light clients) with access to necessary functions.
type Backend interface {
FallbackClient() types.FallbackClient
ArchiveFallbackClient(blockNum uint64) types.FallbackClient

// General Ethereum API
SyncProgress() ethereum.SyncProgress
Expand Down
4 changes: 4 additions & 0 deletions internal/ethapi/transaction_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,10 @@ func (b *backendMock) FallbackClient() types.FallbackClient {
return nil
}

func (b *backendMock) ArchiveFallbackClient(_ uint64) types.FallbackClient {
return nil
}

func (b *backendMock) SyncProgressMap(ctx context.Context) map[string]interface{} {
return nil
}
Loading