Skip to content

Commit d3c3ffb

Browse files
nficanoclaude
andcommitted
refactor: reduce complexity across SDK core and runtime
Phase 4 of refactor-plan.md. Splits the four flagged hot files and extracts methods so every src/ file is now under the 300-line hard cap and methods stay near the 30-line target. - src/Arcp.Core/Messages/MessageTypes.cs (421 lines, 36 types) is split one-type-per-file. Mechanical; no behaviour change. - SessionState (475 lines) becomes partial: Dispatch (inbound switch + ping/ack helpers), Handshake (welcome flow split into VerifyAuth / TryResume / NegotiateFeatures / BuildWelcome / Replay), Jobs (submit + subscribe), Outbound (sender + heartbeat + fan-out). Each file <=140 lines; the long handshake method becomes 5 helpers. - ArcpClient (348 lines): JobSubscription moves to its own file (style guide section 1: one public type per file). ArcpClient itself splits into primary / Dispatch / Jobs / Subscriptions partials; the dispatch switch's per-case work is hoisted into ApplyWelcome / RespondToPing / PropagateSessionError helpers. - JobManager.RunAsync (118 lines) breaks into EmitSuccessResultAsync, EmitJobErrorAsync, BuildInlineResultPayload, BuildEnvelope, StartLeaseWatchdog, AwaitWatchdogAsync. The 50-line List method moves to JobManager.Listing.cs partial with FilterByPrincipal / ApplyFilter / ToListEntry helpers and the cursor base64 codecs. - EnvelopeJsonConverter.Read (82 lines) split into ParseFields, ValidateHeader, DeserializePayload over a private EnvelopeFields state record; Write splits into WriteHeader / WritePayload / WriteExtensions. Build clean, 41 tests pass, dotnet format clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4286701 commit d3c3ffb

50 files changed

