Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add payload by root handler and a few more things #14930

Merged
merged 2 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/execution/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"block_reader.go",
"deposit.go",
"engine_client.go",
"engine_client_epbs.go",
"errors.go",
"log.go",
"log_processing.go",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type Reconstructor interface {
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices []bool) ([]blocks.VerifiedROBlob, error)
Client() RPCClient
ReconstructPayloadEnvelope(ctx context.Context, e *pb.SignedBlindPayloadEnvelope) (*pb.SignedExecutionPayloadEnvelope, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down
70 changes: 70 additions & 0 deletions beacon-chain/execution/engine_client_epbs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package execution

import (
"context"

"github.com/ethereum/go-ethereum/common"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
)

func (s *Service) ReconstructPayloadEnvelope(ctx context.Context, e *pb.SignedBlindPayloadEnvelope) (*pb.SignedExecutionPayloadEnvelope, error) {
result := make([]*pb.ExecutionPayloadBody, 0)
if err := s.rpcClient.CallContext(ctx, &result, GetPayloadBodiesByHashV1, []common.Hash{common.Hash(e.Message.BlockHash)}); err != nil {
return nil, err
}
if result == nil || len(result) == 0 {
return nil, nil
}
b := result[0]

txs := make([][]byte, len(b.Transactions))
for i, t := range b.Transactions {
txs[i] = t
}
dr, err := pb.JsonDepositRequestsToProto(b.DepositRequests)
if err != nil {
return nil, err
}
wr, err := pb.JsonWithdrawalRequestsToProto(b.WithdrawalRequests)
if err != nil {
return nil, err
}
cr, err := pb.JsonConsolidationRequestsToProto(b.ConsolidationRequests)
if err != nil {
return nil, err
}
return &pb.SignedExecutionPayloadEnvelope{
Message: &pb.ExecutionPayloadEnvelope{
Payload: &pb.ExecutionPayloadDeneb{
ParentHash: e.Message.ParentHash,
FeeRecipient: e.Message.FeeRecipient,
StateRoot: e.Message.StateRoot,
ReceiptsRoot: e.Message.ReceiptsRoot,
LogsBloom: e.Message.LogsBloom,
PrevRandao: e.Message.PrevRandao,
BlockNumber: e.Message.BlockNumber,
GasLimit: e.Message.GasLimit,
GasUsed: e.Message.GasUsed,
Timestamp: e.Message.Timestamp,
ExtraData: e.Message.ExtraData,
BaseFeePerGas: e.Message.BaseFeePerGas,
BlockHash: e.Message.BlockHash,
Transactions: txs,
Withdrawals: b.Withdrawals,
BlobGasUsed: e.Message.BlobGasUsed,
ExcessBlobGas: e.Message.ExcessBlobGas,
},
ExecutionRequests: &pb.ExecutionRequests{
Deposits: dr,
Withdrawals: wr,
Consolidations: cr,
},
BuilderIndex: e.Message.BuilderIndex,
BeaconBlockRoot: e.Message.BeaconBlockRoot,
Slot: e.Message.Slot,
BlobKzgCommitments: e.Message.BlobKzgCommitments,
StateRoot: e.Message.StateRoot,
},
Signature: e.Signature,
}, nil
}
2 changes: 1 addition & 1 deletion beacon-chain/rpc/eth/beacon/handlers_epbs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Server) GetExecutionPayloadV1(w http.ResponseWriter, r *http.Request) {
httputil.HandleError(w, "block_id is required in URL params", http.StatusBadRequest)
return
}
signed, err := s.Blocker.Payload(ctx, s.ExecutionReconstructor.Client(), []byte(blockId))
signed, err := s.Blocker.Payload(ctx, []byte(blockId))
if !writePayloadFetchError(w, signed, err) {
return
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/rpc/lookup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",
"//monitoring/tracing/trace:go_default_library",
"//proto/engine/v1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
Expand Down
67 changes: 16 additions & 51 deletions beacon-chain/rpc/lookup/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
Expand All @@ -46,39 +45,40 @@ func (e BlockIdParseError) Error() string {
type Blocker interface {
Block(ctx context.Context, id []byte) (interfaces.ReadOnlySignedBeaconBlock, error)
Blobs(ctx context.Context, id string, indices []uint64) ([]*blocks.VerifiedROBlob, *core.RpcError)
Payload(ctx context.Context, client execution.RPCClient, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error)
Payload(ctx context.Context, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error)
}

// BeaconDbBlocker is an implementation of Blocker. It retrieves blocks from the beacon chain database.
type BeaconDbBlocker struct {
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlobStorage *filesystem.BlobStorage
BeaconDB db.ReadOnlyDatabase
ChainInfoFetcher blockchain.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
BlobStorage *filesystem.BlobStorage
ExecutionReconstructor execution.Reconstructor
}

func (p *BeaconDbBlocker) headPayload(ctx context.Context, client execution.RPCClient) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) headPayload(ctx context.Context) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
root, err := p.ChainInfoFetcher.HeadRoot(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not retrieve head root")
}
return p.Payload(ctx, client, root)
return p.Payload(ctx, root)
}

func (p *BeaconDbBlocker) finalizedPayload(ctx context.Context, client execution.RPCClient) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) finalizedPayload(ctx context.Context) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
finalized := p.ChainInfoFetcher.FinalizedCheckpt()
return p.Payload(ctx, client, finalized.Root)
return p.Payload(ctx, finalized.Root)
}

