Skip to content

Commit 5adb891

Browse files
committed
implement cascade action download functionality in action-sdk
1 parent c269fac commit 5adb891

24 files changed

Lines changed: 548 additions & 124 deletions

File tree

gen/supernode/action/cascade/service.pb.go

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3
6161
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
6262
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
6363
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
64-
github.com/LumeraProtocol/lumera v1.5.0 h1:LDPtd155PjG/LKk34x/3vhC9H+J9tHoxwrcwRMG6jzM=
65-
github.com/LumeraProtocol/lumera v1.5.0/go.mod h1:c1M+sjewuCvxw+pznwlspUzenDJI8Y+suKB3RFKS2Wo=
6664
github.com/LumeraProtocol/rq-go v0.2.1 h1:8B3UzRChLsGMmvZ+UVbJsJj6JZzL9P9iYxbdUwGsQI4=
6765
github.com/LumeraProtocol/rq-go v0.2.1/go.mod h1:APnKCZRh1Es2Vtrd2w4kCLgAyaL5Bqrkz/BURoRJ+O8=
6866
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=

pkg/codec/decode.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package codec
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"os"
78
"path/filepath"
@@ -17,8 +18,8 @@ type DecodeRequest struct {
1718
}
1819

1920
type DecodeResponse struct {
20-
Path string
21-
LayoutPath string
21+
Path string
22+
DecodeTmpDir string
2223
}
2324

2425
func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeResponse, error) {
@@ -52,14 +53,27 @@ func (rq *raptorQ) Decode(ctx context.Context, req DecodeRequest) (DecodeRespons
5253
}
5354
logtrace.Info(ctx, "symbols written to disk", fields)
5455

56+
// ---------- write layout.json ---------- ←★
57+
layoutPath := filepath.Join(symbolsDir, "layout.json")
58+
layoutBytes, err := json.Marshal(req.Layout)
59+
if err != nil {
60+
fields[logtrace.FieldError] = err.Error()
61+
return DecodeResponse{}, fmt.Errorf("marshal layout: %w", err)
62+
}
63+
if err := os.WriteFile(layoutPath, layoutBytes, 0o644); err != nil {
64+
fields[logtrace.FieldError] = err.Error()
65+
return DecodeResponse{}, fmt.Errorf("write layout file: %w", err)
66+
}
67+
logtrace.Info(ctx, "layout.json written", fields)
68+
5569
// Decode
5670
outputPath := filepath.Join(symbolsDir, "output")
57-
if err := processor.DecodeSymbols(symbolsDir, outputPath, ""); err != nil {
71+
if err := processor.DecodeSymbols(symbolsDir, outputPath, layoutPath); err != nil {
5872
fields[logtrace.FieldError] = err.Error()
5973
_ = os.Remove(outputPath)
6074
return DecodeResponse{}, fmt.Errorf("raptorq decode: %w", err)
6175
}
6276

6377
logtrace.Info(ctx, "RaptorQ decoding completed successfully", fields)
64-
return DecodeResponse{Path: outputPath, LayoutPath: ""}, nil
78+
return DecodeResponse{Path: outputPath, DecodeTmpDir: symbolsDir}, nil
6579
}

proto/supernode/action/cascade/service.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ enum SupernodeEventType {
5959
RQID_VERIFIED = 9;
6060
ARTEFACTS_STORED = 10;
6161
ACTION_FINALIZED = 11;
62-
Artefacts_Downloaded = 12;
62+
ARTEFACTS_DOWNLOADED = 12;
6363
}
6464

6565
message HealthCheckRequest {}

sdk/action/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Client interface {
2525
GetTask(ctx context.Context, taskID string) (*task.TaskEntry, bool)
2626
SubscribeToEvents(ctx context.Context, eventType event.EventType, handler event.Handler) error
2727
SubscribeToAllEvents(ctx context.Context, handler event.Handler) error
28+
DownloadCascade(ctx context.Context, actionID, outputPath string) (string, error)
2829
}
2930

3031
// ClientImpl implements the Client interface
@@ -128,3 +129,25 @@ func (c *ClientImpl) SubscribeToAllEvents(ctx context.Context, handler event.Han
128129

129130
return nil
130131
}
132+
133+
func (c *ClientImpl) DownloadCascade(
134+
ctx context.Context,
135+
actionID, outputPath string,
136+
) (string, error) {
137+
138+
if actionID == "" {
139+
return "", fmt.Errorf("actionID is empty")
140+
}
141+
142+
taskID, err := c.taskManager.CreateDownloadTask(ctx, actionID, outputPath)
143+
if err != nil {
144+
return "", fmt.Errorf("create download task: %w", err)
145+
}
146+
147+
c.logger.Info(ctx, "cascade download task created",
148+
"task_id", taskID,
149+
"action_id", actionID,
150+
)
151+
152+
return taskID, nil
153+
}

sdk/adapters/supernodeservice/adapter.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,98 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
164164
}, nil
165165
}
166166

