Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions p2p/kademlia/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,22 @@ const (
defaultMaxPayloadSize = 200 // MB
errorBusy = "Busy"
maxConcurrentFindBatchValsRequests = 25
defaultExecTimeout = 10 * time.Second
// defaultExecTimeout is for small control-plane RPCs; large payload RPCs
// have explicit entries in execTimeouts below.
defaultExecTimeout = 10 * time.Second
)

// Global map for message type timeouts
var execTimeouts map[int]time.Duration

func init() {
// Initialize the request execution timeout values
// Initialize the request execution timeout values.
// These defaults are intentionally conservative to accommodate slower
// peers and larger payloads. If future deployments consistently see
// responsive nodes, consider reducing the larger RPC timeouts (e.g.,
// BatchStoreData/BatchGetValues to ~45s, BatchFindValues to ~30s) to
// fail fast on degraded nodes. Long, user-dependent operations like
// uploads/downloads are governed at higher layers.
execTimeouts = map[int]time.Duration{
Ping: 5 * time.Second,
FindNode: 10 * time.Second,
Expand Down
17 changes: 14 additions & 3 deletions pkg/common/task/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ func NewWorker() *Worker {
}

// cleanupLoop periodically removes tasks that are in a final state for a grace period
// or any task that has been around for too long
func (worker *Worker) cleanupLoop(ctx context.Context) {
const (
cleanupInterval = 30 * time.Second
finalTaskTTL = 2 * time.Minute
// maxTaskAge removes any task entry after this age, regardless of state.
// Keep greater than the largest server-side task envelope (RegisterTimeout ~75m)
// to avoid pruning legitimate long-running tasks from the worker registry.
maxTaskAge = 2 * time.Hour
)

ticker := time.NewTicker(cleanupInterval)
Expand All @@ -129,11 +134,17 @@ func (worker *Worker) cleanupLoop(ctx context.Context) {
kept := worker.tasks[:0]
for _, t := range worker.tasks {
st := t.Status()
if st != nil && st.SubStatus != nil && st.SubStatus.IsFinal() {
if now.Sub(st.CreatedAt) >= finalTaskTTL {
// drop this finalized task
if st != nil {
// Remove any task older than maxTaskAge (2h), regardless of state
if now.Sub(st.CreatedAt) >= maxTaskAge {
continue
}
// Also remove final tasks after 2 minutes
if st.SubStatus != nil && st.SubStatus.IsFinal() {
if now.Sub(st.CreatedAt) >= finalTaskTTL {
continue
}
}
}
kept = append(kept, t)
}
Expand Down
65 changes: 61 additions & 4 deletions sdk/adapters/supernodeservice/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,14 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
opts ...grpc.CallOption,
) (*CascadeSupernodeDownloadResponse, error) {

// Use provided context as-is (no correlation IDs)
// Use provided context as-is (no correlation IDs). Add watchdogs:
// - idle timer: reset on every received message (event or chunk).
// - max timer: hard cap for one attempt.
phaseCtx, phaseCancel := context.WithCancel(ctx)
defer phaseCancel()

// 1. Open gRPC stream (server-stream)
stream, err := a.client.Download(ctx, &cascade.DownloadRequest{
stream, err := a.client.Download(phaseCtx, &cascade.DownloadRequest{
ActionId: in.ActionID,
Signature: in.Signature,
}, opts...)
Expand Down Expand Up @@ -401,20 +405,57 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
chunkIndex int
)

// 3. Receive streamed responses with liveness watchdog
// Start with a generous prep idle timeout; tighten after first message
currentIdle := downloadPrepIdleTimeout
idleTimer := time.AfterFunc(currentIdle, func() {
a.logger.Error(ctx, "download idle timeout; cancelling stream", "action_id", in.ActionID)
phaseCancel()
})
defer idleTimer.Stop()
maxTimer := time.AfterFunc(downloadMaxTimeout, func() {
a.logger.Error(ctx, "download max timeout; cancelling stream", "action_id", in.ActionID)
phaseCancel()
})
defer maxTimer.Stop()
start := time.Now()
lastActivity := start
firstMsg := false

// 3. Receive streamed responses
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
// Classify timeouts for clearer upstream handling
if phaseCtx.Err() != nil {
sinceLast := time.Since(lastActivity)
sinceStart := time.Since(start)
switch {
case sinceLast >= downloadIdleTimeout:
return nil, fmt.Errorf("download idle timeout: %w", context.DeadlineExceeded)
case sinceStart >= downloadMaxTimeout:
return nil, fmt.Errorf("download overall timeout: %w", context.DeadlineExceeded)
}
}
return nil, fmt.Errorf("stream recv: %w", err)
}

switch x := resp.ResponseType.(type) {

// 3a. Progress / event message
case *cascade.DownloadResponse_Event:
// On first message, tighten idle window for active transfer
if !firstMsg {
firstMsg = true
currentIdle = downloadIdleTimeout
}
if idleTimer != nil {
idleTimer.Reset(currentIdle)
}
lastActivity = time.Now()
a.logger.Info(ctx, "supernode event", "event_type", x.Event.EventType, "message", x.Event.Message, "action_id", in.ActionID)

if in.EventLogger != nil {
Expand All @@ -426,10 +467,19 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(
})
}

// 3b. Actual data chunk
// 3b. Actual data chunk
case *cascade.DownloadResponse_Chunk:
data := x.Chunk.Data
if len(data) == 0 {
// Treat empty chunks as keep-alive; reset idle
if !firstMsg {
firstMsg = true
currentIdle = downloadIdleTimeout
}
if idleTimer != nil {
idleTimer.Reset(currentIdle)
}
lastActivity = time.Now()
continue
}
if _, err := outFile.Write(data); err != nil {
Expand All @@ -438,7 +488,14 @@ func (a *cascadeAdapter) CascadeSupernodeDownload(

bytesWritten += int64(len(data))
chunkIndex++

if !firstMsg {
firstMsg = true
currentIdle = downloadIdleTimeout
}
if idleTimer != nil {
idleTimer.Reset(currentIdle)
}
lastActivity = time.Now()
a.logger.Debug(ctx, "received chunk", "chunk_index", chunkIndex, "chunk_size", len(data), "bytes_written", bytesWritten)
}
}
Expand Down
13 changes: 13 additions & 0 deletions sdk/adapters/supernodeservice/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,16 @@ const cascadeUploadTimeout = 60 * time.Minute
// cascadeProcessingTimeout bounds the time waiting for server-side processing
// and final response (e.g., tx hash) after upload completes.
const cascadeProcessingTimeout = 10 * time.Minute

// Download timeouts (adapter-level)
// - downloadPrepIdleTimeout: idle window before the first message arrives,
// allowing the server time to prepare (e.g., reconstruct large files).
// - downloadIdleTimeout: cancels if no messages (events/chunks) are received
// after transfer begins; protects against stalls while allowing long transfers.
// - downloadMaxTimeout: hard cap for a single download attempt.
const (
// Give server prep up to ~5m + cushion without client cancelling.
downloadPrepIdleTimeout = 6 * time.Minute
downloadIdleTimeout = 2 * time.Minute
downloadMaxTimeout = 60 * time.Minute
)
163 changes: 46 additions & 117 deletions sdk/docs/cascade-timeouts.md
Original file line number Diff line number Diff line change
@@ -1,134 +1,60 @@
# Cascade Registration Timeouts and Networking

This document explains how timeouts and deadlines are applied across the SDK cascade registration flow, including the current split between upload and processing phases and the relevant client/server defaults.

## Purpose

- Make slow, user‑network–dependent uploads more tolerant without impacting other stages.
- Keep health checks and connection establishment responsive.
- Enable clearer error categorization: upload vs processing.

## TL;DR Defaults

- Upload timeout (adapter): `cascadeUploadTimeout = 60m` — covers client-side file streaming to the supernode.
- Processing timeout (adapter): `cascadeProcessingTimeout = 10m` — covers waiting for server progress/final tx hash after upload completes.
- Health check to supernodes (task): `connectionTimeout = 10s` — per-node probe during discovery.
- gRPC connect (client):
- Adds a default `30s` deadline if caller context has none.
- Connection readiness gate: `ConnWaitTime = 10s` per attempt, with `MaxRetries = 3` and retry backoff.
- ALTS handshake (secure transport): `30s` internal read timeouts (client and server sides).
- Supernode gRPC server:
- No per‑RPC timeout for `Register`/`Download` handlers.
- Keepalive is permissive (idle ping at 1h, ping ack timeout 30m).
- Stream tuning: 16MB message caps, 16MB stream window, 160MB conn window, ~20 concurrent streams.
# Cascade Timeouts — Quick Guide

## Control Flow and Contexts
Concise overview of timeout locations, defaults, and intent.

1) `sdk/action/client.go: ClientImpl.StartCascade(ctx, ...)`
- Forwards `ctx` to Task Manager.
## Defaults

2) `sdk/task/manager.go: ManagerImpl.CreateCascadeTask(...)`
- Detaches from caller: `taskCtx := context.WithCancel(context.Background())`.
- All subsequent work uses `taskCtx` (no deadline by default).
- Register (client → server)
- Upload (SDK adapter): 60m — `cascadeUploadTimeout`
- Processing (SDK adapter): 10m — `cascadeProcessingTimeout`
- Server envelope: 75m — `RegisterTimeout`

3) `sdk/task/cascade.go: CascadeTask.Run(ctx)`
- Validates file size; fetches healthy supernodes; registers with one.
- Download (server → client)
- Server preparation: 5m — `DownloadPrepareTimeout`
- Client per‑attempt: 60m — `downloadTimeout`
- Client liveness (SDK adapter):
- Prep idle (pre‑first‑message): 6m — `downloadPrepIdleTimeout`
- Idle (post‑first‑message): 2m — `downloadIdleTimeout`
- Max attempt: 60m — `downloadMaxTimeout`
- Note: File streaming is not server‑bounded; the client governs transfer.

4) Discovery: `sdk/task/task.go: BaseTask.fetchSupernodes` → `BaseTask.isServing`
- `context.WithTimeout(parent, 10s)` for health probe (create client + `HealthCheck`).
- Discovery / Connect
- Health probe per supernode: 10s — `connectionTimeout`
- gRPC connect default: 30s if caller provides no deadline
- Keepalives: permissive (idle ping ~1h, ack timeout ~30m)

5) Registration attempt: `sdk/task/cascade.go: attemptRegistration`
- Client connect: uses task context (no deadline); gRPC injects a 30s default at connect if needed.
- No outer registration timeout here; the adapter handles per‑phase timers.
## Intent and Ordering

6) RPC staging:
- `sdk/net/impl.go: supernodeClient.RegisterCascade` →
- `sdk/adapters/supernodeservice/adapter.go: CascadeSupernodeRegister` performs client‑stream upload and reads server progress / final tx hash.
- Register: server envelope (75m) > SDK phases (60m + 10m) so the client surfaces errors first when appropriate.
- Download: server prep is tight (5m). Transfer is governed by the client with a generous per‑attempt window and two‑phase idle watchdogs.

