Skip to content

Commit bfb3e90

Browse files
nficano and cursoragent committed
fix: spec-conformance, concurrency, and security audit fixes (#37, #38, #39, #40, #41, #42, #43, #45, #46, #47, #67, #68)
Runtime / spec-conformance:
- #37 root running jobs at a runtime-scoped token so session teardown (heartbeat loss, graceful close, transport drop) no longer terminates in-flight jobs (spec §6.4, §6.7)
- #41 add session.close/session.closed wire types; the runtime acks a graceful close with session.closed (session.bye kept as a deprecated alias) (spec §6.7)
- #40 dispatch now surfaces session.error{INTERNAL_ERROR} for unexpected exceptions (spec §12)
- #46 advertise model.use independently of credential provisioning (spec §9.7)

Event delivery / ordering:
- #39 serialize event_seq assignment with the outbound enqueue via a per-session emit gate so wire order is strictly monotonic under concurrent emitters (spec §8.3)
- #38 subscriber fan-out is back-pressure-aware: on a full channel the subscription is torn down deterministically instead of silently dropping an already-sequenced event (spec §8.3)
- #44 make the subscribe history/live-fan-out boundary exact via an atomic register+snapshot and a per-job event index, so a mid-stream subscriber sees each event exactly once (spec §7.6)

Authorization / security:
- #42 gate lease operations on remaining budget (BUDGET_EXHAUSTED) before the pattern check (spec §9.6)
- #43 deny-by-default for uncovered tool.call/agent.delegate, with an explicit PermissiveUnleasedOperations opt-in (spec §9.3)
- #45 list_jobs fails closed: an empty/absent principal sees only what the policy permits (spec §6.6, §14)

Performance / correctness:
- #67 keyset pagination over (created_at, job_id): stable ties and page work bounded to limit+1
- #68 AgentRegistry no longer exposes its mutable version dictionary; ToInventory snapshots under lock
- #47 client detects event_seq gaps and raises a broken-session signal (spec §8.3)

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 1efe6fe commit bfb3e90

20 files changed

Lines changed: 459 additions & 127 deletions

src/Arcp.Client/ArcpClient.Dispatch.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,15 @@ private async Task ReaderLoop(CancellationToken cancellationToken)
1818
{
1919
await foreach (var env in _transport.ReceiveAsync(cancellationToken).ConfigureAwait(false))
2020
{
21-
if (env.EventSeq is { } seq) Interlocked.Exchange(ref _lastReceivedSeq, seq);
21+
if (env.EventSeq is { } seq)
22+
{
23+
// Spec §8.3: event_seq is strictly monotonic and gap-free. If the new seq skips
24+
// the expected successor, surface a detectable broken-session signal instead of
25+
// silently accepting the gap.
26+
var prev = Interlocked.Read(ref _lastReceivedSeq);
27+
if (prev > 0 && seq > prev + 1) OnEventSeqGap(prev + 1, seq);
28+
if (seq > prev) Interlocked.Exchange(ref _lastReceivedSeq, seq);
29+
}
2230
await DispatchAsync(env, cancellationToken).ConfigureAwait(false);
2331
}
2432
}

src/Arcp.Client/ArcpClient.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,22 @@ public sealed partial class ArcpClient : IAsyncDisposable
5353
/// <summary>Gets the last received seq.</summary>
5454
public long LastReceivedSeq => Interlocked.Read(ref _lastReceivedSeq);
5555

56+
/// <summary>True once an inbound <c>event_seq</c> has skipped the expected next value, indicating
57+
/// the session stream has a gap and SHOULD be treated as broken (and resumed once resume is
58+
/// wired) per spec §8.3.</summary>
59+
public bool IsSessionBroken { get; private set; }
60+
61+
/// <summary>Raised when an inbound <c>event_seq</c> skips the expected successor (spec §8.3). The
62+
/// arguments are <c>(expectedSeq, receivedSeq)</c>. Handlers run on the reader loop; keep them
63+
/// fast and non-throwing.</summary>
64+
public event Action<long, long>? EventSeqGapDetected;
65+
66+
private void OnEventSeqGap(long expected, long received)
67+
{
68+
IsSessionBroken = true;
69+
EventSeqGapDetected?.Invoke(expected, received);
70+
}
71+
5672
/// <summary>Initializes a new instance of the <see cref="ArcpClient"/> class.</summary>
5773
public ArcpClient(ITransport transport, ArcpClientOptions options)
5874
{

src/Arcp.Client/PublicAPI.Unshipped.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ Arcp.Client.ArcpClient.CancelJobAsync(Arcp.Core.Ids.JobId jobId, string? reason
77
Arcp.Client.ArcpClient.ConnectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
88
Arcp.Client.ArcpClient.DisposeAsync() -> System.Threading.Tasks.ValueTask
99
Arcp.Client.ArcpClient.EffectiveFeatures.get -> System.Collections.Generic.IReadOnlyList<string!>!
10+
Arcp.Client.ArcpClient.EventSeqGapDetected -> System.Action<long, long>?
1011
Arcp.Client.ArcpClient.HeartbeatIntervalSec.get -> int?
12+
Arcp.Client.ArcpClient.IsSessionBroken.get -> bool
1113
Arcp.Client.ArcpClient.LastReceivedSeq.get -> long
1214
Arcp.Client.ArcpClient.ListJobsAsync(Arcp.Core.Messages.JobListFilter? filter = null, int? limit = null, string? cursor = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<Arcp.Core.Messages.SessionJobsPayload!>!
1315
Arcp.Client.ArcpClient.ResumeToken.get -> string?

src/Arcp.Core/Envelope/Envelope.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,11 @@ public sealed record Envelope
6060
/// unknown fields") so they round-trip without loss.</summary>
6161
[JsonExtensionData]
6262
public IDictionary<string, JsonElement>? Extensions { get; init; }
63+
64+
/// <summary>Runtime-only, NON-serialized per-job monotonic index. Used by the runtime to make the
65+
/// <c>job.subscribe</c> history/live-fan-out boundary exact (spec §7.6) — a subscriber drops a
66+
/// fanned-out event whose index was already covered by its replayed history. Never transmitted
67+
/// on the wire.</summary>
68+
[JsonIgnore]
69+
public long? JobEventIndex { get; init; }
6370
}

src/Arcp.Core/Envelope/MessageTypeRegistry.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public static MessageTypeRegistry CreateCoreCatalog()
4141
var r = new MessageTypeRegistry();
4242
r.Register(MessageTypeNames.SessionHello, typeof(SessionHelloPayload));
4343
r.Register(MessageTypeNames.SessionWelcome, typeof(SessionWelcomePayload));
44+
r.Register(MessageTypeNames.SessionClose, typeof(SessionByePayload));
45+
r.Register(MessageTypeNames.SessionClosed, typeof(SessionByePayload));
4446
r.Register(MessageTypeNames.SessionBye, typeof(SessionByePayload));
4547
r.Register(MessageTypeNames.SessionPing, typeof(SessionPingPayload));
4648
r.Register(MessageTypeNames.SessionPong, typeof(SessionPongPayload));

src/Arcp.Core/Messages/MessageTypeNames.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ public static class MessageTypeNames
1616
public const string SessionHello = "session.hello";
1717
/// <summary>Gets the session welcome.</summary>
1818
public const string SessionWelcome = "session.welcome";
19-
/// <summary>Gets the session bye.</summary>
19+
/// <summary>Client-sent graceful session close (spec §6.7).</summary>
20+
public const string SessionClose = "session.close";
21+
/// <summary>Runtime-sent acknowledgement of <see cref="SessionClose"/> (spec §6.7).</summary>
22+
public const string SessionClosed = "session.closed";
23+
/// <summary>Gets the session bye. Deprecated alias for <see cref="SessionClose"/> kept for
24+
/// back-compat with pre-1.1 peers; the runtime treats it like <c>session.close</c> (spec §6.7).</summary>
2025
public const string SessionBye = "session.bye";
2126
/// <summary>Gets the session ping.</summary>
2227
public const string SessionPing = "session.ping";
@@ -63,7 +68,7 @@ public static class MessageTypeNames
6368
/// <summary>Gets the all.</summary>
6469
public static readonly FrozenSet<string> All = new HashSet<string>(StringComparer.Ordinal)
6570
{
66-
SessionHello, SessionWelcome, SessionBye, SessionPing, SessionPong, SessionAck,
71+
SessionHello, SessionWelcome, SessionClose, SessionClosed, SessionBye, SessionPing, SessionPong, SessionAck,
6772
SessionListJobs, SessionJobs, SessionError, SessionResume,
6873
JobSubmit, JobAccepted, JobEvent, JobResult, JobError, JobCancel, JobCancelled,
6974
JobSubscribe, JobSubscribed, JobUnsubscribe,

src/Arcp.Core/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,8 @@ Arcp.Core.Wire.Envelope.Extensions.get -> System.Collections.Generic.IDictionary
651651
Arcp.Core.Wire.Envelope.Extensions.init -> void
652652
Arcp.Core.Wire.Envelope.Id.get -> string!
653653
Arcp.Core.Wire.Envelope.Id.init -> void
654+
Arcp.Core.Wire.Envelope.JobEventIndex.get -> long?
655+
Arcp.Core.Wire.Envelope.JobEventIndex.init -> void
654656
Arcp.Core.Wire.Envelope.JobId.get -> string?
655657
Arcp.Core.Wire.Envelope.JobId.init -> void
656658
Arcp.Core.Wire.Envelope.ParentSpanId.get -> string?
@@ -730,6 +732,8 @@ const Arcp.Core.Messages.MessageTypeNames.JobSubscribed = "job.subscribed" -> st
730732
const Arcp.Core.Messages.MessageTypeNames.JobUnsubscribe = "job.unsubscribe" -> string!
731733
const Arcp.Core.Messages.MessageTypeNames.SessionAck = "session.ack" -> string!
732734
const Arcp.Core.Messages.MessageTypeNames.SessionBye = "session.bye" -> string!
735+
const Arcp.Core.Messages.MessageTypeNames.SessionClose = "session.close" -> string!
736+
const Arcp.Core.Messages.MessageTypeNames.SessionClosed = "session.closed" -> string!
733737
const Arcp.Core.Messages.MessageTypeNames.SessionError = "session.error" -> string!
734738
const Arcp.Core.Messages.MessageTypeNames.SessionHello = "session.hello" -> string!
735739
const Arcp.Core.Messages.MessageTypeNames.SessionJobs = "session.jobs" -> string!

src/Arcp.Runtime/Agents/AgentRegistry.cs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,21 +63,24 @@ public void SetDefaultVersion(string name, string version)
6363

6464
/// <summary>To inventory.</summary>
6565
public IReadOnlyList<AgentInventoryEntry> ToInventory() =>
66-
_byName.Values.Select(e => new AgentInventoryEntry
66+
_byName.Values.Select(e =>
6767
{
68-
Name = e.Name,
69-
Versions = e.Versions.Count > 0 ? e.Versions.Keys.ToArray() : null,
70-
Default = e.DefaultVersion,
68+
var (versions, defaultVersion) = e.SnapshotInventoryFields();
69+
return new AgentInventoryEntry
70+
{
71+
Name = e.Name,
72+
Versions = versions,
73+
Default = defaultVersion,
74+
};
7175
}).ToArray();
7276

7377
private sealed class AgentEntry
7478
{
7579
private readonly object _gate = new();
80+
private readonly Dictionary<string, IAgent> _versions = new(StringComparer.Ordinal);
7681

7782
public string Name { get; }
7883

79-
public Dictionary<string, IAgent> Versions { get; } = new(StringComparer.Ordinal);
80-
8184
public string? DefaultVersion { get; private set; }
8285

8386
public IAgent? UnversionedAgent { get; private set; }
@@ -86,14 +89,14 @@ private sealed class AgentEntry
8689

8790
public bool TryGetVersion(string v, out IAgent? agent)
8891
{
89-
lock (_gate) return Versions.TryGetValue(v, out agent);
92+
lock (_gate) return _versions.TryGetValue(v, out agent);
9093
}
9194

9295
public void AddVersion(string version, IAgent agent)
9396
{
9497
lock (_gate)
9598
{
96-
Versions[version] = agent;
99+
_versions[version] = agent;
97100
DefaultVersion ??= version;
98101
}
99102
}
@@ -107,7 +110,7 @@ public void SetDefault(string version)
107110
{
108111
lock (_gate)
109112
{
110-
if (!Versions.ContainsKey(version))
113+
if (!_versions.ContainsKey(version))
111114
throw new AgentVersionNotAvailableException($"{Name}@{version} not registered");
112115
DefaultVersion = version;
113116
}
@@ -119,10 +122,22 @@ public void SetDefault(string version)
119122
{
120123
lock (_gate)
121124
{
122-
foreach (var kv in Versions) return (kv.Key, kv.Value);
125+
foreach (var kv in _versions) return (kv.Key, kv.Value);
123126
return null;
124127
}
125128
}
126129
}
130+
131+
/// <summary>Atomically snapshot the inventory-visible fields under the same lock that guards
132+
/// mutation, so <see cref="ToInventory"/> never enumerates the mutable dictionary while a
133+
/// concurrent registration is writing to it.</summary>
134+
public (IReadOnlyList<string>? Versions, string? Default) SnapshotInventoryFields()
135+
{
136+
lock (_gate)
137+
{
138+
var versions = _versions.Count > 0 ? _versions.Keys.ToArray() : null;
139+
return (versions, DefaultVersion);
140+
}
141+
}
127142
}
128143
}

src/Arcp.Runtime/ArcpServer.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public ArcpServer(ArcpServerOptions options, ILoggerFactory? loggerFactory = nul
7575
options.IdempotencyWindowSec,
7676
options.EventLogCapacity,
7777
options.TerminalJobRetentionSec,
78-
options.FatalBudgetExhaustion);
78+
options.FatalBudgetExhaustion,
79+
options.PermissiveUnleasedOperations);
7980
if (CredentialManager is not null)
8081
{
8182
_ = Task.Run(() => RevokeOutstandingCredentialsAsync(CancellationToken.None));
@@ -213,6 +214,9 @@ internal bool TryResume(string resumeToken, out SessionState? session)
213214
/// <summary>Dispose (asynchronous).</summary>
214215
public async ValueTask DisposeAsync()
215216
{
217+
// Cancel in-flight jobs only at runtime shutdown — never on individual session teardown
218+
// (spec §6.4, §6.7). Jobs are rooted at the JobManager's runtime token, not session _cts.
219+
JobManager.Dispose();
216220
try { _sweeperCts.Cancel(); } catch (ObjectDisposedException) { /* already disposed */ }
217221
if (_sweeperTask is not null)
218222
{
@@ -240,9 +244,12 @@ private static IReadOnlyList<string> ComputeAdvertisedFeatures(ArcpServerOptions
240244
nameof(options));
241245
}
242246

247+
// Spec §9.7: model.use enforcement is independent of credential provisioning —
248+
// LeaseManager.AuthorizeModelUse enforces it directly, so it MUST stay advertisable even
249+
// when no ICredentialProvisioner is configured. Only provisioned_credentials is coupled to
250+
// having a provisioner.
243251
return requested
244-
.Where(f => !string.Equals(f, FeatureFlags.ProvisionedCredentials, StringComparison.Ordinal) &&
245-
!string.Equals(f, FeatureFlags.ModelUse, StringComparison.Ordinal))
252+
.Where(f => !string.Equals(f, FeatureFlags.ProvisionedCredentials, StringComparison.Ordinal))
246253
.ToArray();
247254
}
248255

src/Arcp.Runtime/ArcpServerOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ public sealed class ArcpServerOptions
5050
/// <summary>Gets the back pressure threshold.</summary>
5151
public int BackPressureThreshold { get; init; } = 1000;
5252

53+
/// <summary>When <see langword="false"/> (the default, spec §9.1/§9.3 deny-by-default), an
54+
/// authority-bearing operation (<c>tool.call</c>, <c>agent.delegate</c>) whose namespace the
55+
/// job's lease does not declare fails with <c>PERMISSION_DENIED</c>. Set to <see langword="true"/>
56+
/// to opt into the legacy permissive behavior where uncovered namespaces are allowed.</summary>
57+
public bool PermissiveUnleasedOperations { get; init; }
58+
5359
/// <summary>Gets the authorization policy.</summary>
5460
public IJobAuthorizationPolicy AuthorizationPolicy { get; init; } = new SamePrincipalPolicy();
5561

0 commit comments

Comments
 (0)