167+
// CascadeSupernodeDownload downloads a file from a supernode gRPC stream
168+
func (a *cascadeAdapter) CascadeSupernodeDownload(
169+
ctx context.Context,
170+
in *CascadeSupernodeDownloadRequest,
171+
opts ...grpc.CallOption,
172+
) (*CascadeSupernodeDownloadResponse, error) {
173+
174+
ctx = net.AddCorrelationID(ctx)
175+
176+
// 1. Open gRPC stream (server-stream)
177+
stream, err := a.client.Download(ctx, &cascade.DownloadRequest{
178+
ActionId: in.ActionID,
179+
}, opts...)
180+
if err != nil {
181+
a.logger.Error(ctx, "failed to create download stream",
182+
"action_id", in.ActionID, "error", err)
183+
return nil, err
184+
}
185+
186+
// 2. Prepare destination file
187+
outFile, err := os.Create(in.OutputPath)
188+
if err != nil {
189+
a.logger.Error(ctx, "failed to create output file",
190+
"path", in.OutputPath, "error", err)
191+
return nil, fmt.Errorf("create output file: %w", err)
192+
}
193+
defer outFile.Close()
194+
195+
var (
196+
bytesWritten int64
197+
chunkIndex int
198+
)
199+
200+
// 3. Receive streamed responses
201+
for {
202+
resp, err := stream.Recv()
203+
if err == io.EOF {
204+
break
205+
}
206+
if err != nil {
207+
return nil, fmt.Errorf("stream recv: %w", err)
208+
}
209+
210+
switch x := resp.ResponseType.(type) {
211+
212+
// 3a. Progress / event message
213+
case *cascade.DownloadResponse_Event:
214+
a.logger.Info(ctx, "supernode event",
215+
"event_type", x.Event.EventType,
216+
"message", x.Event.Message,
217+
"action_id", in.ActionID)
218+
219+
if in.EventLogger != nil {
220+
in.EventLogger(ctx, toSdkEvent(x.Event.EventType), x.Event.Message, event.EventData{
221+
event.KeyActionID: in.ActionID,
222+
event.KeyEventType: x.Event.EventType,
223+
event.KeyMessage: x.Event.Message,
224+
})
225+
}
226+
227+
// 3b. Actual data chunk
228+
case *cascade.DownloadResponse_Chunk:
229+
data := x.Chunk.Data
230+
if len(data) == 0 {
231+
continue
232+
}
233+
if _, err := outFile.Write(data); err != nil {
234+
return nil, fmt.Errorf("write chunk: %w", err)
235+
}
236+
237+
bytesWritten += int64(len(data))
238+
chunkIndex++
239+
240+
a.logger.Debug(ctx, "received chunk",
241+
"chunk_index", chunkIndex,
242+
"chunk_size", len(data),
243+
"bytes_written", bytesWritten)
244+
}
245+
}
246+
247+
a.logger.Info(ctx, "download complete",
248+
"bytes_written", bytesWritten,
249+
"path", in.OutputPath,
250+
"action_id", in.ActionID)
251+
252+
return &CascadeSupernodeDownloadResponse{
253+
Success: true,
254+
Message: "artefact downloaded",
255+
OutputPath: in.OutputPath,
256+
}, nil
257+
}
258+
167259
// toSdkEvent converts a supernode-side enum value into an internal SDK EventType.
168260
func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
169261
switch e {
@@ -189,6 +281,8 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
189281
return event.SupernodeArtefactsStored
190282
case cascade.SupernodeEventType_ACTION_FINALIZED:
191283
return event.SupernodeActionFinalized
284+
case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED:
285+
return event.SupernodeArtefactsDownloaded
192286
default:
193287
return event.SupernodeUnknown
194288
}

