@@ -5,85 +5,62 @@ import (
55 "encoding/base64"
66 "encoding/hex"
77 "fmt"
8- "log"
8+ "log/slog"
9+ "os"
910 "time"
1011
12+ "github.com/Layr-Labs/eigenda/api/clients"
13+ grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
14+ "github.com/Layr-Labs/eigenda/disperser"
15+ "github.com/ethereum/go-ethereum/log"
1116 "github.com/stackrlabs/go-daash/da"
12- "google.golang.org/grpc"
13- "google.golang.org/grpc/credentials"
1417)
1518
1619type Client struct {
17- // DaRpc is the HTTP provider URL for the Data Availability node.
18- DARpc string
19-
20- // DisperserClient is the gRPC client for the Disperser service.
21- disperserClient DisperserClient
22-
23- // Quorum IDs and SecurityParams to use when dispersing and retrieving blobs
24- DADisperserSecurityParams []* SecurityParams
25-
26- // The total amount of time that the batcher will spend waiting for EigenDA to confirm a blob
27- DAStatusQueryTimeout time.Duration
28-
29- // The amount of time to wait between status queries of a newly dispersed blob
30- DAStatusQueryRetryInterval time.Duration
20+ // internalClient is used to interact with the EigenDA API
21+ internalClient clients.EigenDAClient
3122}
3223
3324// NewClient returns a new instance of the EigenDA client.
34- func NewClient (daRpc string , daStatusQueryTimeout time.Duration , daStatusQueryRetryInterval time.Duration ) (* Client , error ) {
35- conn , err := grpc .Dial (daRpc , grpc .WithTransportCredentials (credentials .NewClientTLSFromCert (nil , "" )))
25+ func NewClient (config clients.EigenDAClientConfig ) (* Client , error ) {
26+ logger := log .NewLogger (slog .NewTextHandler (os .Stdout , nil ))
27+ client , err := clients .NewEigenDAClient (logger , config )
3628 if err != nil {
37- fmt .Println ("Unable to connect to EigenDA, aborting" , "err" , err )
38- return nil , err
29+ return nil , fmt .Errorf ("failed to create EigenDA client: %v" , err )
3930 }
40- daClient := NewDisperserClient (conn )
41-
42- disperserSecurityParams := []* SecurityParams {}
43- disperserSecurityParams = append (disperserSecurityParams , & SecurityParams {
44- QuorumId : 0 ,
45- AdversaryThreshold : 25 ,
46- QuorumThreshold : 50 ,
47- })
48- log .Println ("🟢 EigenDA client initalised" )
31+
4932 return & Client {
50- DARpc : daRpc ,
51- disperserClient : daClient ,
52- DADisperserSecurityParams : disperserSecurityParams ,
53- DAStatusQueryTimeout : daStatusQueryTimeout ,
54- DAStatusQueryRetryInterval : daStatusQueryRetryInterval ,
33+ internalClient : * client ,
5534 }, nil
5635}
5736
5837func (e * Client ) MaxBlobSize (ctx context.Context ) (uint64 , error ) {
59- return 512 * 1024 , nil // Currently set at 512KB
38+ return 2 * 1024 * 1024 , nil // Currently set at 2MB
6039}
6140
62- func (e * Client ) Submit (ctx context.Context , daBlob da.Blob , gasPrice float64 ) (da.ID , error ) {
63- blobInfo , err := e .disperseBlob (ctx , daBlob )
41+ func (c * Client ) Submit (ctx context.Context , daBlob da.Blob , gasPrice float64 ) (da.ID , error ) {
42+ start := time .Now ()
43+ blobID , err := c .PutBlob (ctx , daBlob )
6444 if err != nil {
6545 return nil , fmt .Errorf ("failed to disperse blob: %v" , err )
6646 }
67- blobID := ID {
68- BlobIndex : blobInfo .BlobVerificationProof .BlobIndex ,
69- BatchHeaderHash : blobInfo .BlobVerificationProof .BatchMetadata .BatchHeaderHash ,
70- }
47+ end := time .Now ()
48+ fmt .Println ("Time taken to disperse blob:" , end .Sub (start ))
49+
7150 return blobID , nil
7251}
7352
74- func (e * Client ) Get (ctx context.Context , id da.ID ) (da.Blob , error ) {
75- blobID , ok := id .(ID )
53+ func (c * Client ) Get (ctx context.Context , id da.ID ) (da.Blob , error ) {
54+ blobID , ok := id .(* ID )
7655 if ! ok {
7756 return nil , fmt .Errorf ("invalid ID type" )
7857 }
79- resp , err := e .disperserClient .RetrieveBlob (ctx , & RetrieveBlobRequest {
80- BlobIndex : blobID .BlobIndex ,
81- BatchHeaderHash : blobID .BatchHeaderHash ,
82- })
58+ blob , err := c .internalClient .GetBlob (ctx , blobID .BlobInfo .BlobVerificationProof .BatchMetadata .BatchHeaderHash , blobID .BlobInfo .BlobVerificationProof .BlobIndex )
8359 if err != nil {
8460 return nil , fmt .Errorf ("failed to retrieve blob: %v" , err )
8561 }
86- return resp .Data , nil
62+
63+ return blob , nil
8764}
8865
8966func (e * Client ) Commit (ctx context.Context , daBlob da.Blob ) (da.Commitment , error ) {
@@ -98,66 +75,98 @@ func (e *Client) GetProof(ctx context.Context, id da.ID) (da.Proof, error) {
9875 return nil , nil
9976}
10077
101- type ID struct {
102- BlobIndex uint32
103- BatchHeaderHash []byte
78+ // PutBlob encodes and writes a blob to EigenDA, waiting for it to be confirmed
79+ // before returning. This function is resiliant to transient failures and
80+ // timeouts.
81+ func (c * Client ) PutBlob (ctx context.Context , data []byte ) (ID , error ) {
82+ resultChan , errorChan := c .PutBlobAsync (ctx , data )
83+ select { // no timeout here because we depend on the configured timeout in PutBlobAsync
84+ case result := <- resultChan :
85+ return result , nil
86+ case err := <- errorChan :
87+ return ID {}, err
88+ }
10489}
10590
106- func (e * Client ) disperseBlob (ctx context.Context , txData []byte ) (* BlobInfo , error ) {
107- fmt .Println ("Attempting to disperse blob to EigenDA" )
91+ func (c * Client ) PutBlobAsync (ctx context.Context , data []byte ) (resultChan chan ID , errChan chan error ) {
92+ resultChan = make (chan ID , 1 )
93+ errChan = make (chan error , 1 )
94+ go c .putBlob (ctx , data , resultChan , errChan )
95+ return
96+ }
10897
109- disperseReq := & DisperseBlobRequest {
110- Data : txData ,
111- SecurityParams : e .DADisperserSecurityParams ,
98+ func (c * Client ) putBlob (ctx context.Context , rawData []byte , resultChan chan ID , errChan chan error ) {
99+ // encode blob
100+ if c .internalClient .Codec == nil {
101+ errChan <- fmt .Errorf ("codec cannot be nil" )
102+ return
112103 }
113- daClient := e .disperserClient
114- disperseRes , err := daClient .DisperseBlob (ctx , disperseReq )
115- fmt .Println ("DisperseBlob response" , "disperseRes" , disperseRes , "err" , err )
116104
105+ data , err := c .internalClient .Codec .EncodeBlob (rawData )
117106 if err != nil {
118- fmt .Printf ( "Unable to disperse blob to EigenDA, aborting" , "err " , err )
119- return nil , err
107+ errChan <- fmt .Errorf ( "error encoding blob: %w " , err )
108+ return
120109 }
121110
122- if disperseRes .Result == BlobStatus_UNKNOWN ||
123- disperseRes .Result == BlobStatus_FAILED {
124- fmt .Printf ("Unable to disperse blob to EigenDA, aborting" , "err" , err )
125- return nil , fmt .Errorf ("reply status is %d" , disperseRes .Result )
111+ customQuorumNumbers := make ([]uint8 , len (c .internalClient .Config .CustomQuorumIDs ))
112+ for i , id := range c .internalClient .Config .CustomQuorumIDs {
113+ customQuorumNumbers [i ] = uint8 (id )
114+ }
115+ // disperse blob
116+ blobStatus , requestID , err := c .internalClient .Client .DisperseBlobAuthenticated (ctx , data , customQuorumNumbers )
117+ if err != nil {
118+ errChan <- fmt .Errorf ("error initializing DisperseBlobAuthenticated() client: %w" , err )
119+ return
126120 }
127121
128- base64RequestID := base64 .StdEncoding .EncodeToString (disperseRes .RequestId )
129-
130- fmt .Println ("Blob disepersed to EigenDA, now waiting for confirmation" , "requestID" , base64RequestID )
131-
132- var statusRes * BlobStatusReply
133- timeoutTime := time .Now ().Add (e .DAStatusQueryTimeout )
134- // Wait before first status check
135- time .Sleep (e .DAStatusQueryRetryInterval )
136- for time .Now ().Before (timeoutTime ) {
137- statusRes , err = daClient .GetBlobStatus (ctx , & BlobStatusRequest {
138- RequestId : disperseRes .RequestId ,
139- })
140- if err != nil {
141- fmt .Printf ("Unable to retrieve blob dispersal status, will retry" , "requestID" , base64RequestID , "err" , err )
142- } else if statusRes .Status == BlobStatus_CONFIRMED {
143- // TODO(eigenlayer): As long as fault proofs are disabled, we can move on once a blob is confirmed
144- // but not yet finalized, without further logic. Once fault proofs are enabled, we will need to update
145- // the proposer to wait until the blob associated with an L2 block has been finalized, i.e. the EigenDA
146- // contracts on Ethereum have confirmed the full availability of the blob on EigenDA.
147- batchHeaderHashHex := fmt .Sprintf ("0x%s" , hex .EncodeToString (statusRes .Info .BlobVerificationProof .BatchMetadata .BatchHeaderHash ))
148- fmt .Println ("Successfully dispersed blob to EigenDA" , "requestID" , base64RequestID , "batchHeaderHash" , batchHeaderHashHex )
149- return statusRes .Info , nil
150- } else if statusRes .Status == BlobStatus_UNKNOWN ||
151- statusRes .Status == BlobStatus_FAILED {
152- fmt .Println ("EigenDA blob dispersal failed in processing" , "requestID" , base64RequestID , "err" , err )
153- return nil , fmt .Errorf ("eigenDA blob dispersal failed in processing with reply status %d" , statusRes .Status )
154- } else {
155- fmt .Println ("Still waiting for confirmation from EigenDA" , "requestID" , base64RequestID )
156- }
157-
158- // Wait before first status check
159- time .Sleep (e .DAStatusQueryRetryInterval )
122+ // process response
123+ if * blobStatus == disperser .Failed {
124+ errChan <- fmt .Errorf ("reply status is %d" , blobStatus )
125+ return
160126 }
161127
162- return nil , fmt .Errorf ("timed out getting EigenDA status for dispersed blob key: %s" , base64RequestID )
128+ base64RequestID := base64 .StdEncoding .EncodeToString (requestID )
129+ fmt .Println ("Blob dispersed to EigenDA, now waiting for confirmation" , "requestID" , base64RequestID )
130+
131+ ticker := time .NewTicker (c .internalClient .Config .StatusQueryRetryInterval )
132+ defer ticker .Stop ()
133+
134+ var cancel context.CancelFunc
135+ ctx , cancel = context .WithTimeout (ctx , c .internalClient .Config .StatusQueryTimeout )
136+ defer cancel ()
137+
138+ for {
139+ select {
140+ case <- ctx .Done ():
141+ errChan <- fmt .Errorf ("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w" , base64RequestID , ctx .Err ())
142+ return
143+ case <- ticker .C :
144+ statusRes , err := c .internalClient .Client .GetBlobStatus (ctx , requestID )
145+ if err != nil {
146+ c .internalClient .Log .Error ("Unable to retrieve blob dispersal status, will retry" , "requestID" , base64RequestID , "err" , err )
147+ continue
148+ }
149+
150+ switch statusRes .Status {
151+ case grpcdisperser .BlobStatus_PROCESSING , grpcdisperser .BlobStatus_DISPERSING :
152+ fmt .Println ("Blob submitted, waiting for dispersal from EigenDA" , "requestID" , base64RequestID )
153+ case grpcdisperser .BlobStatus_FAILED :
154+ errChan <- fmt .Errorf ("EigenDA blob dispersal failed in processing, requestID=%s: %w" , base64RequestID , err )
155+ return
156+ case grpcdisperser .BlobStatus_INSUFFICIENT_SIGNATURES :
157+ errChan <- fmt .Errorf ("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w" , base64RequestID , err )
158+ return
159+ case grpcdisperser .BlobStatus_CONFIRMED :
160+ fmt .Println ("EigenDA blob confirmed, waiting for finalization" , "requestID" , base64RequestID )
161+ resultChan <- ID {BlobInfo : statusRes .Info , RequestID : string (requestID )}
162+ case grpcdisperser .BlobStatus_FINALIZED :
163+ batchHeaderHashHex := fmt .Sprintf ("0x%s" , hex .EncodeToString (statusRes .Info .BlobVerificationProof .BatchMetadata .BatchHeaderHash ))
164+ fmt .Println ("Successfully dispersed blob to EigenDA" , "requestID" , base64RequestID , "batchHeaderHash" , batchHeaderHashHex )
165+ return
166+ default :
167+ errChan <- fmt .Errorf ("EigenDA blob dispersal failed in processing with reply status %d" , statusRes .Status )
168+ return
169+ }
170+ }
171+ }
163172}
0 commit comments