## Where Timeouts Come From (by Layer)
## Where They Live

- SDK adapter level (registration RPC):
- `cascadeUploadTimeout` (60m): upload phase timer (file chunks + metadata + CloseSend).
- `cascadeProcessingTimeout` (10m): processing phase timer (receive server progress + final tx hash).
- SDK task level:
- `connectionTimeout` (10s): supernode health checks only.
- SDK adapter (upload/download phases): `sdk/adapters/supernodeservice/timeouts.go`
- SDK task (discovery, per‑attempt download): `sdk/task/timeouts.go`
- Supernode service (server envelopes): `supernode/services/cascade/timeouts.go`
- P2P internal RPCs: `p2p/kademlia/network.go` (fixed per‑message timeouts)

- gRPC client (`pkg/net/grpc/client`):
- `defaultTimeout = 30s`: applied to connect if context lacks a deadline.
- `ConnWaitTime = 10s`, `MaxRetries = 3`, backoff configured; keepalives: 30m/30m.
## Notes

- ALTS handshake (`pkg/net/credentials/alts/handshake`):
- `defaultTimeout = 30s` for handshake read operations (client/server).

- gRPC server (`pkg/net/grpc/server` and supernode runtime):
- No explicit per‑RPC timeouts; generous keepalives; tuned flow control and message sizes for 4MB chunks.

