Skip to content

Commit 1efe6fe

Browse files
nficano and cursoragent committed
fix: idempotency, cancel authority, client robustness, lease boundary (#71, #72, #73, #74, #75, #76)
- #71 idempotent job.submit replay no longer re-runs the agent (skip Resolve/RunAsync on replay)
- #72 lease glob "/prefix/**" keeps the path-boundary separator so siblings are not authorized
- #73 server-rejected job.submit/list_jobs now fault the awaiting client instead of hanging
- #74 cancel authority is session-scoped (fail-closed), not principal-scoped
- #75 add job.cancelled ack and JOB_NOT_FOUND for unknown jobs
- #76 populate session.jobs last_event_seq from a per-job high-water mark

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 43c62d3 commit 1efe6fe

18 files changed

Lines changed: 456 additions & 36 deletions

src/Arcp.Client/ArcpClient.Dispatch.cs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,32 @@ private ValueTask RespondToPingAsync(SessionPingPayload p, CancellationToken can
130130

131131
private void PropagateSessionError(SessionErrorPayload err)
132132
{
133+
var jobError = new JobErrorPayload
134+
{
135+
Code = err.Code,
136+
Message = err.Message,
137+
Retryable = err.Retryable,
138+
Detail = err.Detail,
139+
};
140+
133141
foreach (var h in _handles.Values)
134142
{
135-
h.OnError(new JobErrorPayload
136-
{
137-
Code = err.Code,
138-
Message = err.Message,
139-
Retryable = err.Retryable,
140-
Detail = err.Detail,
141-
});
143+
h.OnError(jobError);
144+
}
145+
146+
// A submission rejected before acceptance lives in _pendingSubmits, not _handles, and a
147+
// list_jobs request lives in _listJobsRequests. session.error is not correlated to a
148+
// specific request id, so the safe contract is to fault every outstanding request — leaving
149+
// them pending would hang SubmitAsync/ListJobsAsync until the caller's token fires.
150+
while (_pendingSubmits.TryDequeue(out var pending))
151+
{
152+
pending.OnError(jobError);
153+
}
154+
155+
foreach (var key in _listJobsRequests.Keys)
156+
{
157+
if (_listJobsRequests.TryRemove(key, out var tcs))
158+
tcs.TrySetException(JobHandle.ToException(err.Code, err.Message, err.Detail));
142159
}
143160
}
144161
}

src/Arcp.Client/JobHandle.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,35 @@ internal void OnResult(JobResultPayload payload)
8181

8282
internal void OnError(JobErrorPayload payload)
8383
{
84+
// If the job was rejected before acceptance (e.g. a server session.error for a duplicate
85+
// key or unavailable agent), the awaiter on Accepted must fault rather than hang forever.
86+
// For a post-acceptance terminal error, Accepted is already resolved so this is a no-op.
87+
_accepted.TrySetException(ToException(payload.Code, payload.Message, payload.Detail));
8488
_terminal.TrySetResult(new JobResult(false, null, payload));
8589
_events.Writer.TryComplete();
8690
}
8791

92+
/// <summary>Map a wire error code to the most specific <see cref="ArcpException"/> subtype so
93+
/// callers can <c>catch</c> on the concrete type (e.g. <see cref="DuplicateKeyException"/>).</summary>
94+
internal static ArcpException ToException(string code, string message, string? detail) => code switch
95+
{
96+
ErrorCode.DuplicateKey => new DuplicateKeyException(message, detail),
97+
ErrorCode.AgentNotAvailable => new AgentNotAvailableException(message, detail),
98+
ErrorCode.AgentVersionNotAvailable => new AgentVersionNotAvailableException(message, detail),
99+
ErrorCode.LeaseSubsetViolation => new LeaseSubsetViolationException(message, detail),
100+
ErrorCode.PermissionDenied => new PermissionDeniedException(message, detail),
101+
ErrorCode.JobNotFound => new JobNotFoundException(message, detail),
102+
ErrorCode.InvalidRequest => new InvalidRequestException(message, detail),
103+
ErrorCode.Unauthenticated => new UnauthenticatedException(message, detail),
104+
ErrorCode.BudgetExhausted => new BudgetExhaustedException(message, detail),
105+
ErrorCode.LeaseExpired => new LeaseExpiredException(message, detail),
106+
ErrorCode.ResumeWindowExpired => new ResumeWindowExpiredException(message, detail),
107+
ErrorCode.HeartbeatLost => new HeartbeatLostException(message, detail),
108+
ErrorCode.Timeout => new Arcp.Core.Errors.TimeoutException(message, detail),
109+
ErrorCode.Cancelled => new CancelledException(message, detail),
110+
_ => new ArcpException(code, message, detail),
111+
};
112+
88113
/// <summary>Events.</summary>
89114
public async IAsyncEnumerable<JobEvent> Events([EnumeratorCancellation] CancellationToken cancellationToken = default)
90115
{

src/Arcp.Core/Envelope/MessageTypeRegistry.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public static MessageTypeRegistry CreateCoreCatalog()
5656
r.Register(MessageTypeNames.JobResult, typeof(JobResultPayload));
5757
r.Register(MessageTypeNames.JobError, typeof(JobErrorPayload));
5858
r.Register(MessageTypeNames.JobCancel, typeof(JobCancelPayload));
59+
r.Register(MessageTypeNames.JobCancelled, typeof(JobCancelledPayload));
5960
r.Register(MessageTypeNames.JobSubscribe, typeof(JobSubscribePayload));
6061
r.Register(MessageTypeNames.JobSubscribed, typeof(JobSubscribedPayload));
6162
r.Register(MessageTypeNames.JobUnsubscribe, typeof(JobUnsubscribePayload));
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
using System.Text.Json.Serialization;
3+
4+
namespace Arcp.Core.Messages;
5+
6+
/// <summary>Acknowledgement the runtime sends to the submitting session when a <c>job.cancel</c> is
7+
/// accepted (spec §7.4). The terminal <c>job.error{CANCELLED, final_status:"cancelled"}</c> follows
8+
/// once the run-loop unwinds the agent.</summary>
9+
public sealed record JobCancelledPayload
10+
{
11+
/// <summary>Gets the job id.</summary>
12+
[JsonPropertyName("job_id")] public required string JobId { get; init; }
13+
14+
/// <summary>Gets the reason.</summary>
15+
[JsonPropertyName("reason")] public string? Reason { get; init; }
16+
}

src/Arcp.Core/Messages/MessageTypeNames.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public static class MessageTypeNames
5151
public const string JobError = "job.error";
5252
/// <summary>Gets the job cancel.</summary>
5353
public const string JobCancel = "job.cancel";
54+
/// <summary>Gets the job cancelled acknowledgement (spec §7.4).</summary>
55+
public const string JobCancelled = "job.cancelled";
5456
/// <summary>Gets the job subscribe.</summary>
5557
public const string JobSubscribe = "job.subscribe";
5658
/// <summary>Gets the job subscribed.</summary>
@@ -63,7 +65,7 @@ public static class MessageTypeNames
6365
{
6466
SessionHello, SessionWelcome, SessionBye, SessionPing, SessionPong, SessionAck,
6567
SessionListJobs, SessionJobs, SessionError, SessionResume,
66-
JobSubmit, JobAccepted, JobEvent, JobResult, JobError, JobCancel,
68+
JobSubmit, JobAccepted, JobEvent, JobResult, JobError, JobCancel, JobCancelled,
6769
JobSubscribe, JobSubscribed, JobUnsubscribe,
6870
}.ToFrozenSet();
6971
}

src/Arcp.Core/PublicAPI.Unshipped.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,19 @@ static Arcp.Core.Messages.ProvisionedCredential.operator ==(Arcp.Core.Messages.P
266266
Arcp.Core.Messages.JobCancelPayload.JobId.init -> void
267267
Arcp.Core.Messages.JobCancelPayload.Reason.get -> string?
268268
Arcp.Core.Messages.JobCancelPayload.Reason.init -> void
269+
Arcp.Core.Messages.JobCancelledPayload
270+
Arcp.Core.Messages.JobCancelledPayload.<Clone>$() -> Arcp.Core.Messages.JobCancelledPayload!
271+
Arcp.Core.Messages.JobCancelledPayload.Equals(Arcp.Core.Messages.JobCancelledPayload? other) -> bool
272+
Arcp.Core.Messages.JobCancelledPayload.JobCancelledPayload() -> void
273+
Arcp.Core.Messages.JobCancelledPayload.JobId.get -> string!
274+
Arcp.Core.Messages.JobCancelledPayload.JobId.init -> void
275+
Arcp.Core.Messages.JobCancelledPayload.Reason.get -> string?
276+
Arcp.Core.Messages.JobCancelledPayload.Reason.init -> void
277+
override Arcp.Core.Messages.JobCancelledPayload.Equals(object? obj) -> bool
278+
override Arcp.Core.Messages.JobCancelledPayload.GetHashCode() -> int
279+
override Arcp.Core.Messages.JobCancelledPayload.ToString() -> string!
280+
static Arcp.Core.Messages.JobCancelledPayload.operator !=(Arcp.Core.Messages.JobCancelledPayload? left, Arcp.Core.Messages.JobCancelledPayload? right) -> bool
281+
static Arcp.Core.Messages.JobCancelledPayload.operator ==(Arcp.Core.Messages.JobCancelledPayload? left, Arcp.Core.Messages.JobCancelledPayload? right) -> bool
269282
Arcp.Core.Messages.JobErrorPayload
270283
Arcp.Core.Messages.JobErrorPayload.<Clone>$() -> Arcp.Core.Messages.JobErrorPayload!
271284
Arcp.Core.Messages.JobErrorPayload.Code.get -> string!
@@ -707,6 +720,7 @@ const Arcp.Core.Messages.EventKinds.ToolCall = "tool_call" -> string!
707720
const Arcp.Core.Messages.EventKinds.ToolResult = "tool_result" -> string!
708721
const Arcp.Core.Messages.MessageTypeNames.JobAccepted = "job.accepted" -> string!
709722
const Arcp.Core.Messages.MessageTypeNames.JobCancel = "job.cancel" -> string!
723+
const Arcp.Core.Messages.MessageTypeNames.JobCancelled = "job.cancelled" -> string!
710724
const Arcp.Core.Messages.MessageTypeNames.JobError = "job.error" -> string!
711725
const Arcp.Core.Messages.MessageTypeNames.JobEvent = "job.event" -> string!
712726
const Arcp.Core.Messages.MessageTypeNames.JobResult = "job.result" -> string!

src/Arcp.Runtime/Job.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public sealed class Job
2727
private readonly object _eventBufferGate = new();
2828
private readonly List<Envelope> _eventBuffer = [];
2929
private readonly int _eventBufferCapacity;
30+
private long _lastEmittedSeq;
3031

3132
/// <summary>Gets the job id.</summary>
3233
public JobId JobId { get; }
@@ -192,6 +193,18 @@ public async ValueTask EmitEventAsync(string kind, object body, CancellationToke
192193
await _emit(env, cancellationToken).ConfigureAwait(false);
193194
}
194195

196+
/// <summary>Highest <c>event_seq</c> high-water mark emitted by this job, or <see langword="null"/>
197+
/// if it has not emitted any events yet. Surfaced in <c>session.jobs</c> (spec §6.6) so a dashboard
198+
/// can decide where to subscribe from.</summary>
199+
internal long? LastEmittedSeq
200+
{
201+
get
202+
{
203+
var seq = Interlocked.Read(ref _lastEmittedSeq);
204+
return seq > 0 ? seq : null;
205+
}
206+
}
207+
195208
private void BufferEvent(Envelope env)
196209
{
197210
// Spec §7.6: bounded per-job history so a later subscriber with `history: true`
@@ -205,6 +218,9 @@ private void BufferEvent(Envelope env)
205218
_eventBuffer.RemoveRange(0, _eventBuffer.Count - _eventBufferCapacity);
206219
}
207220
}
221+
222+
// Spec §6.6: track a monotonic per-job high-water mark for last_event_seq in the listing.
223+
Interlocked.Increment(ref _lastEmittedSeq);
208224
}
209225

