Skip to content

Commit

Permalink
Add payload by root handler
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Feb 13, 2025
1 parent 147af2a commit aa1b263
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 110 deletions.
1 change: 1 addition & 0 deletions beacon-chain/execution/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"block_reader.go",
"deposit.go",
"engine_client.go",
"engine_client_epbs.go",
"errors.go",
"log.go",
"log_processing.go",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices []bool) ([]blocks.VerifiedROBlob, error)
Client() RPCClient
ReconstructPayloadEnvelope(ctx context.Context, e *pb.SignedBlindPayloadEnvelope) (*pb.SignedExecutionPayloadEnvelope, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down
54 changes: 54 additions & 0 deletions beacon-chain/execution/engine_client_epbs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package execution

import (
"context"

"github.com/ethereum/go-ethereum/common"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
)

func (s *Service) ReconstructPayloadEnvelope(ctx context.Context, e *pb.SignedBlindPayloadEnvelope) (*pb.SignedExecutionPayloadEnvelope, error) {
b, err := s.ExecutionBlockByHash(ctx, common.Hash(e.Message.BlockHash), true)
if err != nil {
return nil, err
}
txs := make([][]byte, len(b.Transactions))
for i, t := range b.Transactions {
txs[i], err = t.MarshalBinary()
if err != nil {
return nil, err
}
}
return &pb.SignedExecutionPayloadEnvelope{
Message: &pb.ExecutionPayloadEnvelope{
Payload: &pb.ExecutionPayloadDeneb{
ParentHash: b.ParentHash.Bytes(),
FeeRecipient: b.Coinbase.Bytes(),
StateRoot: b.Root.Bytes(),
ReceiptsRoot: b.ReceiptHash.Bytes(),
LogsBloom: b.Bloom.Bytes(),
PrevRandao: b.MixDigest.Bytes(),
BlockNumber: b.Number.Uint64(),
GasLimit: b.GasLimit,
GasUsed: b.GasUsed,
Timestamp: b.Time,
ExtraData: b.Extra,
BaseFeePerGas: bytesutil.PadTo(bytesutil.ReverseByteOrder(b.BaseFee.Bytes()), fieldparams.RootLength),
BlockHash: b.Hash.Bytes(),
Transactions: txs,
Withdrawals: b.Withdrawals,
BlobGasUsed: *b.BlobGasUsed,
ExcessBlobGas: *b.ExcessBlobGas,
},
ExecutionRequests: e.Message.ExecutionRequests,
BuilderIndex: e.Message.BuilderIndex,
BeaconBlockRoot: e.Message.BeaconBlockRoot,
Slot: e.Message.Slot,
BlobKzgCommitments: e.Message.BlobKzgCommitments,
StateRoot: e.Message.StateRoot,
},
Signature: e.Signature,
}, nil
}
2 changes: 1 addition & 1 deletion beacon-chain/rpc/eth/beacon/handlers_epbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Server) GetExecutionPayloadV1(w http.ResponseWriter, r *http.Request) {
httputil.HandleError(w, "block_id is required in URL params", http.StatusBadRequest)
return
}
signed, err := s.Blocker.Payload(ctx, s.ExecutionReconstructor.Client(), []byte(blockId))
signed, err := s.Blocker.Payload(ctx, []byte(blockId))
if !writePayloadFetchError(w, signed, err) {
return
}
Expand Down
67 changes: 16 additions & 51 deletions beacon-chain/rpc/lookup/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
Expand All @@ -46,39 +45,40 @@ func (e BlockIdParseError) Error() string {
type Blocker interface {
Block(ctx context.Context, id []byte) (interfaces.ReadOnlySignedBeaconBlock, error)
Blobs(ctx context.Context, id string, indices []uint64) ([]*blocks.VerifiedROBlob, *core.RpcError)
Payload(ctx context.Context, client execution.RPCClient, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error)
Payload(ctx context.Context, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error)
}

// BeaconDbBlocker is an implementation of Blocker. It retrieves blocks from the beacon chain database.
type BeaconDbBlocker struct {
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlobStorage *filesystem.BlobStorage
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlobStorage *filesystem.BlobStorage
ExecutionReconstructor execution.Reconstructor
}

func (p *BeaconDbBlocker) headPayload(ctx context.Context, client execution.RPCClient) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) headPayload(ctx context.Context) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
root, err := p.ChainInfoFetcher.HeadRoot(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not retrieve head root")
}
return p.Payload(ctx, client, root)
return p.Payload(ctx, root)
}

func (p *BeaconDbBlocker) finalizedPayload(ctx context.Context, client execution.RPCClient) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) finalizedPayload(ctx context.Context) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
finalized := p.ChainInfoFetcher.FinalizedCheckpt()
return p.Payload(ctx, client, finalized.Root)
return p.Payload(ctx, finalized.Root)
}

