Skip to content

Commit 055659b

Browse files
Nick Ficanocursoragent
andcommitted
fix(runtime/client): pagination, subscribe history, ordering, rotation, client races
- list_jobs stable ordering + cursor pagination + next_cursor; bare-name agent filter; limit applied via take+1 (#109, #91, #108) - job.subscribe history:true replays buffered events; replayed reflects reality (#115) - per-session gate serializes event_seq assignment with send (#112) - credential rotation keeps id outstanding for terminal revocation (#107) - client buffers job-addressed envelopes until handle registered (#95) - SubscribeAsync surfaces denials instead of a dead handle (#96) - receive-loop exit faults pending waiters and completes handles (#97) Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 2f7eece commit 055659b

5 files changed

Lines changed: 211 additions & 87 deletions

File tree

src/Arcp.Client/ArcpClient.fs

Lines changed: 91 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -40,48 +40,70 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
4040
let env = Codec.toEnvelope msg
4141
sendEnvelope env
4242

43-
let dispatchJobEvent (env: Envelope) (payload: JobEventPayload) : unit =
44-
match env.JobId with
45-
| None -> ()
46-
| Some jid ->
47-
match handles.TryGetValue jid with
48-
| true, w ->
49-
match payload.Body with
50-
| JobEventBody.ResultChunk(rid, chunkSeq, data, enc, more) ->
51-
let assembler = w.ChunkIndex.GetOrCreate rid
52-
53-
match assembler.Append(chunkSeq, data, enc, more) with
54-
| Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore
55-
| Error err ->
56-
// Out-of-order or undecodable chunk: tear down
57-
// the handle so callers don't sit on a job that
58-
// will never produce a usable result.
59-
handles.TryRemove jid |> ignore
60-
w.Channel.Writer.TryComplete() |> ignore
61-
w.ResultSetter.TrySetResult(Error err) |> ignore
62-
| other -> w.Channel.Writer.TryWrite other |> ignore
63-
| _ -> ()
64-
65-
let dispatchJobResult (env: Envelope) (payload: JobResultPayload) : unit =
66-
match env.JobId with
67-
| None -> ()
68-
| Some jid ->
69-
match handles.TryRemove jid with
70-
| true, w ->
71-
w.Channel.Writer.TryComplete() |> ignore
72-
w.ResultSetter.TrySetResult(Ok payload) |> ignore
73-
| _ -> ()
43+
// Job-addressed envelopes can arrive before `SubmitAsync`/
44+
// `SubscribeAsync` register the handle (the receive loop completes
45+
// the request waiter and races ahead). Buffer such envelopes per
46+
// job id and flush them in order once the handle is registered, all
47+
// under one gate so registration and delivery cannot interleave (#95).
48+
let dispatchGate = obj ()
49+
let orphans = ConcurrentDictionary<string, ResizeArray<Envelope>>()
50+
51+
let deliver (jid: string) (w: JobHandleWriter) (msg: Message) : unit =
52+
match msg with
53+
| Message.JobEvent payload ->
54+
match payload.Body with
55+
| JobEventBody.ResultChunk(rid, chunkSeq, data, enc, more) ->
56+
let assembler = w.ChunkIndex.GetOrCreate rid
57+
58+
match assembler.Append(chunkSeq, data, enc, more) with
59+
| Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore
60+
| Error err ->
61+
// Out-of-order or undecodable chunk: tear down the
62+
// handle so callers don't sit on a job that will never
63+
// produce a usable result.
64+
handles.TryRemove jid |> ignore
65+
w.Channel.Writer.TryComplete() |> ignore
66+
w.ResultSetter.TrySetResult(Error err) |> ignore
67+
| other -> w.Channel.Writer.TryWrite other |> ignore
68+
| Message.JobResult payload ->
69+
handles.TryRemove jid |> ignore
70+
w.Channel.Writer.TryComplete() |> ignore
71+
w.ResultSetter.TrySetResult(Ok payload) |> ignore
72+
| Message.JobError payload ->
73+
handles.TryRemove jid |> ignore
74+
let err = JobErrorMapper.ofWire payload.Code payload.Message payload.Details jid
75+
w.Channel.Writer.TryComplete() |> ignore
76+
w.ResultSetter.TrySetResult(Error err) |> ignore
77+
| _ -> ()
7478

75-
let dispatchJobError (env: Envelope) (payload: JobErrorPayload) : unit =
79+
let dispatchJob (env: Envelope) (msg: Message) : unit =
7680
match env.JobId with
7781
| None -> ()
7882
| Some jid ->
79-
match handles.TryRemove jid with
80-
| true, w ->
81-
let err = JobErrorMapper.ofWire payload.Code payload.Message payload.Details jid
82-
w.Channel.Writer.TryComplete() |> ignore
83-
w.ResultSetter.TrySetResult(Error err) |> ignore
84-
| _ -> ()
83+
lock dispatchGate (fun () ->
84+
match handles.TryGetValue jid with
85+
| true, w -> deliver jid w msg
86+
| _ ->
87+
// Buffer until the handle appears.
88+
let q = orphans.GetOrAdd(jid, (fun _ -> ResizeArray<Envelope>()))
89+
q.Add env)
90+
91+
/// Register a job handle and flush any envelopes that arrived before
92+
/// it was known, preserving order (#95).
93+
let registerHandle (jid: string) (w: JobHandleWriter) : unit =
94+
lock dispatchGate (fun () ->
95+
handles.[jid] <- w
96+
97+
match orphans.TryRemove jid with
98+
| true, q ->
99+
for env in q do
100+
match Codec.toMessage env with
101+
| Ok m ->
102+
match handles.TryGetValue jid with
103+
| true, w2 -> deliver jid w2 m
104+
| _ -> ()
105+
| _ -> ()
106+
| _ -> ())
85107

86108
let onPing (payload: SessionPingPayload) : Task =
87109
let pong: SessionPongPayload =
@@ -128,14 +150,28 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
128150

129151
match msg with
130152
| Message.SessionPing p -> do! onPing p
131-
| Message.JobEvent p -> dispatchJobEvent env p
132-
| Message.JobResult p -> dispatchJobResult env p
133-
| Message.JobError p -> dispatchJobError env p
153+
| Message.JobEvent _
154+
| Message.JobResult _
155+
| Message.JobError _ -> dispatchJob env msg
134156
| _ -> ()
135157
with
136158
| :? OperationCanceledException -> ()
137159
| ex -> pending.FailAll ex
138160
finally
161+
// §97: on any loop exit (clean EOF or cancellation) fault
162+
// every in-flight request waiter and complete every open
163+
// job handle so callers never hang forever.
164+
let closed = ARCPError.InternalError "ARCP transport closed"
165+
pending.FailAll(ArcpException closed)
166+
167+
lock dispatchGate (fun () ->
168+
for kv in handles do
169+
kv.Value.Channel.Writer.TryComplete() |> ignore
170+
kv.Value.ResultSetter.TrySetResult(Error closed) |> ignore
171+
172+
handles.Clear()
173+
orphans.Clear())
174+
139175
ignore (enumerator.DisposeAsync().AsTask())
140176
}
141177
:> Task
@@ -237,7 +273,7 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
237273

