Skip to content

Commit 3252f66

Browse files
Nick Ficanocursoragent
andcommitted
fix(minors): clean-code, perf, safety, feature gating
- watchdog skips terminal jobs (#89); wire retryable + Unknown arm (#90) - client observes cancellation tokens (#98); reject re-hello (#106) - session-scoped LastEventSeq via Interlocked; subscribed_from in subscriber space (#111) - enforce negotiated features on submit (#114); duplicate-currency budget subset sum (#116) - readEnvelope rejects null/empty envelopes (#117); ArcpResult avoids shadowing FSharp.Core Result (#118) - StaticBearerVerifier constant-time compare (#119); DevMode rejects whitespace (#54) - glob regex cache (#44); literalPrefix/isSubset cleanup (#43,#45); array retryDelays (#55) - single idempotency lookup (#52); credential revoke failure observable (#53) - dead-code removal (#51,#67); volatile stdio closed (#68); typed pending error (#69) - Connected member (#70); optional jobId mapping (#71); lazy Otel source from Version.Sdk (#42) Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent f5508b7 commit 3252f66

20 files changed

Lines changed: 292 additions & 110 deletions

src/Arcp.Client/ArcpClient.fs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,31 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
4040
| Some s when s.NegotiatedFeatures.Contains flag -> Ok()
4141
| _ -> Error(ARCPError.InvalidRequest(sprintf "Feature %s was not negotiated" flag, None))
4242

43-
let sendEnvelope (env: Envelope) : Task =
43+
let sendEnvelopeCt (env: Envelope) (ct: CancellationToken) : Task =
4444
let env =
4545
match sessionCtx with
4646
| Some s -> Envelope.withSessionId s.SessionId env
4747
| None -> env
4848

49-
transport.SendAsync(env, receiveLoopCts.Token)
49+
transport.SendAsync(env, ct)
50+
51+
let sendEnvelope (env: Envelope) : Task = sendEnvelopeCt env receiveLoopCts.Token
5052

5153
let sendMessage (msg: Message) : Task =
5254
let env = Codec.toEnvelope msg
5355
sendEnvelope env
5456

57+
/// Await a correlated response while honoring the caller's token; on
58+
/// cancellation drop the pending entry so it does not leak (#98).
59+
let awaitResponse (requestId: string) (waiter: Task<Envelope>) (ct: CancellationToken) : Task<Envelope> =
60+
task {
61+
try
62+
return! waiter.WaitAsync(ct)
63+
with :? OperationCanceledException as ex ->
64+
pending.Remove requestId
65+
return raise ex
66+
}
67+
5568
// Job-addressed envelopes can arrive before `SubmitAsync`/
5669
// `SubscribeAsync` register the handle (the receive loop completes
5770
// the request waiter and races ahead). Buffer such envelopes per
@@ -83,7 +96,10 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
8396
w.ResultSetter.TrySetResult(Ok payload) |> ignore
8497
| Message.JobError payload ->
8598
handles.TryRemove jid |> ignore
86-
let err = JobErrorMapper.ofWire payload.Code payload.Message payload.Details jid
99+
100+
let err =
101+
JobErrorMapper.ofWireWith payload.Code payload.Message payload.Details payload.Retryable (Some jid)
102+
87103
w.Channel.Writer.TryComplete() |> ignore
88104
w.ResultSetter.TrySetResult(Error err) |> ignore
89105
| _ -> ()
@@ -236,7 +252,7 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
236252
let env = Codec.toEnvelope (Message.SessionHello(buildHello ()))
237253
let waiter = pending.Register env.Id
238254
do! transport.SendAsync(env, ct)
239-
let! welcomeEnv = waiter
255+
let! welcomeEnv = awaitResponse env.Id waiter ct
240256

241257
match Codec.toMessage welcomeEnv with
242258
| Ok(Message.SessionWelcome w) -> return acceptWelcome welcomeEnv w
@@ -269,8 +285,8 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
269285
| Ok() ->
270286
let env = Codec.toEnvelope (Message.JobSubmit payload)
271287
let waiter = pending.Register env.Id
272-
do! sendEnvelope env
273-
let! acceptedEnv = waiter
288+
do! sendEnvelopeCt env ct
289+
let! acceptedEnv = awaitResponse env.Id waiter ct
274290

275291
match Codec.toMessage acceptedEnv with
276292
| Ok(Message.JobAccepted accepted) ->
@@ -293,8 +309,14 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
293309
registerHandle accepted.JobId writer
294310
return handle
295311
| Ok(Message.JobError errPayload) ->
312+
// §71: no job id context here — pass None.
296313
let err =
297-
JobErrorMapper.ofWire errPayload.Code errPayload.Message errPayload.Details ""
314+
JobErrorMapper.ofWireWith
315+
errPayload.Code
316+
errPayload.Message
317+
errPayload.Details
318+
errPayload.Retryable
319+
None
298320

299321
return raise (ArcpException err)
300322
| _ -> return raise (ArcpException(ARCPError.InvalidRequest("Expected job.accepted", None)))
@@ -315,8 +337,8 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
315337

316338
let env = Codec.toEnvelope (Message.JobSubscribe payload)
317339
let waiter = pending.Register env.Id
318-
do! sendEnvelope env
319-
let! subscribedEnv = waiter
340+
do! sendEnvelopeCt env ct
341+
let! subscribedEnv = awaitResponse env.Id waiter ct
320342

321343
// §7.6 / #96: surface subscription denials instead of
322344
// returning a live-looking handle.
@@ -329,7 +351,10 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
329351
registerHandle jobId.Value writer
330352
return handle
331353
| Ok(Message.SessionError e) ->
332-
return raise (ArcpException(JobErrorMapper.ofWire e.Code e.Message e.Details jobId.Value))
354+
return
355+
raise (
356+
ArcpException(JobErrorMapper.ofWireWith e.Code e.Message e.Details e.Retryable (Some jobId.Value))
357+
)
333358
| _ -> return raise (ArcpException(ARCPError.InvalidRequest("Expected job.subscribed", None)))
334359
}
335360

