Skip to content

Commit f5508b7

Browse files
Nick Ficano and cursoragent committed
fix(bugs): crypto trace ids, error mapping, caps, lifecycle, transports
- crypto RNG for trace/span ids (#41)
- handler resolved before job.accepted; shared unwind (#47)
- lease/runtime watchdog callbacks observe exceptions (#48)
- 3-state RevocationOutcome; permanent failures keep credential outstanding (#49)
- auto-ack/receive-loop/dispose tasks observed; Completion exposed (#60,#61,#62)
- JobErrorMapper parses details fields (#63)
- AutoAck docs corrected to event-driven (#64)
- ChunkAssembler + WebSocket message size/chunk caps (#65,#66)
- Giraffe short-circuits ws/400 branches (#40)
- CLI awaits enumerator dispose and honors --stdio (#38,#39)

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 5868c64 commit f5508b7

15 files changed

Lines changed: 256 additions & 114 deletions

File tree

samples/LiteLLM/Program.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ type LiteLLMProvisioner(baseUrl: Uri, adminKey: string, http: HttpClient) =
8989
task {
9090
let body = {| key = credentialId |} :> obj
9191
let! _ = postJsonAsync "/key/delete" body ct
92-
return true
92+
return RevocationOutcome.Revoked
9393
}
9494

9595
[<EntryPoint>]

samples/ProvisionedCredentials/Program.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type StaticProvisioner(revoked: HashSet<string>) =
3131

3232
member _.RevokeAsync(credentialId, _ct) =
3333
lock revoked (fun () -> revoked.Add credentialId |> ignore)
34-
Task.FromResult true
34+
Task.FromResult RevocationOutcome.Revoked
3535

3636
[<EntryPoint>]
3737
let main _argv =

src/Arcp.Cli/Program.fs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ let private streamEventsAsync (handle: JobHandle) : Task =
8888
else
8989
writeLine (sprintf "event: %s" (JobEventBody.kind enumerator.Current))
9090
finally
91-
ignore (enumerator.DisposeAsync().AsTask())
91+
()
92+
93+
// #38: await disposal so teardown errors surface and complete
94+
// before this function returns.
95+
do! enumerator.DisposeAsync()
9296
}
9397
:> Task
9498

@@ -162,7 +166,12 @@ let main argv =
162166
let env = Environment.GetEnvironmentVariable "ARCP_TOKEN"
163167
if String.IsNullOrEmpty env then None else Some env)
164168

165-
(serveStdio token).GetAwaiter().GetResult()
169+
// #39: honor the --stdio flag instead of ignoring it.
170+
if sub.Contains ServeArgs.Stdio then
171+
(serveStdio token).GetAwaiter().GetResult()
172+
else
173+
errorLine "serve requires a transport flag; only --stdio is currently supported (pass --stdio)"
174+
2
166175
| Send sub :: _ ->
167176
let url = sub.GetResult SendArgs.Url
168177

src/Arcp.Client/ArcpClient.fs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,20 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
1818
let handles = ConcurrentDictionary<string, JobHandleWriter>()
1919
let mutable sessionCtx: SessionContext option = None
2020
let mutable autoAck: AutoAckScheduler option = None
21+
let mutable receiveLoopTask: Task = Task.CompletedTask
2122
let receiveLoopCts = new CancellationTokenSource()
2223

24+
/// Attach a fault observer to a fire-and-forget task so that its
/// exception is logged instead of surfacing later as an unobserved
/// task exception (#60). The helper is applied both to background
/// sends and to the receive-loop task, so the log line deliberately
/// does not name a specific kind of work.
///
/// `t` is the task to observe; the continuation task itself is discarded.
let observeTask (t: Task) : unit =
    t.ContinueWith(
        // OnlyOnFaulted already restricts the continuation to faulted
        // antecedents, so no IsFaulted re-check is needed. Reading
        // tt.Exception is what marks the fault as observed.
        (fun (tt: Task) -> eprintfn "[ARCP] background task failed: %O" tt.Exception),
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        // Pin the scheduler explicitly: the shorter overloads use
        // TaskScheduler.Current, which depends on the calling context.
        TaskScheduler.Default
    )
    |> ignore
34+
2335
let connectedTcs =
2436
TaskCompletionSource<SessionContext>(TaskCreationOptions.RunContinuationsAsynchronously)
2537

@@ -120,7 +132,7 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
120132
match sched.OnEvent seq with
121133
| Some toAck ->
122134
let ack: SessionAckPayload = { LastProcessedSeq = toAck }
123-
ignore (sendMessage (Message.SessionAck ack))
135+
observeTask (sendMessage (Message.SessionAck ack))
124136
| None -> ()
125137
| _ -> ()
126138

@@ -172,7 +184,9 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
172184
handles.Clear()
173185
orphans.Clear())
174186

175-
ignore (enumerator.DisposeAsync().AsTask())
187+
// #62: await enumerator disposal so transport teardown errors
188+
// surface rather than being swallowed on a background thread.
189+
do! enumerator.DisposeAsync()
176190
}
177191
:> Task
178192

@@ -215,7 +229,10 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
215229
/// receive loop, then resolves with the negotiated session context.
216230
member this.ConnectAsync(ct: CancellationToken) : Task<SessionContext> =
217231
task {
218-
ignore (runReceiveLoop ())
232+
// #61: retain the receive-loop task so its completion (and any
233+
// fault) is observable via `Completion`.
234+
receiveLoopTask <- runReceiveLoop ()
235+
observeTask receiveLoopTask
219236
let env = Codec.toEnvelope (Message.SessionHello(buildHello ()))
220237
let waiter = pending.Register env.Id
221238
do! transport.SendAsync(env, ct)
@@ -377,6 +394,10 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
377394

378395
member _.Session = sessionCtx
379396

397+
/// Completes when the receive loop terminates (clean EOF, cancellation,
398+
/// or fault). Lets callers observe that the client stopped pumping (#61).
399+
member _.Completion: Task = receiveLoopTask
400+
380401
/// Close the session cleanly with an optional reason.
381402
member this.CloseAsync(reason: string option, ct: CancellationToken) : Task =
382403
task {

src/Arcp.Client/Internal/AutoAck.fs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ type AutoAckOptions =
1010
{
1111
/// Maximum number of events to receive before forcing an `ack`.
1212
EveryEvents: int
13-
/// Maximum time between acks. The scheduler flushes if either
14-
/// threshold is reached.
13+
/// Minimum elapsed time before the next event triggers an ack.
14+
/// NOTE: the scheduler is event-driven — it is evaluated only on
15+
/// `OnEvent`, so this interval gates acks on the next event
16+
/// arrival rather than firing on its own timer. `session.ack` is
17+
/// advisory (spec §6.5), so no standalone timer is used.
1518
Interval: TimeSpan
1619
}
1720

@@ -24,12 +27,14 @@ module AutoAckOptions =
2427
}
2528

2629
/// Tracks `last_processed_seq` and decides when to emit a
27-
/// `session.ack` based on event count and elapsed time.
30+
/// `session.ack`. Evaluation is event-driven: `OnEvent` returns the
31+
/// seq to ack when the event-count threshold is hit, or when the
32+
/// configured interval has elapsed *as observed on the current event*.
33+
/// There is no background timer — if events stop arriving, no ack is
34+
/// produced (acks are advisory per spec §6.5).
2835
///
2936
/// The scheduler does NOT send the ack itself — it returns the seq
30-
/// to send so the client can build/send the envelope. Spec §6.5
31-
/// notes ack is purely advisory; this implementation matches the
32-
/// TS SDK's behaviour (ack every 32 events / 250 ms by default).
37+
/// to send so the client can build/send the envelope.
3338
type internal AutoAckScheduler(options: AutoAckOptions, timeProvider: TimeProvider) =
3439
let lockObj = obj ()
3540
let mutable lastSeq: int64 = 0L

src/Arcp.Client/Internal/ChunkAssembler.fs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,35 @@ open ARCP.Core
1111
/// One assembler instance per `result_id`. Chunks MUST arrive in
1212
/// `chunk_seq` order per spec §8.4; out-of-order arrivals raise
1313
/// `InvalidRequest` and the caller is expected to terminate the job.
14-
type internal ChunkAssembler() =
14+
// NOTE(review): the `///` lines immediately preceding this module
// ("One assembler instance per `result_id` ...") describe ChunkAssembler,
// not ChunkLimits; they should sit directly above the ChunkAssembler type.
/// Built-in ceilings that stop a single result stream from growing
/// without bound (DoS protection). The parameterless `ChunkAssembler()`
/// constructor passes these as its `maxBytes` / `maxChunks` arguments.
[<RequireQualifiedAccess>]
module internal ChunkLimits =
    /// Byte ceiling for one stream: 256 MiB (256 * 1024 * 1024).
    let [<Literal>] DefaultMaxBytes = 268_435_456L

    /// Chunk-count ceiling for one stream.
    let [<Literal>] DefaultMaxChunks = 1_000_000
22+
23+
type internal ChunkAssembler(maxBytes: int64, maxChunks: int) =
1524
let buffer = ResizeArray<byte[]>()
1625
let mutable expectedSeq: int64 = 0L
26+
let mutable totalBytes: int64 = 0L
1727
let mutable closed = false
1828

29+
new() = ChunkAssembler(ChunkLimits.DefaultMaxBytes, ChunkLimits.DefaultMaxChunks)
30+
1931
/// Append a chunk. Returns `Ok finished` where `finished` is
20-
/// `true` once a `more = false` chunk has arrived.
32+
/// `true` once a `more = false` chunk has arrived. A stream that
33+
/// exceeds the byte/chunk cap is rejected to bound memory (§8.4).
2134
member _.Append(chunkSeq: int64, data: string, encoding: ChunkEncoding, more: bool) : Result<bool, ARCPError> =
2235
if closed then
2336
Error(ARCPError.InvalidRequest("Chunk arrived after stream closed", None))
2437
elif chunkSeq <> expectedSeq then
2538
Error(
2639
ARCPError.InvalidRequest(sprintf "Out-of-order chunk: expected %d, got %d" expectedSeq chunkSeq, None)
2740
)
41+
elif int64 buffer.Count >= int64 maxChunks then
42+
Error(ARCPError.InvalidRequest(sprintf "Result stream exceeded max chunk count (%d)" maxChunks, None))
2843
else
2944
let bytesResult =
3045
try
@@ -42,8 +57,11 @@ type internal ChunkAssembler() =
4257

4358
match bytesResult with
4459
| Error e -> Error e
60+
| Ok bytes when totalBytes + int64 bytes.Length > maxBytes ->
61+
Error(ARCPError.InvalidRequest(sprintf "Result stream exceeded max byte budget (%d)" maxBytes, None))
4562
| Ok bytes ->
4663
buffer.Add bytes
64+
totalBytes <- totalBytes + int64 bytes.Length
4765
expectedSeq <- expectedSeq + 1L
4866

4967
if not more then
Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,50 @@
11
namespace ARCP.Client.Internal
22

33
open System
4+
open System.Text.Json
45
open ARCP.Core
56

67
/// Map a wire `job.error` / `session.error` code string back to an
7-
/// `ARCPError` DU. The reverse direction is `ARCPError.code`. Out
8-
/// of scope: details payloads beyond `message`.
8+
/// `ARCPError` DU, parsing structured fields out of the `details`
9+
/// payload where the DU carries them. The reverse direction is
10+
/// `ARCPError.code`.
911
[<RequireQualifiedAccess>]
1012
module internal JobErrorMapper =
13+
/// Look up the property `name` on an optional JSON `details` payload.
/// Yields `Some value` only when `details` is present, is a JSON
/// object, contains `name`, and the value is not JSON null; every
/// other case yields `None`.
let private prop (details: JsonElement option) (name: string) : JsonElement option =
    details
    |> Option.filter (fun payload -> payload.ValueKind = JsonValueKind.Object)
    |> Option.bind (fun payload ->
        let found, value = payload.TryGetProperty name

        if found && value.ValueKind <> JsonValueKind.Null then
            Some value
        else
            None)
20+
21+
/// Read `name` from `details` as a string, returning `fallback` when
/// the field is absent, null, or not a JSON string.
let private strField details name fallback =
    match prop details name with
    // JsonElement.GetString() throws InvalidOperationException for any
    // kind other than String/Null, so a peer sending e.g. a number here
    // must fall back rather than crash error mapping.
    | Some v when v.ValueKind = JsonValueKind.String -> v.GetString()
    | _ -> fallback
23+
24+
/// Read `name` from `details` as a 32-bit integer, returning `fallback`
/// when the field is absent, null, not a JSON number, or not
/// representable as an Int32.
let private intField details name fallback =
    match prop details name with
    // TryGetInt32 only reports range/format failures through its bool
    // result; on a non-Number kind it throws InvalidOperationException,
    // hence the explicit ValueKind guard.
    | Some v when v.ValueKind = JsonValueKind.Number ->
        match v.TryGetInt32() with
        | true, n -> n
        | _ -> fallback
    | _ -> fallback
31+
32+
/// Read `name` from `details` as a 64-bit integer, returning `fallback`
/// when the field is absent, null, not a JSON number, or not
/// representable as an Int64.
let private int64Field details name fallback =
    match prop details name with
    // TryGetInt64 throws InvalidOperationException on non-Number kinds,
    // so check the kind before asking for the value.
    | Some v when v.ValueKind = JsonValueKind.Number ->
        match v.TryGetInt64() with
        | true, n -> n
        | _ -> fallback
    | _ -> fallback
39+
40+
/// Read `name` from `details` as a `DateTimeOffset`, returning
/// `fallback` when the field is absent, null, not a JSON string, or
/// not parseable as a date/time.
let private dateField details name fallback =
    match prop details name with
    // TryGetDateTimeOffset throws InvalidOperationException on
    // non-String kinds, so check the kind before parsing.
    | Some v when v.ValueKind = JsonValueKind.String ->
        match v.TryGetDateTimeOffset() with
        | true, d -> d
        | _ -> fallback
    | _ -> fallback
47+
1148
let ofWire
1249
(code: string)
1350
(message: string)
@@ -20,13 +57,14 @@ module internal JobErrorMapper =
2057
| "JOB_NOT_FOUND" -> ARCPError.JobNotFound jobId
2158
| "DUPLICATE_KEY" -> ARCPError.DuplicateKey message
2259
| "AGENT_NOT_AVAILABLE" -> ARCPError.AgentNotAvailable message
23-
| "AGENT_VERSION_NOT_AVAILABLE" -> ARCPError.AgentVersionNotAvailable(message, "")
60+
| "AGENT_VERSION_NOT_AVAILABLE" -> ARCPError.AgentVersionNotAvailable(message, strField details "version" "")
2461
| "CANCELLED" -> ARCPError.Cancelled(Some message)
25-
| "TIMEOUT" -> ARCPError.Timeout 0
62+
| "TIMEOUT" -> ARCPError.Timeout(intField details "timeout_sec" 0)
2663
| "HEARTBEAT_LOST" -> ARCPError.HeartbeatLost
27-
| "LEASE_EXPIRED" -> ARCPError.LeaseExpired DateTimeOffset.MinValue
28-
| "BUDGET_EXHAUSTED" -> ARCPError.BudgetExhausted "USD"
64+
| "LEASE_EXPIRED" -> ARCPError.LeaseExpired(dateField details "expires_at" DateTimeOffset.MinValue)
65+
| "BUDGET_EXHAUSTED" -> ARCPError.BudgetExhausted(strField details "currency" "USD")
2966
| "INVALID_REQUEST" -> ARCPError.InvalidRequest(message, details)
3067
| "UNAUTHENTICATED" -> ARCPError.Unauthenticated message
31-
| "RESUME_WINDOW_EXPIRED" -> ARCPError.ResumeWindowExpired(0L, 0)
68+
| "RESUME_WINDOW_EXPIRED" ->
69+
ARCPError.ResumeWindowExpired(int64Field details "from_seq" 0L, intField details "window_sec" 0)
3270
| _ -> ARCPError.InternalError message

src/Arcp.Client/Transport/WebSocket.fs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ open ARCP.Client
1717
///
1818
/// One text frame per envelope. The receive loop reassembles
1919
/// continuation frames into a single message.
20-
type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) =
20+
type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool, maxMessageBytes: int64) =
2121
// The BCL WebSocket allows only one outstanding send at a time,
2222
// so we serialise concurrent callers (auto-ack, pong, submit,
2323
// cancel, close) through an async-aware semaphore — a plain
@@ -59,6 +59,16 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) =
5959
if result.MessageType = WebSocketMessageType.Close then
6060
closedRemotely <- true
6161
else
62+
if ms.Length + int64 result.Count > maxMessageBytes then
63+
// #66: reject oversized messages before the
64+
// codec ever sees them.
65+
raise (
66+
WebSocketException(
67+
WebSocketError.HeaderError,
68+
sprintf "Inbound message exceeded %d bytes" maxMessageBytes
69+
)
70+
)
71+
6272
ms.Write(buffer, 0, result.Count)
6373
endOfMessage <- result.EndOfMessage
6474

@@ -80,6 +90,11 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) =
8090
| Error _ -> return! receiveOne ct
8191
}
8292

93+
/// Defaults to a 256 MiB cap on a single reassembled message to
94+
/// bound memory against a peer streaming continuation frames
95+
/// indefinitely.
96+
new(socket: WebSocket, ownsSocket: bool) = WebSocketClientTransport(socket, ownsSocket, 256L * 1024L * 1024L)
97+
8398
interface ITransport with
8499
member _.SendAsync(env, ct) = sendOne env ct
85100

src/Arcp.Core/Trace.fs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ARCP.Core
22

33
open System
4+
open System.Security.Cryptography
45

56
/// W3C Trace Context identifiers. Trace ids are 32 hex chars,
67
/// span ids 16 hex chars; the strings are kept opaque on the wire.
@@ -19,22 +20,21 @@ type SpanId =
1920
member this.Value = let (SpanId v) = this in v
2021
override this.ToString() = this.Value
2122

22-
[<RequireQualifiedAccess>]
23-
module TraceId =
24-
let private hex (count: int) =
23+
/// Cryptographically random hex string. Trace/span ids cross trust
24+
/// boundaries, so guess-resistance matters (W3C Trace Context).
25+
[<AutoOpen>]
26+
module private TraceRandom =
27+
let cryptoHex (count: int) =
2528
let buf = Array.zeroCreate<byte> count
26-
Random.Shared.NextBytes(buf)
29+
RandomNumberGenerator.Fill(Span<byte>(buf))
2730
Convert.ToHexString(buf).ToLowerInvariant()
2831

29-
let newId () : TraceId = TraceId(hex 16)
32+
[<RequireQualifiedAccess>]
33+
module TraceId =
34+
let newId () : TraceId = TraceId(cryptoHex 16)
3035
let ofString (s: string) : TraceId = TraceId s
3136

3237
[<RequireQualifiedAccess>]
3338
module SpanId =
34-
let private hex (count: int) =
35-
let buf = Array.zeroCreate<byte> count
36-
Random.Shared.NextBytes(buf)
37-
Convert.ToHexString(buf).ToLowerInvariant()
38-
39-
let newId () : SpanId = SpanId(hex 8)
39+
let newId () : SpanId = SpanId(cryptoHex 8)
4040
let ofString (s: string) : SpanId = SpanId s

src/Arcp.Giraffe/GiraffeEndpoint.fs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,21 @@ module ArcpGiraffe =
1919
fun (next: HttpFunc) (ctx: HttpContext) ->
2020
task {
2121
if ctx.Request.Path.Value <> path then
22+
// Only fall through to the next handler on a path miss.
2223
return! next ctx
2324
elif not ctx.WebSockets.IsWebSocketRequest then
25+
// #40: short-circuit so downstream handlers cannot
26+
// overwrite the 400 status.
2427
ctx.Response.StatusCode <- StatusCodes.Status400BadRequest
25-
return! next ctx
28+
return Some ctx
2629
else
2730
let! socket = ctx.WebSockets.AcceptWebSocketAsync()
2831

2932
let transport =
3033
new WebSocketClientTransport(socket, ownsSocket = true) :> ITransport
3134

3235
do! server.HandleSessionAsync(transport, ctx.RequestAborted)
33-
return! next ctx
36+
// #40: the response is hijacked by the upgrade; do not
37+
// continue the pipeline.
38+
return Some ctx
3439
}

0 commit comments

Comments
 (0)