Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node api audit #1186

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 67 additions & 9 deletions api/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"errors"
"fmt"

"google.golang.org/grpc/codes"
Expand All @@ -10,39 +11,96 @@ import (
// The canonical errors from the EigenDA gRPC API endpoints.
//
// Notes:
// - We start with a small (but sufficient) subset of grpc's error codes,
// and expand when there is an important failure case to separate out. See:
// https://grpc.io/docs/guides/status-codes/
// - Make sure that internally propagated errors are eventually wrapped in one of the
// user-facing errors defined here, since grpc otherwise returns an UNKNOWN error code,
// which is harder to debug and understand for users.
// - See https://github.com/googleapis/googleapis/blob/ba8ea80f25d19bde8501cd51f314391f8d39bde8/google/rpc/code.proto
// for the mapping of grpc error codes to HTTP status codes.

// - We start with a small (but sufficient) subset of grpc's error codes,
// and expand when there is an important failure case to separate out. See:
// https://grpc.io/docs/guides/status-codes/
// - Make sure that internally propagated errors are eventually wrapped in one of the
// user-facing errors defined here, since grpc otherwise returns an UNKNOWN error code,
// which is harder to debug and understand for users.
// - See https://github.com/googleapis/googleapis/blob/ba8ea80f25d19bde8501cd51f314391f8d39bde8/google/rpc/code.proto
// for the mapping of grpc error codes to HTTP status codes.
func newErrorGRPC(code codes.Code, msg string) error {
return status.Error(code, msg)
}

// WrapAsGRPCError wraps an error with a gRPC code and message. If the error already has a gRPC code,
// it retains the code but combines the messages. Otherwise, it uses the provided code and combined message.
func WrapAsGRPCError(err error, code codes.Code, msg string) error {
Comment on lines +26 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When should this happen? Feels weird to me to have to wrap a GRPC error in another one, because a grpc status code wrapping should be terminal and returned to the user. Otherwise don't we need to define the overwrite/precedence logic for which error to return to user (eg 400 comes before 500 or not?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also should prob make private so force people to use the other specific ones you wrote for each error code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a server uses other services as backend, this can happen.

If the server calls HandleFooWithServiceX(params) error, and an error returned, this error by default should be INTERNAL error, but it's also possible the params are no valid which results in an error (the caller may not have sufficient information to validate params before calling).

In such case you may want to WrapAsGRPCError(err, INTERNAL, "...."): if HandleFooWithServiceX has an opinion about the error type, then we should use that; otherwise, it should be INTERNAL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see! Thanks for the explanation.

So right, if an external service returns a 400, that should be, from our server's client's pov, converted to a 500 (we are doing something wrong).

So I feel like something simpler would be:

  • rename this function something clearer (convertExternalServiceError?) and your explanation should be added as documentation comment)
  • or just get rid of this function altogether, and force people to use the other methods (unwrap the error string from external service's error and use NewErrorInternal(err.Err()))

I personally like to keep any package's public method surface small, so would prefer to remove functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not just external service error, it can be the validation with more info from external service failed

if err == nil {
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldnt this just use newErrorGRPC in this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explained above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't really like this semantic though, would prefer at each call site the more explicit

if err != nil {
   WrapAsGRPCError(err, code, msg)
}

instead of wrapping by default and forcing reader to go make sure that the function returns nil if no error was present

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's defensive coding to handle nil parameter, why is it an issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not defensive coding, defensive coding would return an error or panic, this just silently returns nil, which is the opposite of defensive coding.

}

combinedMessage := fmt.Sprintf("%s: %s", msg, err.Error())
if s, ok := status.FromError(err); ok {
// If it's already a gRPC error, use the same code but combine the messages
return status.Error(s.Code(), combinedMessage)
Comment on lines +35 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use the old code? Why not the new one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

explained above

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match your description above though?
If externalService returns 400, we want to return 500 to our users, not 400. So you should use the code passed to wrapAsGRPCError always I think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above explanation is basically "if lower level function has opinion of error, preserve it"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that is just wrong...

}

// If it's not a gRPC error, create a new gRPC error with the given code and message
return status.Error(code, combinedMessage)
}

// WrapGRPCError wraps an error with a message. If the error is a gRPC error, it retains the code
// and combines the messages. Otherwise, it returns a new error with the combined message.
func WrapGRPCError(err error, msg string) error {
if err == nil {
return nil
}

combinedMessage := fmt.Sprintf("%s: %s", msg, err.Error())
if s, ok := status.FromError(err); ok {
// If it's a gRPC error, keep it as gRPC error but combine the messages
return status.Error(s.Code(), combinedMessage)
}

return errors.New(combinedMessage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function has weird semantics, its called WrapGRPCError but can accept a non-grpc error and return a non-grpc error. Can we simplify it instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly the desired semantics. If there is a better concise name I'm happy to use it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WrapGRPCError indicates that only GRPC errors should be allowed to be wrapped, yet your documentation says that its fine to pass another error and itll just get wrapped normally. Error wrapping also typically means wrapping an error, not just extending the error with some other msg string. So it feels convoluted and unclear to me.

I like when functions have a single small, and clear purpose. Just like golang's errors api, only has .Is() and .As(), and errors.join() for combining errors, thats it. Forces users to use it in the same way, and not too much cognitive overload when trying to use it.

How about we mention that this should ONLY be used for wrapping grpc errors? I don't think any function should return either a GRPC error or not right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

golang's api needs to be small as it has no context

the functions are not expected to return grpc error, only the APIs do; and the wrapper here works no matter if a function returns grpc error or not.

}

// HTTP Mapping: 400 Bad Request
func NewErrorInvalidArg(msg string) error {
return newErrorGRPC(codes.InvalidArgument, msg)
}

func WrapAsInvalidArg(err error, msg string) error {
return WrapAsGRPCError(err, codes.InvalidArgument, msg)
}

// HTTP Mapping: 404 Not Found
func NewErrorNotFound(msg string) error {
return newErrorGRPC(codes.NotFound, msg)
}

func WrapAsNotFound(err error, msg string) error {
return WrapAsGRPCError(err, codes.NotFound, msg)
}

// HTTP Mapping: 429 Too Many Requests
func NewErrorResourceExhausted(msg string) error {
return newErrorGRPC(codes.ResourceExhausted, msg)
}

func WrapAsResourceExhausted(err error, msg string) error {
return WrapAsGRPCError(err, codes.ResourceExhausted, msg)
}

// HTTP Mapping: 500 Internal Server Error
func NewErrorInternal(msg string) error {
return newErrorGRPC(codes.Internal, msg)
}

func WrapAsInternal(err error, msg string) error {
return WrapAsGRPCError(err, codes.Internal, msg)
}

// HTTP Mapping: 401 Unauthorized
func NewErrorUnauthenticated(msg string) error {
return newErrorGRPC(codes.Unauthenticated, msg)
}

func WrapAsUnauthenticated(err error, msg string) error {
return WrapAsGRPCError(err, codes.Unauthenticated, msg)
}

// HTTP Mapping: 500 Internal Server Error
func NewErrorUnknown(msg string) error {
return newErrorGRPC(codes.Unknown, msg)
Expand Down
9 changes: 5 additions & 4 deletions node/auth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package auth
import (
"context"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api"
grpc "github.com/Layr-Labs/eigenda/api/grpc/validator"
"github.com/Layr-Labs/eigenda/core"
gethcommon "github.com/ethereum/go-ethereum/common"
lru "github.com/hashicorp/golang-lru/v2"
"time"
)

// RequestAuthenticator authenticates requests to the DA node. This object is thread safe.
Expand Down Expand Up @@ -120,7 +121,7 @@ func (a *requestAuthenticator) AuthenticateStoreChunksRequest(

key, err := a.getDisperserKey(ctx, now, request.DisperserID)
if err != nil {
return fmt.Errorf("failed to get operator key: %w", err)
return api.WrapGRPCError(err, "failed to get operator key")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we know for sure that err here is a grpc error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this wrapper is to make it easier for APIs to return canonical errors, without having to read all underlying code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think this is a bad idea though. It's better to be SUPER clear on the semantics on functions. If a function returns a grpc error, then ALL of its returned errors should be GRPC. If not, then the function is too large and should be broken down to respect this invariant. This way error handling semantics and documentation is simple, and you can't go wrong.

Otherwise if "everything is easy" people just don't know what they are dealing with, and to extend a function or return a new error code they need to go read the entire code path to understand what kind of errors they could be receiving in different cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to code for easy static checkability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the issue is you cannot ask all functions to return grpc error which fits only at interface level

}

err = VerifyStoreChunksRequest(*key, request)
Expand All @@ -139,7 +140,7 @@ func (a *requestAuthenticator) getDisperserKey(
disperserID uint32) (*gethcommon.Address, error) {

if !a.disperserIDFilter(disperserID) {
return nil, fmt.Errorf("invalid disperser ID: %d", disperserID)
return nil, api.NewErrorInvalidArg(fmt.Sprintf("invalid disperser ID: %d", disperserID))
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

key, ok := a.keyCache.Get(disperserID)
Expand All @@ -152,7 +153,7 @@ func (a *requestAuthenticator) getDisperserKey(

address, err := a.chainReader.GetDisperserAddress(ctx, disperserID)
if err != nil {
return nil, fmt.Errorf("failed to get disperser address: %w", err)
return nil, api.WrapAsInternal(err, "failed to get disperser address")
}

a.keyCache.Add(disperserID, &keyWithTimeout{
Expand Down
62 changes: 38 additions & 24 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (s *ServerV2) GetNodeInfo(ctx context.Context, in *pb.GetNodeInfoRequest) (
Os: runtime.GOOS,
Arch: runtime.GOARCH,
NumCpu: uint32(runtime.GOMAXPROCS(0)),
MemBytes: memBytes}, nil
MemBytes: memBytes,
}, nil
}

func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
Expand All @@ -103,19 +104,6 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return nil, api.NewErrorInvalidArg("v2 API is disabled")
}

if s.authenticator != nil {
disperserPeer, ok := peer.FromContext(ctx)
if !ok {
return nil, errors.New("could not get peer information")
}
disperserAddress := disperserPeer.Addr.String()

err := s.authenticator.AuthenticateStoreChunksRequest(ctx, disperserAddress, in, time.Now())
if err != nil {
return nil, fmt.Errorf("failed to authenticate request: %w", err)
}
}

if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}
Expand All @@ -124,25 +112,40 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
return nil, api.NewErrorInternal("missing bls signer")
}

// Validate the request parameters (which is cheap) before starting any further
// processing of the request.
batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to validate store chunk request: %v", err))
samlaf marked this conversation as resolved.
Show resolved Hide resolved
}

batchHeaderHash, err := batch.BatchHeader.Hash()
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("invalid batch header: %v", err))
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to serialize batch header hash: %v", err))
}

if s.authenticator != nil {
disperserPeer, ok := peer.FromContext(ctx)
if !ok {
return nil, api.NewErrorInvalidArg("could not get peer information from request context")
}
disperserAddress := disperserPeer.Addr.String()

err := s.authenticator.AuthenticateStoreChunksRequest(ctx, disperserAddress, in, time.Now())
if err != nil {
return nil, api.WrapAsUnauthenticated(err, "failed to authenticate request")
}
}

s.logger.Info("new StoreChunks request", "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]), "numBlobs", len(batch.BlobCertificates), "referenceBlockNumber", batch.BatchHeader.ReferenceBlockNumber)
operatorState, err := s.node.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), s.node.Config.ID)
if err != nil {
return nil, err
return nil, api.WrapAsInternal(err, "failed to get the operator state")
}