@@ -365,8 +390,8 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
365390

366391
let env = Codec.toEnvelope (Message.SessionListJobs payload)
367392
let waiter = pending.Register env.Id
368-
do! sendEnvelope env
369-
let! respEnv = waiter
393+
do! sendEnvelopeCt env ct
394+
let! respEnv = awaitResponse env.Id waiter ct
370395

371396
match Codec.toMessage respEnv with
372397
| Ok(Message.SessionJobs jobs) -> return jobs
@@ -398,6 +423,11 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
398423
/// or fault). Lets callers observe that the client stopped pumping (#61).
399424
member _.Completion: Task = receiveLoopTask
400425

426+
/// Resolves with the negotiated session once `session.welcome` is
427+
/// received; faults if the handshake fails. A separate handle to the
428+
/// connect result for callers that did not await `ConnectAsync` (#70).
429+
member _.Connected: Task<SessionContext> = connectedTcs.Task
430+
401431
/// Close the session cleanly with an optional reason.
402432
member this.CloseAsync(reason: string option, ct: CancellationToken) : Task =
403433
task {

src/Arcp.Client/Internal/JobErrorMapper.fs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,23 @@ module internal JobErrorMapper =
4545
| _ -> None)
4646
|> Option.defaultValue fallback
4747

48-
let ofWire
48+
/// Map a wire error. `jobId` is the job context when known (used for
49+
/// `JOB_NOT_FOUND`); `None` for non-job dispatch sites (#71).
50+
/// `retryable` is the wire flag, honored for unknown codes (#90).
51+
let ofWireWith
4952
(code: string)
5053
(message: string)
5154
(details: System.Text.Json.JsonElement option)
52-
(jobId: string)
55+
(retryable: bool)
56+
(jobId: string option)
5357
: ARCPError =
5458
match code with
5559
| "PERMISSION_DENIED" -> ARCPError.PermissionDenied(message, details)
5660
| "LEASE_SUBSET_VIOLATION" -> ARCPError.LeaseSubsetViolation(message, details)
57-
| "JOB_NOT_FOUND" -> ARCPError.JobNotFound jobId
61+
| "JOB_NOT_FOUND" ->
62+
match jobId with
63+
| Some j -> ARCPError.JobNotFound j
64+
| None -> ARCPError.InvalidRequest(message, details)
5865
| "DUPLICATE_KEY" -> ARCPError.DuplicateKey message
5966
| "AGENT_NOT_AVAILABLE" -> ARCPError.AgentNotAvailable message
6067
| "AGENT_VERSION_NOT_AVAILABLE" -> ARCPError.AgentVersionNotAvailable(message, strField details "version" "")
@@ -67,4 +74,15 @@ module internal JobErrorMapper =
6774
| "UNAUTHENTICATED" -> ARCPError.Unauthenticated message
6875
| "RESUME_WINDOW_EXPIRED" ->
6976
ARCPError.ResumeWindowExpired(int64Field details "from_seq" 0L, intField details "window_sec" 0)
70-
| _ -> ARCPError.InternalError message
77+
| other -> ARCPError.Unknown(other, message, retryable)
78+
79+
/// Backwards-compatible entry point: jobId as a plain string and a
80+
/// default retryable of false for unknown codes.
81+
let ofWire
82+
(code: string)
83+
(message: string)
84+
(details: System.Text.Json.JsonElement option)
85+
(jobId: string)
86+
: ARCPError =
87+
let jid = if String.IsNullOrEmpty jobId then None else Some jobId
88+
ofWireWith code message details false jid

src/Arcp.Client/Internal/Pending.fs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ type internal PendingRegistry() =
1919
TaskCompletionSource<Envelope>(TaskCreationOptions.RunContinuationsAsynchronously)
2020

2121
if not (pending.TryAdd(requestId, tcs)) then
22-
failwithf "Duplicate pending request id: %s" requestId
22+
// §69: surface as a typed ARCP error rather than a bare exn.
23+
raise (ArcpException(ARCPError.InternalError(sprintf "Duplicate pending request id: %s" requestId)))
2324

2425
tcs.Task
2526

@@ -32,6 +33,10 @@ type internal PendingRegistry() =
3233
true
3334
| _ -> false
3435