// Payload returns a ROSignedExecutionPayloadEnvelope for a given block ID.
func (p *BeaconDbBlocker) Payload(ctx context.Context, client execution.RPCClient, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) Payload(ctx context.Context, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
var err error
var blk interfaces.ReadOnlySignedBeaconBlock
switch string(id) {
case "head":
return p.headPayload(ctx, client)
return p.headPayload(ctx)
case "finalized":
return p.finalizedPayload(ctx, client)
return p.finalizedPayload(ctx)
case "genesis":

default:
Expand Down Expand Up @@ -144,44 +144,9 @@ func (p *BeaconDbBlocker) Payload(ctx context.Context, client execution.RPCClien
if err != nil {
return nil, errors.Wrap(err, "could not retrieve signed blind payload envelope")
}
result := make([]*enginev1.ExecutionPayloadBody, 0)
if err := client.CallContext(ctx, &result, execution.GetPayloadBodiesByHashV1, [][]byte{hash[:]}); err != nil {
return nil, err
}
if len(result) != 1 {
return nil, errors.New("could not find execution payload body")
}
body := result[0]
parentHash := header.ParentBlockHash()
transactions := make([][]byte, len(body.Transactions))
for i, tx := range body.Transactions {
transactions[i] = tx
}

pbFull := &enginev1.SignedExecutionPayloadEnvelope{
Message: &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadDeneb{
ParentHash: parentHash[:],
FeeRecipient: make([]byte, 20),
StateRoot: make([]byte, 32),
ReceiptsRoot: make([]byte, 32),
LogsBloom: make([]byte, 256),
PrevRandao: make([]byte, 32),
GasLimit: header.GasLimit(),
ExtraData: make([]byte, 32),
BaseFeePerGas: make([]byte, 32),
BlockHash: hash[:],
Transactions: transactions,
Withdrawals: body.Withdrawals,
},
ExecutionRequests: pb.Message.ExecutionRequests,
BuilderIndex: pb.Message.BuilderIndex,
BeaconBlockRoot: pb.Message.BeaconBlockRoot,
Slot: pb.Message.Slot,
BlobKzgCommitments: pb.Message.BlobKzgCommitments,
StateRoot: pb.Message.StateRoot,
},
Signature: pb.Signature,
pbFull, err := p.ExecutionReconstructor.ReconstructPayloadEnvelope(ctx, pb)
if err != nil {
return nil, errors.Wrap(err, "could not reconstruct payload envelope")
}
return blocks.WrappedROSignedExecutionPayloadEnvelope(pbFull)
}
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ func NewService(ctx context.Context, cfg *Config) *Service {
ReplayerBuilder: ch,
}
blocker := &lookup.BeaconDbBlocker{
BeaconDB: s.cfg.BeaconDB,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlobStorage: s.cfg.BlobStorage,
BeaconDB: s.cfg.BeaconDB,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlobStorage: s.cfg.BlobStorage,
ExecutionReconstructor: s.cfg.ExecutionReconstructor,
}
rewardFetcher := &rewards.BlockRewardService{Replayer: ch, DB: s.cfg.BeaconDB}
coreService := &core.Service{
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_execution_payload_envelope.go",
"rpc_goodbye.go",
"rpc_metadata.go",
"rpc_ping.go",
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */)

// PayloadByRoots requests
topicMap[addEncoding(p2p.RPCExecutionPayloadsByRootTopicV1)] = blockCollector // Use the same rate limiter as block since payload and block comes at similar rate.

return &limiter{limiterMap: topicMap, p2p: p2pProvider}
}

Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error

// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
if forkIndex >= version.EPBS {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler,
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler,
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler,
p2p.RPCExecutionPayloadsByRootTopicV1: s.executionPayloadByRootRPCHandler,
}, nil
}

// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
if forkIndex >= version.Electra {
return map[string]rpcHandler{
Expand Down
77 changes: 77 additions & 0 deletions beacon-chain/sync/rpc_execution_payload_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sync

import (
"context"

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

func (s *Service) executionPayloadByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
SetRPCStreamDeadlines(stream)

r, ok := msg.(*types.BeaconBlockByRootsReq)
if !ok {
return errors.New("message is not type BlobSidecarsByRootReq")
}
blockRoots := *r
if err := s.rateLimiter.validateRequest(stream, uint64(len(blockRoots))); err != nil {
return err
}
if len(blockRoots) == 0 {
s.rateLimiter.add(stream, 1)
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}
for _, root := range blockRoots {
blindPayload, err := s.cfg.beaconDB.SignedBlindPayloadEnvelope(ctx, root[:])
if err != nil {
log.WithError(err).WithField("root", root).Error("Failed to get payload envelope")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blindPayload == nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.New("payload is nil")
}
SetStreamWriteDeadline(stream, defaultWriteDuration)

constructedPayload, err := s.cfg.executionReconstructor.ReconstructPayloadEnvelope(ctx, blindPayload)
if err != nil {
log.WithError(err).WithField("root", root).Error("Failed to reconstruct payload envelope")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}

if chunkErr := writePayloadChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), constructedPayload); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return chunkErr
}
}
closeStream(stream, log)
return nil
}

func writePayloadChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, payload *enginev1.SignedExecutionPayloadEnvelope) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
valRoot := tor.GenesisValidatorsRoot()
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(payload.Message.Slot), valRoot[:])
if err != nil {
return err
}

if err := writeContextToStream(ctxBytes[:], stream); err != nil {
return err
}
_, err = encoding.EncodeWithMaxLength(stream, payload)
return err
}
6 changes: 1 addition & 5 deletions proto/engine/v1/epbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ func (s *SignedExecutionPayloadEnvelope) Blind() *SignedBlindPayloadEnvelope {
if s.Message.Payload == nil {
return nil
}
payloadRoot, err := s.Message.Payload.HashTreeRoot()
if err != nil {
return nil
}
return &SignedBlindPayloadEnvelope{
Message: &BlindPayloadEnvelope{
PayloadRoot: payloadRoot[:],
BlockHash: s.Message.Payload.BlockHash,
ExecutionRequests: s.Message.ExecutionRequests,
BuilderIndex: s.Message.BuilderIndex,
BeaconBlockRoot: s.Message.BeaconBlockRoot,
Expand Down
Loading

0 comments on commit aa1b263

Please sign in to comment.