// Payload returns a ROSignedExecutionPayloadEnvelope for a given block ID.
func (p *BeaconDbBlocker) Payload(ctx context.Context, client execution.RPCClient, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
func (p *BeaconDbBlocker) Payload(ctx context.Context, id []byte) (interfaces.ROSignedExecutionPayloadEnvelope, error) {
var err error
var blk interfaces.ReadOnlySignedBeaconBlock
switch string(id) {
case "head":
return p.headPayload(ctx, client)
return p.headPayload(ctx)
case "finalized":
return p.finalizedPayload(ctx, client)
return p.finalizedPayload(ctx)
case "genesis":

default:
Expand Down Expand Up @@ -144,44 +144,9 @@ func (p *BeaconDbBlocker) Payload(ctx context.Context, client execution.RPCClien
if err != nil {
return nil, errors.Wrap(err, "could not retrieve signed blind payload envelope")
}
result := make([]*enginev1.ExecutionPayloadBody, 0)
if err := client.CallContext(ctx, &result, execution.GetPayloadBodiesByHashV1, [][]byte{hash[:]}); err != nil {
return nil, err
}
if len(result) != 1 {
return nil, errors.New("could not find execution payload body")
}
body := result[0]
parentHash := header.ParentBlockHash()
transactions := make([][]byte, len(body.Transactions))
for i, tx := range body.Transactions {
transactions[i] = tx
}

pbFull := &enginev1.SignedExecutionPayloadEnvelope{
Message: &enginev1.ExecutionPayloadEnvelope{
Payload: &enginev1.ExecutionPayloadDeneb{
ParentHash: parentHash[:],
FeeRecipient: make([]byte, 20),
StateRoot: make([]byte, 32),
ReceiptsRoot: make([]byte, 32),
LogsBloom: make([]byte, 256),
PrevRandao: make([]byte, 32),
GasLimit: header.GasLimit(),
ExtraData: make([]byte, 32),
BaseFeePerGas: make([]byte, 32),
BlockHash: hash[:],
Transactions: transactions,
Withdrawals: body.Withdrawals,
},
ExecutionRequests: pb.Message.ExecutionRequests,
BuilderIndex: pb.Message.BuilderIndex,
BeaconBlockRoot: pb.Message.BeaconBlockRoot,
Slot: pb.Message.Slot,
BlobKzgCommitments: pb.Message.BlobKzgCommitments,
StateRoot: pb.Message.StateRoot,
},
Signature: pb.Signature,
pbFull, err := p.ExecutionReconstructor.ReconstructPayloadEnvelope(ctx, pb)
if err != nil {
return nil, errors.Wrap(err, "could not reconstruct payload envelope")
}
return blocks.WrappedROSignedExecutionPayloadEnvelope(pbFull)
}
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,11 @@ func NewService(ctx context.Context, cfg *Config) *Service {
ReplayerBuilder: ch,
}
blocker := &lookup.BeaconDbBlocker{
BeaconDB: s.cfg.BeaconDB,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlobStorage: s.cfg.BlobStorage,
BeaconDB: s.cfg.BeaconDB,
ChainInfoFetcher: s.cfg.ChainInfoFetcher,
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BlobStorage: s.cfg.BlobStorage,
ExecutionReconstructor: s.cfg.ExecutionReconstructor,
}
rewardFetcher := &rewards.BlockRewardService{Replayer: ch, DB: s.cfg.BeaconDB}
coreService := &core.Service{
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_execution_payload_envelope.go",
"rpc_goodbye.go",
"rpc_metadata.go",
"rpc_ping.go",
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */)

// PayloadByRoots requests
topicMap[addEncoding(p2p.RPCExecutionPayloadsByRootTopicV1)] = blockCollector // Use the same rate limiter as block since payload and block comes at similar rate.

return &limiter{limiterMap: topicMap, p2p: p2pProvider}
}

Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error

// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
if forkIndex >= version.EPBS {
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler,
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler,
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler,
p2p.RPCExecutionPayloadsByRootTopicV1: s.executionPayloadByRootRPCHandler,
}, nil
}

// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
if forkIndex >= version.Electra {
return map[string]rpcHandler{
Expand Down
77 changes: 77 additions & 0 deletions beacon-chain/sync/rpc_execution_payload_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package sync

import (
"context"

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

func (s *Service) executionPayloadByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
SetRPCStreamDeadlines(stream)

r, ok := msg.(*types.BeaconBlockByRootsReq)
if !ok {
return errors.New("message is not type BlobSidecarsByRootReq")
}
blockRoots := *r
if err := s.rateLimiter.validateRequest(stream, uint64(len(blockRoots))); err != nil {
return err
}
if len(blockRoots) == 0 {
s.rateLimiter.add(stream, 1)
s.writeErrorResponseToStream(responseCodeInvalidRequest, "no block roots provided in request", stream)
return errors.New("no block roots provided")
}
for _, root := range blockRoots {
blindPayload, err := s.cfg.beaconDB.SignedBlindPayloadEnvelope(ctx, root[:])
if err != nil {
log.WithError(err).WithField("root", root).Error("Failed to get payload envelope")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}
if blindPayload == nil {
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return errors.New("payload is nil")
}
SetStreamWriteDeadline(stream, defaultWriteDuration)

constructedPayload, err := s.cfg.executionReconstructor.ReconstructPayloadEnvelope(ctx, blindPayload)
if err != nil {
log.WithError(err).WithField("root", root).Error("Failed to reconstruct payload envelope")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return err
}

if chunkErr := writePayloadChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), constructedPayload); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
return chunkErr
}
}
closeStream(stream, log)
return nil
}

func writePayloadChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, payload *enginev1.SignedExecutionPayloadEnvelope) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
valRoot := tor.GenesisValidatorsRoot()
ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(payload.Message.Slot), valRoot[:])
if err != nil {
return err
}

if err := writeContextToStream(ctxBytes[:], stream); err != nil {
return err
}
_, err = encoding.EncodeWithMaxLength(stream, payload)
return err
}
Loading
Loading