Skip to content

Commit 6049e48

Browse files
sdk/cascade: split upload vs processing timeouts; add explicit constants; add upload/processing events; add finalize simulation failure event; detailed artefact metrics; docs
1 parent 221abb3 commit 6049e48

8 files changed

Lines changed: 154 additions & 56 deletions

File tree

gen/supernode/action/cascade/service.pb.go

Lines changed: 6 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

proto/supernode/action/cascade/service.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,5 @@ enum SupernodeEventType {
6161
ARTEFACTS_STORED = 11;
6262
ACTION_FINALIZED = 12;
6363
ARTEFACTS_DOWNLOADED = 13;
64+
FINALIZE_SIMULATION_FAILED = 14;
6465
}

sdk/README.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,8 +365,13 @@ The SDK provides an event system to monitor task progress through event subscrip
365365
- `SDKTaskTxHashReceived`: Transaction hash received from supernode
366366
- `SDKTaskCompleted`: Task completed successfully
367367
- `SDKTaskFailed`: Task failed with error
368-
- `SDKUploadTimeout`: Upload phase exceeded time budget and was cancelled
369-
- `SDKProcessingTimeout`: Post-upload processing exceeded time budget and was cancelled
368+
- `SDKConnectionEstablished`: Connection to supernode established
369+
- `SDKUploadStarted`: Upload started (size, chunk size, est chunks)
370+
- `SDKUploadCompleted`: Upload completed (size, chunks, elapsed, avg throughput)
371+
- `SDKUploadFailed`: Upload failed (reason=timeout|send_error|read_error|file_open|file_stat|close_send)
372+
- `SDKProcessingStarted`: Waiting for server progress/final tx hash
373+
- `SDKProcessingFailed`: Processing failed (reason=stream_recv|missing_final_response)
374+
- `SDKProcessingTimeout`: Processing exceeded time budget and was cancelled
370375
- `SDKDownloadAttempt`: Attempting to download from supernode
371376
- `SDKDownloadFailure`: Download attempt failed
372377
- `SDKOutputPathReceived`: File download path received
@@ -384,9 +389,10 @@ The SDK provides an event system to monitor task progress through event subscrip
384389
- `SupernodeRQIDVerified`: RaptorQ ID verified
385390
- `SupernodeFinalizeSimulated`: Finalize transaction simulated successfully (pre-storage)
386391
- `SupernodeArtefactsStored`: Artifacts stored successfully
387-
- `SupernodeActionFinalized`: Action processing finalized
388-
- `SupernodeArtefactsDownloaded`: Artifacts downloaded
389-
- `SupernodeUnknown`: Unknown supernode event
392+
- `SupernodeActionFinalized`: Action processing finalized
393+
- `SupernodeArtefactsDownloaded`: Artifacts downloaded
394+
- `SupernodeFinalizeSimulationFailed`: Finalize action simulation failed
395+
- `SupernodeUnknown`: Unknown supernode event
390396

391397
Note: For backward compatibility, older supernodes may emit the finalize simulation as an `RQID_VERIFIED` event with the message `"finalize action simulation passed"`. The SDK adapter maps this to `SupernodeFinalizeSimulated` automatically.
392398

sdk/adapters/supernodeservice/adapter.go

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,25 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
9090
if err != nil {
9191
a.logger.Error(ctx, "Failed to create register stream",
9292
"error", err)
93+
if in.EventLogger != nil {
94+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=stream_open", event.EventData{
95+
event.KeyTaskID: in.TaskId,
96+
event.KeyActionID: in.ActionID,
97+
})
98+
}
9399
return nil, err
94100
}
95101