blobShards, rawBundles, err := s.node.DownloadBundles(ctx, batch, operatorState)
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to download batch: %v", err))
return nil, api.WrapAsInternal(err, "failed to download batch")
}

type storeResult struct {
Expand All @@ -155,7 +158,7 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
if err != nil {
storeChan <- storeResult{
keys: nil,
err: fmt.Errorf("failed to store batch: %v", err),
err: err,
}
return
}
Expand All @@ -176,17 +179,17 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
s.logger.Error("failed to delete keys", "err", deleteErr, "batchHeaderHash", hex.EncodeToString(batchHeaderHash[:]))
}
}
return nil, api.NewErrorInternal(fmt.Sprintf("failed to validate batch: %v", err))
return nil, api.WrapAsInternal(err, "failed to validate batch")
}

res := <-storeChan
if res.err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to store batch: %v", res.err))
return nil, api.WrapAsInternal(res.err, "failed to store batch")
}

sig, err := s.node.BLSSigner.Sign(ctx, batchHeaderHash[:])
if err != nil {
return nil, api.NewErrorInternal(fmt.Sprintf("failed to sign batch: %v", err))
return nil, api.WrapAsInternal(err, "failed to sign batch")
}

s.metrics.ReportStoreChunksLatency(time.Since(start))
Expand All @@ -198,13 +201,24 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (

// validateStoreChunksRequest validates the StoreChunksRequest and returns deserialized batch in the request
func (s *ServerV2) validateStoreChunksRequest(req *pb.StoreChunksRequest) (*corev2.Batch, error) {
if req.GetDisperserID() != api.EigenLabsDisperserID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why this would result in an invalid store chunks request?

return nil, fmt.Errorf("disperserID is invalid: %d", req.GetDisperserID())
}

// The signature is created by go-ethereum library, which contains 1 additional byte (for
// recovering the public key from signature), so it's 65 bytes.
if len(req.GetSignature()) != 65 {
return nil, fmt.Errorf("signature must be 65 bytes, found %d bytes", len(req.GetSignature()))
}

if req.GetBatch() == nil {
return nil, api.NewErrorInvalidArg("missing batch in request")
return nil, errors.New("missing batch in request")
}

// BatchFromProtobuf internally validates the Batch while deserializing
batch, err := corev2.BatchFromProtobuf(req.GetBatch())
if err != nil {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("failed to deserialize batch: %v", err))
return nil, fmt.Errorf("failed to deserialize batch: %v", err)
}

return batch, nil
Expand Down
Loading
Loading