238274
let credentials = accepted.Credentials |> Option.defaultValue []
239275
let handle, writer = mkHandle jid credentials cancelDelegate
240-
handles.[accepted.JobId] <- writer
276+
registerHandle accepted.JobId writer
241277
return handle
242278
| Ok(Message.JobError errPayload) ->
243279
let err =
@@ -263,14 +299,21 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
263299
let env = Codec.toEnvelope (Message.JobSubscribe payload)
264300
let waiter = pending.Register env.Id
265301
do! sendEnvelope env
266-
let! _subscribed = waiter
302+
let! subscribedEnv = waiter
267303

268-
let cancelDelegate (_reason, _ct') =
269-
task { return Error(ARCPError.PermissionDenied("Subscribers cannot cancel", None)) }
304+
// §7.6 / #96: surface subscription denials instead of
305+
// returning a live-looking handle.
306+
match Codec.toMessage subscribedEnv with
307+
| Ok(Message.JobSubscribed _) ->
308+
let cancelDelegate (_reason, _ct') =
309+
task { return Error(ARCPError.PermissionDenied("Subscribers cannot cancel", None)) }
270310

271-
let handle, writer = mkHandle jobId [] cancelDelegate
272-
handles.[jobId.Value] <- writer
273-
return handle
311+
let handle, writer = mkHandle jobId [] cancelDelegate
312+
registerHandle jobId.Value writer
313+
return handle
314+
| Ok(Message.SessionError e) ->
315+
return raise (ArcpException(JobErrorMapper.ofWire e.Code e.Message e.Details jobId.Value))
316+
| _ -> return raise (ArcpException(ARCPError.InvalidRequest("Expected job.subscribed", None)))
274317
}
275318

276319
/// Stop receiving events for a subscribed job.

src/Arcp.Runtime/ArcpServer.fs

Lines changed: 96 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -435,29 +435,55 @@ type ArcpServer(options: ArcpServerOptions) =
435435
(ARCPError.InvalidRequest("list_jobs feature not negotiated", None))
436436
ct
437437
else
438-
let filtered =
438+
// §6.6: bare-name agent filter matches any version;
439+
// `name@version` matches that version exactly.
440+
let agentMatches (filterAgent: string) (jobAgent: string) =
441+
jobAgent = filterAgent || jobAgent.StartsWith(filterAgent + "@")
442+
443+
// Stable ordering by JobId (ULIDs are time-ordered), so
444+
// repeated requests page deterministically (§6.6, §109).
445+
let ordered =
439446
jobs.AllForPrincipal ctx.Principal.Id
440447
|> Seq.filter (fun r ->
441448
match req.Filter with
442449
| None -> true
443450
| Some f ->
444451
(f.Status |> Option.map (List.contains r.Status) |> Option.defaultValue true)
445-
&& (f.Agent |> Option.map (fun a -> r.Agent = a) |> Option.defaultValue true)
452+
&& (f.Agent |> Option.map (fun a -> agentMatches a r.Agent) |> Option.defaultValue true)
446453
&& (f.CreatedAfter
447454
|> Option.map (fun ca -> r.CreatedAt >= ca)
448455
|> Option.defaultValue true))
449-
|> Seq.toList
456+
|> Seq.sortBy (fun r -> r.JobId.Value)
450457

451-
let limited =
458+
// Skip past the cursor (the last JobId of the prior page).
459+
let afterCursor =
460+
match req.Cursor with
461+
| Some c -> ordered |> Seq.filter (fun r -> r.JobId.Value > c)
462+
| None -> ordered
463+
464+
let limit =
452465
match req.Limit with
453-
| Some n when n > 0 -> List.truncate n filtered
454-
| _ -> filtered
466+
| Some n when n > 0 -> n
467+
| _ -> Int32.MaxValue
468+
469+
// Take limit+1 to detect whether more pages remain without
470+
// materialising the entire visible set (§91).
471+
let takeCount = if limit = Int32.MaxValue then limit else limit + 1
472+
let page = afterCursor |> Seq.truncate takeCount |> Seq.toList
473+
let hasMore = List.length page > limit
474+
let pageRows = page |> List.truncate limit
475+
476+
let nextCursor =
477+
if hasMore then
478+
pageRows |> List.tryLast |> Option.map (fun r -> r.JobId.Value)
479+
else
480+
None
455481

456482
let resp: SessionJobsPayload =
457483
{
458484
RequestId = requestId
459-
Jobs = limited |> List.map jobs.ToSummary
460-
NextCursor = None
485+
Jobs = pageRows |> List.map jobs.ToSummary
486+
NextCursor = nextCursor
461487
}
462488

463489
let env =
@@ -495,28 +521,68 @@ type ArcpServer(options: ArcpServerOptions) =
495521
(ARCPError.PermissionDenied("Subscribe denied", None))
496522
ct
497523
| Some record ->
498-
jobs.Subscriptions.Subscribe(record.JobId, ctx.SessionId)
499-
500-
let payload: JobSubscribedPayload =
501-
{
502-
JobId = record.JobId.Value
503-
CurrentStatus = record.Status
504-
Agent = record.Agent
505-
Lease = record.Lease
506-
ParentJobId = record.ParentJobId
507-
TraceId = record.TraceId
508-
SubscribedFrom = record.LastEventSeq
509-
Replayed = sub.History |> Option.defaultValue false
510-
}
511-
512-
let env =
513-
Message.JobSubscribed payload
514-
|> Codec.toEnvelope
515-
|> Envelope.withId requestId
516-
|> Envelope.withSessionId ctx.SessionId
517-
|> Envelope.withJobId record.JobId
518-
519-
do! ctx.Transport.SendAsync(env, ct)
524+
let wantHistory = sub.History |> Option.defaultValue false
525+
let fromSeq = sub.FromEventSeq |> Option.defaultValue 0L
526+
527+
// §7.6: gather buffered `job.event`s for replay (from the
528+
// owning session's log) before registering live delivery.
529+
let replayResult =
530+
if wantHistory then
531+
eventLog.Replay(record.SessionId, fromSeq)
532+
|> Result.map (fun entries ->
533+
entries
534+
|> Seq.filter (fun e ->
535+
e.Envelope.JobId = Some record.JobId.Value && e.Envelope.Type = "job.event")
536+
|> Seq.toList)
537+
else
538+
Ok []
539+
540+
match replayResult with
541+
| Error _ ->
542+
// Buffer no longer covers from_event_seq.
543+
do!
544+
EnvelopeOut.respondWithError
545+
ctx
546+
requestId
547+
(ARCPError.ResumeWindowExpired(fromSeq, options.ResumeWindowSec))
548+
ct
549+
| Ok replayEntries ->
550+
jobs.Subscriptions.Subscribe(record.JobId, ctx.SessionId)
551+
552+
let payload: JobSubscribedPayload =
553+
{
554+
JobId = record.JobId.Value
555+
CurrentStatus = record.Status
556+
Agent = record.Agent
557+
Lease = record.Lease
558+
ParentJobId = record.ParentJobId
559+
TraceId = record.TraceId
560+
SubscribedFrom = fromSeq
561+
Replayed = not (List.isEmpty replayEntries)
562+
}
563+
564+
let env =
565+
Message.JobSubscribed payload
566+
|> Codec.toEnvelope
567+
|> Envelope.withId requestId
568+
|> Envelope.withSessionId ctx.SessionId
569+
|> Envelope.withJobId record.JobId
570+
571+
do! ctx.Transport.SendAsync(env, ct)
572+
573+
// Replay buffered events into the subscriber's seq space
574+
// before live events flow.
575+
for e in replayEntries do
576+
match Codec.toMessage e.Envelope with
577+
| Ok(Message.JobEvent p) ->
578+
do!
579+
EnvelopeOut.pushJobEvent
580+
sessions
581+
options.TimeProvider
582+
ctx.SessionId
583+
record.JobId
584+
p.Body
585+
| _ -> ()
520586
}
521587
:> Task
522588