Lines changed: 1771 additions & 1251 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
using System;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Arcp.Core.Caps;
6+
using Arcp.Core.Errors;
7+
using Arcp.Core.Ids;
8+
using Arcp.Core.Messages;
9+
using Arcp.Core.Wire;
10+
11+
namespace Arcp.Client;
12+
13+
public sealed partial class ArcpClient
14+
{
15+
private async Task ReaderLoop(CancellationToken cancellationToken)
16+
{
17+
try
18+
{
19+
await foreach (var env in _transport.ReceiveAsync(cancellationToken).ConfigureAwait(false))
20+
{
21+
if (env.EventSeq is { } seq) Interlocked.Exchange(ref _lastReceivedSeq, seq);
22+
Dispatch(env, cancellationToken);
23+
}
24+
}
25+
catch (OperationCanceledException)
26+
{
27+
// Expected on shutdown; reader loop exits silently.
28+
}
29+
catch (Exception)
30+
{
31+
FailAllInFlight();
32+
}
33+
}
34+
35+
private void FailAllInFlight()
36+
{
37+
foreach (var h in _handles.Values)
38+
{
39+
h.OnError(new JobErrorPayload
40+
{
41+
Code = ErrorCode.InternalError,
42+
Message = "Transport closed",
43+
Retryable = true,
44+
});
45+
}
46+
}
47+
48+
private void Dispatch(Envelope env, CancellationToken cancellationToken)
49+
{
50+
switch (env.Type)
51+
{
52+
case MessageTypeNames.SessionWelcome:
53+
if (env.Payload is SessionWelcomePayload w) ApplyWelcome(env, w);
54+
break;
55+
case MessageTypeNames.SessionPing:
56+
if (env.Payload is SessionPingPayload p) RespondToPing(p, cancellationToken);
57+
break;
58+
case MessageTypeNames.SessionError:
59+
if (env.Payload is SessionErrorPayload err) PropagateSessionError(err);
60+
break;
61+
case MessageTypeNames.SessionJobs:
62+
if (env.Payload is SessionJobsPayload jobs && jobs.RequestId is { } reqId)
63+
{
64+
if (_listJobsRequests.TryRemove(reqId, out var tcs)) tcs.TrySetResult(jobs);
65+
}
66+
break;
67+
case MessageTypeNames.JobAccepted:
68+
if (env.Payload is JobAcceptedPayload accepted && env.JobId is { } jaJid && JobId.TryParse(jaJid, null, out var jaId))
69+
{
70+
if (_pendingSubmits.TryDequeue(out var pending))
71+
{
72+
pending.OnAccepted(accepted);
73+
_handles[jaId] = pending;
74+
}
75+
}
76+
break;
77+
case MessageTypeNames.JobSubscribed:
78+
if (env.Payload is JobSubscribedPayload subbed && env.JobId is { } jsJid && JobId.TryParse(jsJid, null, out var jsId))
79+
{
80+
if (_subscriptions.TryGetValue(jsId, out var s2)) s2.OnSubscribed(subbed);
81+
}
82+
break;
83+
case MessageTypeNames.JobEvent:
84+
if (env.JobId is { } jeId && JobId.TryParse(jeId, null, out var jeJid))
85+
{
86+
if (_handles.TryGetValue(jeJid, out var h2)) h2.OnEvent(env);
87+
if (_subscriptions.TryGetValue(jeJid, out var sub2)) sub2.OnEvent(env);
88+
}
89+
break;
90+
case MessageTypeNames.JobResult:
91+
if (env.Payload is JobResultPayload res && env.JobId is { } jrJid && JobId.TryParse(jrJid, null, out var jrId))
92+
{
93+
if (_handles.TryGetValue(jrId, out var h3)) h3.OnResult(res);
94+
if (_subscriptions.TryGetValue(jrId, out var sub3)) sub3.OnTerminal();
95+
}
96+
break;
97+
case MessageTypeNames.JobError:
98+
if (env.Payload is JobErrorPayload jerr && env.JobId is { } jerrJid && JobId.TryParse(jerrJid, null, out var jerrId))
99+
{
100+
if (_handles.TryGetValue(jerrId, out var h4)) h4.OnError(jerr);
101+
if (_subscriptions.TryGetValue(jerrId, out var sub4)) sub4.OnTerminal();
102+
}
103+
break;
104+
}
105+
}
106+
107+
private void ApplyWelcome(Envelope env, SessionWelcomePayload w)
108+
{
109+
if (env.SessionId is { } sid && SessionId.TryParse(sid, null, out var s)) SessionId = s;
110+
EffectiveFeatures = FeatureSet.Intersect(_options.Features, w.Capabilities.Features);
111+
ResumeToken = w.ResumeToken;
112+
Agents = w.Capabilities.Agents ?? Array.Empty<AgentInventoryEntry>();
113+
Runtime = w.Runtime;
114+
HeartbeatIntervalSec = w.HeartbeatIntervalSec;
115+
_welcomeTcs?.TrySetResult(w);
116+
}
117+
118+
private void RespondToPing(SessionPingPayload p, CancellationToken cancellationToken)
119+
{
120+
_ = _transport.SendAsync(new Envelope
121+
{
122+
Type = MessageTypeNames.SessionPong,
123+
SessionId = SessionId.Value,
124+
Payload = new SessionPongPayload
125+
{
126+
PingNonce = p.Nonce,
127+
ReceivedAt = _options.TimeProvider.GetUtcNow(),
128+
},
129+
}, cancellationToken).AsTask();
130+
}
131+
132+
private void PropagateSessionError(SessionErrorPayload err)
133+
{
134+
foreach (var h in _handles.Values)
135+
{
136+
h.OnError(new JobErrorPayload
137+
{
138+
Code = err.Code,
139+
Message = err.Message,
140+
Retryable = err.Retryable,
141+
Detail = err.Detail,
142+
});
143+
}
144+
}
145+
}