## SDK Constants

Timeout constants are defined in dedicated files for clarity:

- Upload/Processing: `supernode/sdk/adapters/supernodeservice/timeouts.go`
- Connection/health probe: `supernode/sdk/task/timeouts.go`

Notes:
- `BaseTask.isServing` keeps a short 10s budget for snappy health checks.
- Health checks use a 10s budget for snappy discovery.
- gRPC connect/handshake defaults remain unchanged.

## Implementation Details

The split is implemented inside `CascadeSupernodeRegister` where the phases are naturally separated by the client‑stream CloseSend.

1) Create a cancelable context from the inbound one for the stream lifetime:

```go
phaseCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := a.client.Register(phaseCtx, opts...)
```

2) Upload phase timer:

```go
uploadTimer := time.AfterFunc(cascadeUploadTimeout, cancel)

// send chunks...
// send metadata...

if err := stream.CloseSend(); err != nil { /* ... */ }
uploadTimer.Stop()
```

3) Processing phase timer (server progress → final tx hash):

```go
processingTimer := time.AfterFunc(cascadeProcessingTimeout, cancel)
defer processingTimer.Stop()

for {
resp, err := stream.Recv()
// handle EOF, errors, progress, final tx hash
}
```

4) Error mapping and events:
- If cancellation occurs during Send loop → classify as upload timeout and emit `SDKUploadTimeout`.
- If cancellation occurs during Recv loop → classify as processing timeout and emit `SDKProcessingTimeout`.
- Surface distinct error messages and publish events accordingly.
## Events (SDK)
- Upload timeout → `SDKUploadFailed`
- Processing timeout → `SDKProcessingTimeout`
- Download failure (timeout/canceled) → `SDKDownloadFailure`