src/Arcp.Runtime/Internal/EnvelopeOut.fs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,21 @@ module internal EnvelopeOut =
2626
task {
2727
match sessions.TryGetValue sid.Value with
2828
| true, sctx ->
29-
let envOut =
30-
if attachSeq then
31-
let entry = sctx.EventLog.Append(sid, env)
32-
entry.Envelope
33-
else
34-
Envelope.withSessionId sid env
29+
// §8.3: hold the per-session gate across seq assignment and
30+
// the send so concurrent emitters cannot reorder events.
31+
do! sctx.SendGate.WaitAsync()
3532

36-
do! sctx.Transport.SendAsync(envOut, CancellationToken.None)
33+
try
34+
let envOut =
35+
if attachSeq then
36+
let entry = sctx.EventLog.Append(sid, env)
37+
entry.Envelope
38+
else
39+
Envelope.withSessionId sid env
40+
41+
do! sctx.Transport.SendAsync(envOut, CancellationToken.None)
42+
finally
43+
sctx.SendGate.Release() |> ignore
3744
| _ -> ()
3845
}
3946
:> Task

src/Arcp.Runtime/Internal/JobLauncher.fs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,15 @@ module internal JobLauncher =
7979

8080
let emit (body: JobEventBody) : Task = jobs.EmitEventAsync(record, body)
8181