src/Arcp.Client/ArcpClient.Jobs.cs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Arcp.Core.Ids;
5+
using Arcp.Core.Leases;
6+
using Arcp.Core.Messages;
7+
using Arcp.Core.Wire;
8+
9+
namespace Arcp.Client;
10+
11+
public sealed partial class ArcpClient
12+
{
13+
public async Task<JobHandle> SubmitAsync(string agent, object? input = null, Lease? leaseRequest = null,
14+
LeaseConstraints? leaseConstraints = null, string? idempotencyKey = null,
15+
int? maxRuntimeSec = null, CancellationToken cancellationToken = default)
16+
{
17+
var handle = new JobHandle(this);
18+
_pendingSubmits.Enqueue(handle);
19+
await _transport.SendAsync(new Envelope
20+
{
21+
Type = MessageTypeNames.JobSubmit,
22+
SessionId = SessionId.Value,
23+
Payload = new JobSubmitPayload
24+
{
25+
Agent = agent,
26+
Input = input is null ? null : ArcpJson.ToJsonElement(input),
27+
LeaseRequest = leaseRequest,
28+
LeaseConstraints = leaseConstraints,
29+
IdempotencyKey = idempotencyKey,
30+
MaxRuntimeSec = maxRuntimeSec,
31+
},
32+
}, cancellationToken).ConfigureAwait(false);
33+
await handle.Accepted.WaitAsync(cancellationToken).ConfigureAwait(false);
34+
return handle;
35+
}
36+
37+
public async Task CancelJobAsync(JobId jobId, string? reason = null, CancellationToken cancellationToken = default)
38+
{
39+
await _transport.SendAsync(new Envelope
40+
{
41+
Type = MessageTypeNames.JobCancel,
42+
SessionId = SessionId.Value,
43+
JobId = jobId.Value,
44+
Payload = new JobCancelPayload { JobId = jobId.Value, Reason = reason },
45+
}, cancellationToken).ConfigureAwait(false);
46+
}
47+
48+
public async Task<SessionJobsPayload> ListJobsAsync(JobListFilter? filter = null, int? limit = null,
49+
string? cursor = null, CancellationToken cancellationToken = default)
50+
{
51+
var id = "msg_" + Ulid.NewUlid();
52+
var tcs = new TaskCompletionSource<SessionJobsPayload>(TaskCreationOptions.RunContinuationsAsynchronously);
53+
_listJobsRequests[id] = tcs;
54+
await _transport.SendAsync(new Envelope
55+
{
56+
Id = id,
57+
Type = MessageTypeNames.SessionListJobs,
58+
SessionId = SessionId.Value,
59+
Payload = new SessionListJobsPayload { Filter = filter, Limit = limit, Cursor = cursor },
60+
}, cancellationToken).ConfigureAwait(false);
61+
return await tcs.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
62+
}
63+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Arcp.Core.Ids;
5+
using Arcp.Core.Messages;
6+
using Arcp.Core.Wire;
7+
8+
namespace Arcp.Client;
9+
10+
public sealed partial class ArcpClient
11+
{
12+
public async Task<JobSubscription> SubscribeAsync(JobId jobId, bool history = false,
13+
long? fromEventSeq = null, CancellationToken cancellationToken = default)
14+
{
15+
var sub = new JobSubscription(this, jobId);
16+
_subscriptions[jobId] = sub;
17+
await _transport.SendAsync(new Envelope
18+
{
19+
Type = MessageTypeNames.JobSubscribe,
20+
SessionId = SessionId.Value,
21+
JobId = jobId.Value,
22+
Payload = new JobSubscribePayload { JobId = jobId.Value, History = history, FromEventSeq = fromEventSeq },
23+
}, cancellationToken).ConfigureAwait(false);
24+
await sub.Acknowledged.WaitAsync(cancellationToken).ConfigureAwait(false);
25+
return sub;
26+
}
27+
28+
public async Task UnsubscribeAsync(JobId jobId, CancellationToken cancellationToken = default)
29+
{
30+
_subscriptions.TryRemove(jobId, out _);
31+
await _transport.SendAsync(new Envelope
32+
{
33+
Type = MessageTypeNames.JobUnsubscribe,
34+
SessionId = SessionId.Value,
35+
JobId = jobId.Value,
36+
Payload = new JobUnsubscribePayload { JobId = jobId.Value },
37+
}, cancellationToken).ConfigureAwait(false);
38+
}
39+
40+
public ValueTask AckAsync(long lastProcessedSeq, CancellationToken cancellationToken = default) =>
41+
_transport.SendAsync(new Envelope
42+
{
43+
Type = MessageTypeNames.SessionAck,
44+
SessionId = SessionId.Value,
45+
Payload = new SessionAckPayload { LastProcessedSeq = lastProcessedSeq },
46+
}, cancellationToken);
47+
}

0 commit comments

Comments
 (0)