210226
/// <summary>Snapshot of all events buffered for replay on a new subscription.</summary>

src/Arcp.Runtime/JobManager.Listing.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ private static List<Job> ApplyFilter(List<Job> jobs, JobListFilter? filter)
6060
ParentJobId = j.ParentJobId,
6161
CreatedAt = j.CreatedAt,
6262
TraceId = j.TraceId?.Value,
63+
LastEventSeq = j.LastEmittedSeq,
6364
};
6465

6566
private static string MapStatus(JobStatus s) => s switch

src/Arcp.Runtime/JobManager.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public bool TryGet(JobId id, out Job? job)
8585
/// <summary>Submit a job. The caller (SessionState) hands in the envelope; this method returns
8686
/// the <see cref="Job"/> to run asynchronously plus the <c>job.accepted</c> payload.
8787
/// <paramref name="inboundTraceId"/> propagates the envelope's <c>trace_id</c> per spec §11.</summary>
88-
public async Task<(Job Job, JobAcceptedPayload Accepted)> SubmitAsync(
88+
public async Task<(Job Job, JobAcceptedPayload Accepted, bool IsReplay)> SubmitAsync(
8989
JobSubmitPayload submit,
9090
SessionId sessionId,
9191
string? submitterPrincipal,
@@ -113,7 +113,9 @@ public bool TryGet(JobId id, out Job? job)
113113
}
114114
if (_jobs.TryGetValue(existingRecord.JobId, out var existing))
115115
{
116-
return (existing, BuildAccepted(existing));
116+
// Spec §7.2: an idempotent replay returns the *same* job.accepted and MUST
117+
// NOT re-run the agent. Flag it so the caller skips Resolve/RunAsync.
118+
return (existing, BuildAccepted(existing), true);
117119
}
118120
}
119121
else
@@ -153,7 +155,7 @@ public bool TryGet(JobId id, out Job? job)
153155
_idempotency[idemKey] = new IdempotencyRecord(jobId, fingerprint, _time.GetUtcNow());
154156
}
155157

