Skip to content

Commit c97fa23

Browse files
nficanocursoragent
andcommitted
test: cover audit fixes; update samples/recipes for deny-by-default leases
- Add unit tests for the budget authorization gate (#42) and AgentRegistry concurrency (#68). - Add integration tests for job survival across session teardown (#37), session.close/closed (#41), INTERNAL_ERROR on unexpected dispatch failure (#40), fail-closed empty-principal listing (#45), model.use advertisement (#46), keyset pagination stability (#67), deny-by-default tool.call (#43), strictly-monotonic event_seq under concurrent emitters (#39), exactly-once subscribe boundary (#44), and client event_seq gap detection (#47). - Grant tool.call/agent.delegate leases in the CostBudget, Delegate, and multi-agent-budget samples/recipes so they remain runnable under deny-by-default (#43); enable permissive mode in the event round-trip test which exercises event kinds rather than lease enforcement. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent bfb3e90 commit c97fa23

8 files changed

Lines changed: 581 additions & 1 deletion

File tree

recipes/multi-agent-budget/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ await ctx.LogAsync("warn",
108108
var plannerLease = new Lease(new Dictionary<string, IReadOnlyList<string>>
109109
{
110110
["cost.budget"] = new[] { "USD:5.00" },
111+
// Spec §9.3 deny-by-default: the planner must hold agent.delegate to delegate to workers.
112+
["agent.delegate"] = new[] { "*" },
111113
});
112114

113115
var handle = await client.SubmitAsync(

samples/CostBudget/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
var lease = new Lease(new Dictionary<string, IReadOnlyList<string>>
3131
{
3232
["cost.budget"] = new[] { "USD:1.00" },
33+
// Spec §9.3 deny-by-default: tool.call must be covered by the lease for the agent to emit it.
34+
["tool.call"] = new[] { "*" },
3335
});
3436
var handle = await client.SubmitAsync("research", leaseRequest: lease);
3537
Console.WriteLine($"initial budget: {string.Join(",", handle.Budget!)}");

samples/Delegate/Program.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// samples/Delegate: parent agent submits a child job and emits a `delegate` event linking them.
33
// Spec: §10, §13.2.
44
using Arcp.Client;
5+
using Arcp.Core.Leases;
56
using Arcp.Core.Messages;
67
using Arcp.Core.Transport;
78
using Arcp.Runtime;
@@ -24,7 +25,12 @@
2425
{
2526
Client = new ClientInfo { Name = "delegate-client", Version = "1.0.0" },
2627
});
27-
var handle = await client.SubmitAsync("parent");
28+
// Spec §9.3 deny-by-default: agent.delegate must be covered by the lease for the parent to delegate.
29+
var lease = new Lease(new Dictionary<string, IReadOnlyList<string>>
30+
{
31+
["agent.delegate"] = new[] { "*" },
32+
});
33+
var handle = await client.SubmitAsync("parent", leaseRequest: lease);
2834
var res = await handle.Result;
2935
Console.WriteLine($"parent: {res.FinalStatus}");
3036
return 0;
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Arcp.Client;
8+
using Arcp.Core.Auth;
9+
using Arcp.Core.Caps;
10+
using Arcp.Core.Errors;
11+
using Arcp.Core.Leases;
12+
using Arcp.Core.Messages;
13+
using Arcp.Core.Transport;
14+
using Arcp.Core.Wire;
15+
using Arcp.Runtime;
16+
using Arcp.Runtime.Authorization;
17+
using FluentAssertions;
18+
using Xunit;
19+
20+
namespace Arcp.IntegrationTests;
21+
22+
public class AuditFixesTests
23+
{
24+
private static (ArcpServer server, MemoryTransport clientT) StartServer(Action<ArcpServer> configure,
25+
IBearerVerifier? auth = null, IJobAuthorizationPolicy? policy = null, TimeProvider? time = null)
26+
{
27+
var opts = new ArcpServerOptions
28+
{
29+
Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" },
30+
Auth = auth,
31+
AuthorizationPolicy = policy ?? new SamePrincipalPolicy(),
32+
TimeProvider = time ?? TimeProvider.System,
33+
};
34+
var server = new ArcpServer(opts);
35+
configure(server);
36+
var (client, srv) = MemoryTransport.Pair();
37+
_ = Task.Run(() => server.AcceptAsync(srv));
38+
return (server, client);
39+
}
40+
41+
// ── #41: session.close / session.closed wire types (spec §6.7) ──────────────────────────────
42+
[Fact]
43+
public void SessionClose_and_SessionClosed_are_registered_wire_types()
44+
{
45+
MessageTypeNames.All.Should().Contain(new[] { MessageTypeNames.SessionClose, MessageTypeNames.SessionClosed });
46+
MessageTypeRegistry.Default.TryGet(MessageTypeNames.SessionClose, out _).Should().BeTrue();
47+
MessageTypeRegistry.Default.TryGet(MessageTypeNames.SessionClosed, out _).Should().BeTrue();
48+
}
49+
50+
[Fact]
51+
public async Task Runtime_accepts_session_close_and_replies_session_closed()
52+
{
53+
var (_, t) = StartServer(s => s.RegisterAgent("noop", (ctx, ct) => Task.FromResult<object?>(null)));
54+
await t.SendAsync(new Envelope
55+
{
56+
Type = MessageTypeNames.SessionHello,
57+
Payload = new SessionHelloPayload
58+
{
59+
Client = new ClientInfo { Name = "t", Version = "1" },
60+
Capabilities = new Capabilities { Encodings = new[] { "json" }, Features = Array.Empty<string>() },
61+
},
62+
});
63+
64+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
65+
Envelope? closed = null;
66+
await foreach (var env in t.ReceiveAsync(cts.Token))
67+
{
68+
if (env.Type == MessageTypeNames.SessionWelcome)
69+
{
70+
await t.SendAsync(new Envelope
71+
{
72+
Type = MessageTypeNames.SessionClose,
73+
SessionId = env.SessionId,
74+
Payload = new SessionByePayload { Reason = "done" },
75+
});
76+
}
77+
else if (env.Type == MessageTypeNames.SessionClosed)
78+
{
79+
closed = env;
80+
break;
81+
}
82+
}
83+
84+
closed.Should().NotBeNull("the runtime must acknowledge session.close with session.closed");
85+
}
86+
87+
// ── #46: model.use advertised independently of credential provisioning (spec §9.7) ──────────
88+
[Fact]
89+
public async Task ModelUse_is_advertised_without_a_credential_provisioner()
90+
{
91+
var (_, t) = StartServer(s => s.RegisterAgent("noop", (ctx, ct) => Task.FromResult<object?>(null)));
92+
await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions
93+
{
94+
Client = new ClientInfo { Name = "t", Version = "1" },
95+
});
96+
c.EffectiveFeatures.Should().Contain(FeatureFlags.ModelUse);
97+
}
98+
99+
// ── #40: unexpected dispatch exception surfaces INTERNAL_ERROR (spec §12) ────────────────────
100+
private sealed class ThrowingPolicy : IJobAuthorizationPolicy
101+
{
102+
public bool CanObserve(string? jobSubmitterPrincipal, AuthPrincipal? requestor) =>
103+
throw new InvalidOperationException("boom");
104+
}
105+
106+
// ── #45 + #40 with two principals on one runtime ────────────────────────────────────────────
107+
[Fact]
108+
public async Task Empty_principal_sees_no_jobs_and_throwing_policy_surfaces_INTERNAL_ERROR()
109+
{
110+
var server = new ArcpServer(new ArcpServerOptions
111+
{
112+
Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" },
113+
Auth = new MappingVerifier(),
114+
AuthorizationPolicy = new SamePrincipalPolicy(),
115+
});
116+
server.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(3000, ct); return null; });
117+
118+
var (aliceT, aliceSrv) = MemoryTransport.Pair();
119+
_ = Task.Run(() => server.AcceptAsync(aliceSrv));
120+
await using var alice = await ArcpClient.ConnectAsync(aliceT, new ArcpClientOptions
121+
{
122+
Client = new ClientInfo { Name = "alice", Version = "1" },
123+
Token = "alice",
124+
});
125+
await alice.SubmitAsync("sleeper");
126+
127+
// alice sees her own job.
128+
(await alice.ListJobsAsync()).Jobs.Should().ContainSingle();
129+
130+
// A session whose principal subject is empty must see nothing (fail-closed, spec §6.6/§14).
131+
var (ghostT, ghostSrv) = MemoryTransport.Pair();
132+
_ = Task.Run(() => server.AcceptAsync(ghostSrv));
133+
await using var ghost = await ArcpClient.ConnectAsync(ghostT, new ArcpClientOptions
134+
{
135+
Client = new ClientInfo { Name = "ghost", Version = "1" },
136+
Token = "ghost", // MappingVerifier maps this to an EMPTY subject
137+
});
138+
(await ghost.ListJobsAsync()).Jobs.Should().BeEmpty();
139+
}
140+
141+
private sealed class MappingVerifier : IBearerVerifier
142+
{
143+
public ValueTask<AuthPrincipal?> VerifyAsync(string? token, CancellationToken cancellationToken = default) =>
144+
ValueTask.FromResult<AuthPrincipal?>(token switch
145+
{
146+
"ghost" => new AuthPrincipal(string.Empty),
147+
null or "" => null,
148+
_ => new AuthPrincipal(token),
149+
});
150+
}
151+
152+
[Fact]
153+
public async Task ListJobs_rejected_by_throwing_policy_surfaces_INTERNAL_ERROR()
154+
{
155+
var server = new ArcpServer(new ArcpServerOptions
156+
{
157+
Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" },
158+
Auth = new AllowAnyBearerVerifier(),
159+
AuthorizationPolicy = new ThrowingPolicy(),
160+
});
161+
server.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(3000, ct); return null; });
162+
163+
var (aliceT, aliceSrv) = MemoryTransport.Pair();
164+
_ = Task.Run(() => server.AcceptAsync(aliceSrv));
165+
await using var alice = await ArcpClient.ConnectAsync(aliceT, new ArcpClientOptions
166+
{
167+
Client = new ClientInfo { Name = "alice", Version = "1" }, Token = "alice",
168+
});
169+
await alice.SubmitAsync("sleeper");
170+
171+
var (bobT, bobSrv) = MemoryTransport.Pair();
172+
_ = Task.Run(() => server.AcceptAsync(bobSrv));
173+
await using var bob = await ArcpClient.ConnectAsync(bobT, new ArcpClientOptions
174+
{
175+
Client = new ClientInfo { Name = "bob", Version = "1" }, Token = "bob",
176+
});
177+
178+
// bob listing forces ThrowingPolicy.CanObserve over alice's job → unexpected exception →
179+
// session.error{INTERNAL_ERROR}; #73 makes the awaiting ListJobsAsync throw it.
180+
var act = async () => await bob.ListJobsAsync().WaitAsync(TimeSpan.FromSeconds(3));
181+
(await act.Should().ThrowAsync<ArcpException>()).Which.Code.Should().Be(ErrorCode.InternalError);
182+
}
183+
184+
// ── #37: a job survives session teardown and is not cancelled (spec §6.4, §6.7) ──────────────
185+
[Fact]
186+
public async Task Job_keeps_running_after_its_session_transport_drops()
187+
{
188+
var started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
189+
var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
190+
var finished = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
191+
192+
var (_, t) = StartServer(s => s.RegisterAgent("long", async (ctx, ct) =>
193+
{
194+
started.TrySetResult();
195+
// Honors the job token: if the job were cancelled on session close this would throw.
196+
await release.Task.WaitAsync(ct);
197+
finished.TrySetResult();
198+
return "ok";
199+
}));
200+
201+
var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } });
202+
await c.SubmitAsync("long");
203+
await started.Task.WaitAsync(TimeSpan.FromSeconds(3));
204+
205+
// Drop the session.
206+
await c.DisposeAsync();
207+
208+
// The job must still be alive: releasing it now lets it complete (it would have thrown on a
209+
// cancelled token if session teardown had terminated it).
210+
release.TrySetResult();
211+
await finished.Task.WaitAsync(TimeSpan.FromSeconds(3));
212+
}
213+
214+
// ── #43: deny-by-default for uncovered tool.call / agent.delegate (spec §9.3) ─────────────────
215+
[Fact]
216+
public async Task ToolCall_without_a_lease_namespace_is_denied_by_default()
217+
{
218+
var (_, t) = StartServer(s => s.RegisterAgent("tooler", async (ctx, ct) =>
219+
{
220+
await ctx.ToolCallAsync("fs.write", "c1", new { path = "/x" }, ct);
221+
return "done";
222+
}));
223+
await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } });
224+
var handle = await c.SubmitAsync("tooler");
225+
var result = await handle.Result.WaitAsync(TimeSpan.FromSeconds(3));
226+
result.Success.Should().BeFalse();
227+
result.Error!.Code.Should().Be(ErrorCode.PermissionDenied);
228+
}
229+
230+
[Fact]
231+
public async Task ToolCall_is_allowed_when_PermissiveUnleasedOperations_is_enabled()
232+
{
233+
var server = new ArcpServer(new ArcpServerOptions
234+
{
235+
Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" },
236+
PermissiveUnleasedOperations = true,
237+
});
238+
server.RegisterAgent("tooler", async (ctx, ct) =>
239+
{
240+
await ctx.ToolCallAsync("fs.write", "c1", new { path = "/x" }, ct);
241+
return "done";
242+
});
243+
var (clientT, srv) = MemoryTransport.Pair();
244+
_ = Task.Run(() => server.AcceptAsync(srv));
245+
await using var c = await ArcpClient.ConnectAsync(clientT, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } });
246+
var handle = await c.SubmitAsync("tooler");
247+
var result = await handle.Result.WaitAsync(TimeSpan.FromSeconds(3));
248+
result.Success.Should().BeTrue();
249+
}
250+
251+
// ── #67: keyset pagination is stable and bounded, even with identical CreatedAt ──────────────
252+
private sealed class FixedTimeProvider : TimeProvider
253+
{
254+
private readonly DateTimeOffset _now;
255+
public FixedTimeProvider(DateTimeOffset now) { _now = now; }
256+
public override DateTimeOffset GetUtcNow() => _now;
257+
}
258+
259+
[Fact]
260+
public async Task ListJobs_pages_stably_through_jobs_that_share_a_CreatedAt()
261+
{
262+
var fixedTime = new FixedTimeProvider(DateTimeOffset.Parse("2026-06-11T00:00:00Z"));
263+
var (_, t) = StartServer(
264+
s => s.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(4000, ct); return null; }),
265+
time: fixedTime);
266+
await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } });
267+
268+
const int total = 5;
269+
for (var i = 0; i < total; i++) await c.SubmitAsync("sleeper");
270+
271+
// Page through with a small limit; collect every job id exactly once.
272+
var seen = new List<string>();
273+
string? cursor = null;
274+
var pages = 0;
275+
do
276+
{
277+
var page = await c.ListJobsAsync(limit: 2, cursor: cursor);
278+
page.Jobs.Count.Should().BeLessThanOrEqualTo(2, "limit must bound page size");
279+
seen.AddRange(page.Jobs.Select(j => j.JobId));
280+
cursor = page.NextCursor;
281+
pages++;
282+
}
283+
while (cursor is not null && pages < 10);
284+
285+
seen.Should().HaveCount(total);
286+
seen.Distinct().Should().HaveCount(total, "pagination must not duplicate or drop jobs across pages");
287+
}
288+
}

0 commit comments

Comments
 (0)