36+
/// Drop a pending request without completing it (e.g. when the
37+
/// caller's cancellation token fires).
38+
member _.Remove(requestId: string) : unit = pending.TryRemove requestId |> ignore
39+
3540
/// Fail all pending operations (e.g. on transport close).
3641
member _.FailAll(error: exn) : unit =
3742
for kvp in pending do

src/Arcp.Client/Transport/Memory.fs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,6 @@ type MemoryTransport private (outgoing: Channel<Envelope>, incoming: Channel<Env
1717
task { do! outgoing.Writer.WriteAsync(env, ct).AsTask() } :> Task
1818

1919
member _.Receive(ct) =
20-
let enumerable =
21-
seq {
22-
while not ct.IsCancellationRequested do
23-
yield incoming
24-
}
25-
// Convert the channel reader into IAsyncEnumerable via a helper.
2620
let reader = incoming.Reader
2721

2822
{ new IAsyncEnumerable<Envelope> with

src/Arcp.Client/Transport/Stdio.fs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ open ARCP.Client
1111
/// Newline-delimited JSON over a pair of streams. Used for stdio
1212
/// child processes (spec §4: stdio mandatory for in-process children).
1313
type StdioTransport(input: TextReader, output: TextWriter, ownsStreams: bool) =
14+
// §68: `closed` is written by CloseAsync (possibly another thread)
15+
// and read in the receive loop; mark it volatile so the write is
16+
// observed promptly on weak memory models.
17+
[<VolatileField>]
1418
let mutable closed = false
19+
1520
let writeLock = obj ()
1621

1722
interface ITransport with

src/Arcp.Core/Codec.fs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ module Codec =
8181
/// Parse a JSON string from the wire into an envelope.
8282
let readEnvelope (json: string) : Result<Envelope, ARCPError> =
8383
try
84-
Ok(Json.deserialize<Envelope> json)
84+
let env = Json.deserialize<Envelope> json
85+
86+
// The JSON literal `null` deserializes to a null record; reject
87+
// it (and missing type/id) so the session loop never NREs (§5, §12).
88+
if obj.ReferenceEquals(env, null) then
89+
Error(ARCPError.InvalidRequest("Envelope must be a JSON object", None))
90+
elif String.IsNullOrEmpty env.Type || String.IsNullOrEmpty env.Id then
91+
Error(ARCPError.InvalidRequest("Envelope is missing required type/id", None))
92+
else
93+
Ok env
8594
with :? JsonException as ex ->
8695
Error(ARCPError.InvalidRequest(sprintf "Malformed envelope: %s" ex.Message, None))

src/Arcp.Core/Errors.fs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type ARCPError =
3131
| InvalidRequest of message: string * details: JsonElement option
3232
| Unauthenticated of message: string
3333
| InternalError of message: string
34+
/// Forward-compatible arm for wire error codes this SDK version
35+
/// does not model, carrying the wire `retryable` flag verbatim (§12).
36+
| Unknown of code: string * message: string * retryable: bool
3437

3538
[<RequireQualifiedAccess>]
3639
module ARCPError =
@@ -52,6 +55,7 @@ module ARCPError =
5255
| ARCPError.InvalidRequest _ -> "INVALID_REQUEST"
5356
| ARCPError.Unauthenticated _ -> "UNAUTHENTICATED"
5457
| ARCPError.InternalError _ -> "INTERNAL_ERROR"
58+
| ARCPError.Unknown(c, _, _) -> c
5559

5660
/// Human-readable message; suitable for `error.message` on the wire.
5761
let message (e: ARCPError) : string =
@@ -73,13 +77,15 @@ module ARCPError =
7377
| ARCPError.InvalidRequest(m, _) -> m
7478
| ARCPError.Unauthenticated m -> m
7579
| ARCPError.InternalError m -> m
80+
| ARCPError.Unknown(_, m, _) -> m
7681

7782
/// Spec §12: retryable iff a different attempt could succeed.
7883
let retryable (e: ARCPError) : bool =
7984
match e with
8085
| ARCPError.Timeout _
8186
| ARCPError.HeartbeatLost
8287
| ARCPError.InternalError _ -> true
88+
| ARCPError.Unknown(_, _, r) -> r
8389
| _ -> false
8490

8591
let details (e: ARCPError) : JsonElement option =
@@ -105,8 +111,11 @@ type ArcpException(error: ARCPError, ?inner: exn) =
105111
member _.Code = ARCPError.code error
106112
member _.Retryable = ARCPError.retryable error
107113

114+
/// Helpers bridging `Result<_, ARCPError>` and the throwing C#-style
115+
/// surface. Named `ArcpResult` (not `Result`) so it does not shadow
116+
/// FSharp.Core's `Result` for consumers that `open ARCP.Core` (#118).
108117
[<RequireQualifiedAccess>]
109-
module Result =
118+
module ArcpResult =
110119
/// Throw `ArcpException` on `Error`, return the value on `Ok`.
111120
/// The seam between internal `Result<_, ARCPError>` and the
112121
/// public C#-friendly throwing API.

0 commit comments

Comments
 (0)