96102
// Open the file for reading
97103
file, err := os.Open(in.FilePath)
98104
if err != nil {
99105
a.logger.Error(ctx, "Failed to open file", "filePath", in.FilePath, "error", err)
106+
if in.EventLogger != nil {
107+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=file_open", event.EventData{
108+
event.KeyTaskID: in.TaskId,
109+
event.KeyActionID: in.ActionID,
110+
})
111+
}
100112
return nil, fmt.Errorf("failed to open file: %w", err)
101113
}
102114
defer file.Close()
@@ -105,6 +117,12 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
105117
fileInfo, err := file.Stat()
106118
if err != nil {
107119
a.logger.Error(ctx, "Failed to get file stats", "filePath", in.FilePath, "error", err)
120+
if in.EventLogger != nil {
121+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=file_stat", event.EventData{
122+
event.KeyTaskID: in.TaskId,
123+
event.KeyActionID: in.ActionID,
124+
})
125+
}
108126
return nil, fmt.Errorf("failed to get file stats: %w", err)
109127
}
110128
totalBytes := fileInfo.Size()
@@ -128,14 +146,21 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
128146
chunkIndex := 0
129147
buffer := make([]byte, chunkSize)
130148

149+
// Emit upload started event
150+
if in.EventLogger != nil {
151+
estChunks := (totalBytes + int64(chunkSize) - 1) / int64(chunkSize)
152+
in.EventLogger(baseCtx, event.SDKUploadStarted,
153+
fmt.Sprintf("upload started | size=%dB chunk_size=%dB est_chunks=%d", totalBytes, chunkSize, estChunks),
154+
event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
155+
}
156+
157+
uploadStart := time.Now()
158+
131159
// Start upload phase timer
132160
uploadTimer := time.AfterFunc(cascadeUploadTimeout, func() {
133161
a.logger.Error(baseCtx, "Upload phase timeout reached; cancelling stream")
134162
if in.EventLogger != nil {
135-
in.EventLogger(baseCtx, event.SDKUploadTimeout, "upload phase timeout", event.EventData{
136-
event.KeyTaskID: in.TaskId,
137-
event.KeyActionID: in.ActionID,
138-
})
163+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=timeout", event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
139164
}
140165
cancel()
141166
})
@@ -149,6 +174,12 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
149174
}
150175
if err != nil {
151176
a.logger.Error(ctx, "Failed to read file chunk", "chunkIndex", chunkIndex, "error", err)
177+
if in.EventLogger != nil {
178+
in.EventLogger(baseCtx, event.SDKUploadFailed, fmt.Sprintf("upload failed | reason=read_error chunk=%d", chunkIndex), event.EventData{
179+
event.KeyTaskID: in.TaskId,
180+
event.KeyActionID: in.ActionID,
181+
})
182+
}
152183
return nil, fmt.Errorf("failed to read file chunk: %w", err)
153184
}
154185

@@ -163,6 +194,12 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
163194

164195
if err := stream.Send(chunk); err != nil {
165196
a.logger.Error(ctx, "Failed to send data chunk", "chunkIndex", chunkIndex, "error", err)
197+
if in.EventLogger != nil {
198+
in.EventLogger(baseCtx, event.SDKUploadFailed, fmt.Sprintf("upload failed | reason=send_error chunk=%d", chunkIndex), event.EventData{
199+
event.KeyTaskID: in.TaskId,
200+
event.KeyActionID: in.ActionID,
201+
})
202+
}
166203
return nil, fmt.Errorf("failed to send chunk: %w", err)
167204
}
168205

@@ -186,13 +223,25 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
186223

187224
if err := stream.Send(metadata); err != nil {
188225
a.logger.Error(ctx, "Failed to send metadata", "TaskId", in.TaskId, "ActionID", in.ActionID, "error", err)
226+
if in.EventLogger != nil {
227+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=send_metadata", event.EventData{
228+
event.KeyTaskID: in.TaskId,
229+
event.KeyActionID: in.ActionID,
230+
})
231+
}
189232
return nil, fmt.Errorf("failed to send metadata: %w", err)
190233
}
191234

192235
a.logger.Debug(ctx, "Sent metadata", "TaskId", in.TaskId, "ActionID", in.ActionID)
193236