This approach requires no request‑struct changes and preserves existing call sites. It uses a single cancelable context across both phases and phase‑specific timers.

## Additional Notes

- Health checks use `connectionTimeout = 10s` during supernode discovery.
- gRPC client connect behavior: adds a `30s` deadline if none is present, waits up to `ConnWaitTime = 10s` per attempt with retries.
- Downloads use a separate `downloadTimeout = 5m` envelope (per-attempt). On timeout during download, the SDK emits `SDKDownloadFailure` with a reason-coded message `| reason=timeout` and sets `event.KeyMessage = "timeout"`.

## Operational Guidance
## Minimal Tuning Guidance
- Slow client links: keep download attempt at 60m; adjust idle windows if needed.
- Very large inputs: raise `cascadeUploadTimeout` (keep processing modest at 10m).

- For slow client links: raise `cascadeUploadTimeout` (e.g., 30–120m). Keep processing modest (e.g., 5–10m) unless chain finalization is known to stall.
- Server tuning is already generous; no server change required to support longer uploads.
- Telemetry: differentiate upload vs processing timeout in logs and emitted events for better retry behavior and user messaging.
- Retry policy: on upload timeout, prefer retrying with a different supernode; on processing timeout, consider whether the server might still finalize (idempotency depends on service semantics).
## Reference Map
- SDK: `sdk/task/timeouts.go`, `sdk/adapters/supernodeservice/timeouts.go`, `sdk/adapters/supernodeservice/adapter.go`
- Server: `supernode/services/cascade/timeouts.go`, server handlers in `supernode/node/action/server/cascade`
- Network: `pkg/net/grpc/client`, `p2p/kademlia/network.go`

## File/Code Reference Map

Expand All @@ -145,7 +71,8 @@ This approach requires no request‑struct changes and preserves existing call s

- Supernode
- `supernode/supernode/node/supernode/server/server.go` — server options (16MB caps, windows, 20 streams).
- `supernode/supernode/node/action/server/cascade/cascade_action_server.go` — server-side Register/Download handlers (no per‑RPC timeout).
- `supernode/supernode/node/action/server/cascade/cascade_action_server.go` — server-side handlers.
- `supernode/supernode/services/cascade/timeouts.go` — Register (`RegisterTimeout = 75m`) and Download prep (`DownloadPrepareTimeout = 5m`) timeouts.

## Events

Expand All @@ -160,10 +87,12 @@ This document describes how the SDK applies timeouts and deadlines during cascad
- Upload (adapter): `cascadeUploadTimeout = 60m` — client-side streaming of file chunks and metadata.
- Processing (adapter): `cascadeProcessingTimeout = 10m` — wait for server progress and final tx hash after upload completes.
- Discovery (task): `connectionTimeout = 10s` — per-supernode health probe during discovery.
- Download (task): `downloadTimeout = 5m` — envelope for cascade download.
- Download (task): `downloadTimeout = 60m` — per-attempt envelope. Adapter adds
`downloadPrepIdleTimeout = 6m` (pre-first-message), `downloadIdleTimeout = 2m`
(post-first-message), and `downloadMaxTimeout = 60m`.
- gRPC client connect: adds a `30s` deadline if none is present; readiness wait per attempt `ConnWaitTime = 10s` with retries and backoff.
- ALTS handshake: internal `30s` read timeouts on both client and server sides.
- Supernode gRPC server: no per-RPC timeout; keepalive is permissive (idle ping ~1h, ack timeout ~30m); flow-control and message-size tuning supports 4MB chunks.
- Supernode gRPC server: task-level timeouts are applied (Register 75m). Download preparation is bounded to 5m; file streaming is client-governed. Keepalive is permissive (idle ping ~1h, ack timeout ~30m); flow-control and message-size tuning supports 4MB chunks.

## Control Flow

Expand Down
Loading