82-
let rotateCredential (credentialId: string, newValue: string, ct: CancellationToken) : Task =
82+
let rotateCredential (credentialId: string, newValue: string, _ct: CancellationToken) : Task =
8383
task {
8484
// §14: the new value goes only to the submitting session;
8585
// subscribers receive a redacted status (id only).
8686
do! jobs.EmitCredentialRotatedAsync(record, credentialId, newValue)
87-
do! credentialRegistry.RevokeCredentialAsync(credentialId, ct)
87+
// §9.8.2 / #107: the credential id stays outstanding so the
88+
// rotated (new) value is revoked at job termination. We do not
89+
// erase the registry entry here (which would orphan the new
90+
// value), nor revoke by id (which would invalidate it).
8891
}
8992
:> Task
9093

src/Arcp.Runtime/SessionContext.fs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ type internal ServerSessionContext =
2626
ResumeWindowSec: int
2727
mutable Transport: ITransport
2828
EventLog: EventLog
29+
/// Serializes event_seq assignment with the matching transport
30+
/// send so concurrent emitters cannot interleave out of order
31+
/// (spec §8.3).
32+
SendGate: System.Threading.SemaphoreSlim
2933
mutable LastAckedSeq: int64
3034
mutable LastInboundAt: DateTimeOffset
3135
}
@@ -52,6 +56,7 @@ module internal ServerSessionContext =
5256
ResumeWindowSec = resumeWindow
5357
Transport = transport
5458
EventLog = log
59+
SendGate = new System.Threading.SemaphoreSlim(1, 1)
5560
LastAckedSeq = 0L
5661
LastInboundAt = now
5762
}

0 commit comments

Comments
 (0)