194237
if err := stream.CloseSend(); err != nil {
195238
a.logger.Error(ctx, "Failed to close stream and receive response", "TaskId", in.TaskId, "ActionID", in.ActionID, "error", err)
239+
if in.EventLogger != nil {
240+
in.EventLogger(baseCtx, event.SDKUploadFailed, "upload failed | reason=close_send", event.EventData{
241+
event.KeyTaskID: in.TaskId,
242+
event.KeyActionID: in.ActionID,
243+
})
244+
}
196245
return nil, fmt.Errorf("failed to receive response: %w", err)
197246
}
198247

@@ -201,14 +250,24 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
201250
uploadTimer.Stop()
202251
}
203252

253+
// Emit upload completed with throughput metrics
254+
if in.EventLogger != nil {
255+
elapsed := time.Since(uploadStart).Seconds()
256+
mb := float64(bytesRead) / (1024.0 * 1024.0)
257+
avg := 0.0
258+
if elapsed > 0 {
259+
avg = mb / elapsed
260+
}
261+
in.EventLogger(baseCtx, event.SDKUploadCompleted,
262+
fmt.Sprintf("upload complete | size=%dB chunks=%d elapsed=%.2fs avg=%.2fMB/s", totalBytes, chunkIndex, elapsed, avg),
263+
event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
264+
}
265+
204266
// Processing phase timer starts now (waiting for server streamed responses)
205267
processingTimer := time.AfterFunc(cascadeProcessingTimeout, func() {
206268
a.logger.Error(baseCtx, "Processing phase timeout reached; cancelling stream")
207269
if in.EventLogger != nil {
208-
in.EventLogger(baseCtx, event.SDKProcessingTimeout, "processing phase timeout", event.EventData{
209-
event.KeyTaskID: in.TaskId,
210-
event.KeyActionID: in.ActionID,
211-
})
270+
in.EventLogger(baseCtx, event.SDKProcessingTimeout, "processing timeout", event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
212271
}
213272
cancel()
214273
})
@@ -218,6 +277,11 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
218277
}
219278
}()
220279