sdk/adapters/supernodeservice/types.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,21 @@ type CascadeSupernodeRegisterResponse struct {
2828
TxHash string
2929
}
3030

31+
type CascadeSupernodeDownloadRequest struct {
32+
ActionID string
33+
TaskID string
34+
OutputPath string
35+
EventLogger LoggerFunc
36+
}
37+
38+
type CascadeSupernodeDownloadResponse struct {
39+
Success bool
40+
Message string
41+
OutputPath string
42+
}
43+
3144
//go:generate mockery --name=CascadeServiceClient --output=testutil/mocks --outpkg=mocks --filename=cascade_service_mock.go
3245
type CascadeServiceClient interface {
3346
CascadeSupernodeRegister(ctx context.Context, in *CascadeSupernodeRegisterRequest, opts ...grpc.CallOption) (*CascadeSupernodeRegisterResponse, error)
47+
CascadeSupernodeDownload(ctx context.Context, in *CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*CascadeSupernodeDownloadResponse, error)
3448
}

sdk/event/keys.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
KeyMessage EventDataKey = "message"
1515
KeyProgress EventDataKey = "progress"
1616
KeyEventType EventDataKey = "event_type"
17+
KeyOutputPath EventDataKey = "output_path"
1718

1819
// Task specific keys
1920
KeyTaskID EventDataKey = "task_id"

sdk/event/types.go

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,27 @@ const (
2323
SDKTaskTxHashReceived EventType = "sdk:txhash_received"
2424
SDKTaskCompleted EventType = "sdk:completed"
2525
SDKTaskFailed EventType = "sdk:failed"
26+
27+
SDKDownloadAttempt EventType = "sdk:download_attempt"
28+
SDKDownloadFailure EventType = "sdk:download_failure"
29+
SDKOutputPathReceived EventType = "sdk:output_path_received"
30+
SDKDownloadSuccessful EventType = "sdk:download_successful"
2631
)
2732

2833
const (
29-
SupernodeActionRetrieved EventType = "supernode:action_retrieved"
30-
SupernodeActionFeeVerified EventType = "supernode:action_fee_verified"
31-
SupernodeTopCheckPassed EventType = "supernode:top_check_passed"
32-
SupernodeMetadataDecoded EventType = "supernode:metadata_decoded"
33-
SupernodeDataHashVerified EventType = "supernode:data_hash_verified"
34-
SupernodeInputEncoded EventType = "supernode:input_encoded"
35-
SupernodeSignatureVerified EventType = "supernode:signature_verified"
36-
SupernodeRQIDGenerated EventType = "supernode:rqid_generated"
37-
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
38-
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
39-
SupernodeActionFinalized EventType = "supernode:action_finalized"
40-
SupernodeUnknown EventType = "supernode:unknown"
34+
SupernodeActionRetrieved EventType = "supernode:action_retrieved"
35+
SupernodeActionFeeVerified EventType = "supernode:action_fee_verified"
36+
SupernodeTopCheckPassed EventType = "supernode:top_check_passed"
37+
SupernodeMetadataDecoded EventType = "supernode:metadata_decoded"
38+
SupernodeDataHashVerified EventType = "supernode:data_hash_verified"
39+
SupernodeInputEncoded EventType = "supernode:input_encoded"
40+
SupernodeSignatureVerified EventType = "supernode:signature_verified"
41+
SupernodeRQIDGenerated EventType = "supernode:rqid_generated"
42+
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
43+
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
44+
SupernodeActionFinalized EventType = "supernode:action_finalized"
45+
SupernodeArtefactsDownloaded EventType = "supernode:artefacts_downloaded"
46+
SupernodeUnknown EventType = "supernode:unknown"
4147
)
4248

4349
// EventData is a map of event data attributes using standardized keys

sdk/net/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ type SupernodeClient interface {
1515
// HealthCheck performs a health check on the supernode
1616
HealthCheck(ctx context.Context) (*grpc_health_v1.HealthCheckResponse, error)
1717

18+
// Download downloads the cascade action file
19+
Download(ctx context.Context, in *supernodeservice.CascadeSupernodeDownloadRequest, opts ...grpc.CallOption) (*supernodeservice.CascadeSupernodeDownloadResponse, error)
20+
1821
// Close releases resources used by the client
1922
Close(ctx context.Context) error
2023
}

0 commit comments

Comments
 (0)