156-
return (job, BuildAccepted(job));
158+
return (job, BuildAccepted(job), false);
157159
}
158160

159161
private void AssertChildLeaseIsSubset(string parentJobId, Lease child, LeaseConstraints? childConstraints)
@@ -440,15 +442,19 @@ private async Task RunRuntimeWatchdog(Job job, TimeSpan limit, CancellationToken
440442
private static bool IsTerminal(JobStatus s) =>
441443
s is JobStatus.Success or JobStatus.Error or JobStatus.Cancelled or JobStatus.TimedOut;
442444

443-
/// <summary>Cancel a running job. Only the original submitter may cancel; subscribers may not (spec §7.6).</summary>
444-
public bool Cancel(JobId jobId, string? requesterPrincipal, string? reason)
445+
/// <summary>Cancel a running job. Cancellation authority is scoped to the *submitting session*
446+
/// (spec §7.6, §13.3, §14): only the session that submitted the job may cancel it. Subscription —
447+
/// even from another session of the same principal — does NOT confer cancel authority. Returns
448+
/// <see langword="false"/> if the job does not exist; throws <see cref="PermissionDeniedException"/>
449+
/// when the requesting session is not the submitter.</summary>
450+
public bool Cancel(JobId jobId, SessionId requesterSession, string? reason)
445451
{
446452
if (!_jobs.TryGetValue(jobId, out var job)) return false;
447-
// Spec §7.6: subscription does NOT grant cancel authority; only submitter may cancel.
448-
if (requesterPrincipal is not null && job.SubmitterPrincipal is not null &&
449-
!string.Equals(requesterPrincipal, job.SubmitterPrincipal, StringComparison.Ordinal))
453+
// Fail closed: compare the submitting session, never the principal. A null/foreign requester
454+
// must not be able to bypass the check (spec §14: "Subscription MUST NOT confer cancel authority").
455+
if (!job.SessionId.Equals(requesterSession))
450456
{
451-
throw new PermissionDeniedException("Subscribers MUST NOT cancel jobs (spec §7.6)");
457+
throw new PermissionDeniedException("Only the submitting session may cancel a job (spec §7.6, §14)");
452458
}
453459
job.CancellationSource.Cancel();
454460
return true;

src/Arcp.Runtime/Leases/LeaseManager.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void AssertSubset(Lease parent, Lease child, IReadOnlyDictionary<string,
8585
if (childExp > parentExp)
8686
throw new LeaseSubsetViolationException("Child lease_constraints.expires_at MUST NOT exceed parent's (spec §9.4)");
8787
}
88-
// No child constraints child implicitly inherits parent expiry. (spec §9.4)
88+
// No child constraints => child implicitly inherits parent expiry. (spec §9.4)
8989
}
9090
}
9191

@@ -153,8 +153,12 @@ internal static bool GlobMatch(string input, string pattern)
153153
// Convert to regex-ish prefix/suffix.
154154
if (pattern.EndsWith("/**", StringComparison.Ordinal))
155155
{
156-
var prefix = pattern[..^3];
157-
return input.StartsWith(prefix, StringComparison.Ordinal);
156+
// Keep the trailing separator so the match respects the path boundary (spec §9.3):
157+
// "/workspace/myapp/**" authorizes "/workspace/myapp" itself and anything strictly
158+
// beneath "/workspace/myapp/", but NOT siblings like "/workspace/myapp-private/x".
159+
var dir = pattern[..^2]; // "/workspace/myapp/"
160+
return input.StartsWith(dir, StringComparison.Ordinal)
161+
|| input.Equals(dir[..^1], StringComparison.Ordinal); // the directory itself
158162
}
159163
if (pattern.EndsWith("*", StringComparison.Ordinal))
160164
{

0 commit comments

Comments
 (0)