280+
// Emit processing started
281+
if in.EventLogger != nil {
282+
in.EventLogger(baseCtx, event.SDKProcessingStarted, "processing started | awaiting server progress", event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
283+
}
284+
221285
// Handle streaming responses from supernode
222286
var finalResp *cascade.RegisterResponse
223287
for {
@@ -228,12 +292,16 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
228292
if err != nil {
229293
// Distinguish timeout phase for clearer error messages
230294
if phaseCtx.Err() != nil {
231-
// Determine which phase we were in by whether upload finished
232295
// At this point, upload is finished; classify as processing timeout/cancel
233296
if phaseCtx.Err() == context.DeadlineExceeded || phaseCtx.Err() == context.Canceled {
234297
return nil, fmt.Errorf("processing timed out or cancelled: %w", phaseCtx.Err())
235298
}
236299
}
300+
if in.EventLogger != nil {
301+
in.EventLogger(baseCtx, event.SDKProcessingFailed,
302+
fmt.Sprintf("processing failed | reason=stream_recv error=%v", err),
303+
event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
304+
}
237305
return nil, fmt.Errorf("failed to receive server response: %w", err)
238306
}
239307

@@ -268,6 +336,9 @@ func (a *cascadeAdapter) CascadeSupernodeRegister(ctx context.Context, in *Casca
268336
if phaseCtx.Err() != nil {
269337
return nil, fmt.Errorf("processing timed out or cancelled before final response: %w", phaseCtx.Err())
270338
}
339+
if in.EventLogger != nil {
340+
in.EventLogger(baseCtx, event.SDKProcessingFailed, "processing failed | reason=missing_final_response", event.EventData{event.KeyTaskID: in.TaskId, event.KeyActionID: in.ActionID})
341+
}
271342
return nil, fmt.Errorf("no final response with tx_hash received")
272343
}
273344

@@ -380,7 +451,7 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
380451

381452
// toSdkEvent converts a supernode-side enum value into an internal SDK EventType.
382453
func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
383-
switch e {
454+
switch e {
384455
case cascade.SupernodeEventType_ACTION_RETRIEVED:
385456
return event.SupernodeActionRetrieved
386457
case cascade.SupernodeEventType_ACTION_FEE_VERIFIED:
@@ -405,8 +476,10 @@ func toSdkEvent(e cascade.SupernodeEventType) event.EventType {
405476
return event.SupernodeActionFinalized
406477
case cascade.SupernodeEventType_ARTEFACTS_DOWNLOADED:
407478
return event.SupernodeArtefactsDownloaded
408-
case cascade.SupernodeEventType_FINALIZE_SIMULATED:
409-
return event.SupernodeFinalizeSimulated
479+
case cascade.SupernodeEventType_FINALIZE_SIMULATED:
480+
return event.SupernodeFinalizeSimulated
481+
case cascade.SupernodeEventType_FINALIZE_SIMULATION_FAILED:
482+
return event.SupernodeFinalizeSimulationFailed
410483
default:
411484
return event.SupernodeUnknown
412485
}

sdk/event/types.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,23 @@ type EventType string
1414
// These events are used to track the progress of tasks
1515
// and to notify subscribers about important changes in the system.
1616
const (
17-
SDKTaskStarted EventType = "sdk:started"
17+
SDKTaskStarted EventType = "sdk:started"
1818
SDKSupernodesUnavailable EventType = "sdk:supernodes_unavailable"
1919
SDKSupernodesFound EventType = "sdk:supernodes_found"
2020
SDKRegistrationAttempt EventType = "sdk:registration_attempt"
2121
SDKRegistrationFailure EventType = "sdk:registration_failure"
2222
SDKRegistrationSuccessful EventType = "sdk:registration_successful"
2323
SDKTaskTxHashReceived EventType = "sdk:txhash_received"
2424
SDKTaskCompleted EventType = "sdk:completed"
25-
SDKTaskFailed EventType = "sdk:failed"
26-
// Fine-grained timeouts for cascade registration
27-
SDKUploadTimeout EventType = "sdk:upload_timeout"
28-
SDKProcessingTimeout EventType = "sdk:processing_timeout"
25+
SDKTaskFailed EventType = "sdk:failed"
26+
SDKConnectionEstablished EventType = "sdk:connection_established"
27+
// Upload/processing phase events for cascade registration
28+
SDKUploadStarted EventType = "sdk:upload_started"
29+
SDKUploadCompleted EventType = "sdk:upload_completed"
30+
SDKUploadFailed EventType = "sdk:upload_failed" // reason includes timeout
31+
SDKProcessingStarted EventType = "sdk:processing_started"
32+
SDKProcessingFailed EventType = "sdk:processing_failed"
33+
SDKProcessingTimeout EventType = "sdk:processing_timeout"
2934

3035
SDKDownloadAttempt EventType = "sdk:download_attempt"
3136
SDKDownloadFailure EventType = "sdk:download_failure"
@@ -45,9 +50,10 @@ const (
4550
SupernodeRQIDVerified EventType = "supernode:rqid_verified"
4651
SupernodeFinalizeSimulated EventType = "supernode:finalize_simulated"
4752
SupernodeArtefactsStored EventType = "supernode:artefacts_stored"
48-
SupernodeActionFinalized EventType = "supernode:action_finalized"
49-
SupernodeArtefactsDownloaded EventType = "supernode:artefacts_downloaded"
50-
SupernodeUnknown EventType = "supernode:unknown"
53+
SupernodeActionFinalized EventType = "supernode:action_finalized"
54+
SupernodeArtefactsDownloaded EventType = "supernode:artefacts_downloaded"
55+
SupernodeUnknown EventType = "supernode:unknown"
56+
SupernodeFinalizeSimulationFailed EventType = "supernode:finalize_simulation_failed"
5157
)
5258

5359
// EventData is a map of event data attributes using standardized keys

sdk/task/cascade.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,21 @@ func (t *CascadeTask) registerWithSupernodes(ctx context.Context, supernodes lum
103103
}
104104

105105
func (t *CascadeTask) attemptRegistration(ctx context.Context, _ int, sn lumera.Supernode, factory *net.ClientFactory, req *supernodeservice.CascadeSupernodeRegisterRequest) error {
106-
client, err := factory.CreateClient(ctx, sn)
107-
if err != nil {
108-
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
109-
}
110-
defer client.Close(ctx)
111-
112-
req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
113-
t.LogEvent(ctx, evt, msg, data)
114-
}
106+
client, err := factory.CreateClient(ctx, sn)
107+
if err != nil {
108+
return fmt.Errorf("create client %s: %w", sn.CosmosAddress, err)
109+
}
110+
defer client.Close(ctx)
111+
112+
// Emit connection established event for observability
113+
t.LogEvent(ctx, event.SDKConnectionEstablished, "connection established", event.EventData{
114+
event.KeySupernode: sn.GrpcEndpoint,
115+
event.KeySupernodeAddress: sn.CosmosAddress,
116+
})
117+
118+
req.EventLogger = func(ctx context.Context, evt event.EventType, msg string, data event.EventData) {
119+
t.LogEvent(ctx, evt, msg, data)
120+
}
115121
// Use ctx directly; per-phase timers are applied inside the adapter
116122
resp, err := client.RegisterCascade(ctx, req)
117123
if err != nil {

supernode/services/cascade/events.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ package cascade
33
type SupernodeEventType int
44

55
const (
6-
SupernodeEventTypeUNKNOWN SupernodeEventType = 0
7-
SupernodeEventTypeActionRetrieved SupernodeEventType = 1
8-
SupernodeEventTypeActionFeeVerified SupernodeEventType = 2
9-
SupernodeEventTypeTopSupernodeCheckPassed SupernodeEventType = 3
10-
SupernodeEventTypeMetadataDecoded SupernodeEventType = 4
11-
SupernodeEventTypeDataHashVerified SupernodeEventType = 5
12-
SupernodeEventTypeInputEncoded SupernodeEventType = 6
13-
SupernodeEventTypeSignatureVerified SupernodeEventType = 7
14-
SupernodeEventTypeRQIDsGenerated SupernodeEventType = 8
15-
SupernodeEventTypeRqIDsVerified SupernodeEventType = 9
16-
SupernodeEventTypeFinalizeSimulated SupernodeEventType = 10
17-
SupernodeEventTypeArtefactsStored SupernodeEventType = 11
18-
SupernodeEventTypeActionFinalized SupernodeEventType = 12
19-
SupernodeEventTypeArtefactsDownloaded SupernodeEventType = 13
6+
SupernodeEventTypeUNKNOWN SupernodeEventType = 0
7+
SupernodeEventTypeActionRetrieved SupernodeEventType = 1
8+
SupernodeEventTypeActionFeeVerified SupernodeEventType = 2
9+
SupernodeEventTypeTopSupernodeCheckPassed SupernodeEventType = 3
10+
SupernodeEventTypeMetadataDecoded SupernodeEventType = 4
11+
SupernodeEventTypeDataHashVerified SupernodeEventType = 5
12+
SupernodeEventTypeInputEncoded SupernodeEventType = 6
13+
SupernodeEventTypeSignatureVerified SupernodeEventType = 7
14+
SupernodeEventTypeRQIDsGenerated SupernodeEventType = 8
15+
SupernodeEventTypeRqIDsVerified SupernodeEventType = 9
16+
SupernodeEventTypeFinalizeSimulated SupernodeEventType = 10
17+
SupernodeEventTypeArtefactsStored SupernodeEventType = 11
18+
SupernodeEventTypeActionFinalized SupernodeEventType = 12
19+
SupernodeEventTypeArtefactsDownloaded SupernodeEventType = 13
20+
SupernodeEventTypeFinalizeSimulationFailed SupernodeEventType = 14
2021
)

0 commit comments

Comments
 (0)