Skip to content

Commit d363332

Browse files
Nick Ficanoclaude
andcommitted
refactor: lean on F# idioms in transport, server, and event-log internals
- WebSocket.receiveOne: drop the malformed-envelope fallthrough and recurse instead of fabricating a sentinel from a re-decoded payload. Frame reassembly stays imperative (BCL ReceiveAsync gives no functional aggregator) but is now isolated behind a Choice. - Glob.compile: replace mutable StringBuilder + index walk with a tail recursive translator over the char list. - EventLog.EvictExpired: switch the per-session prune to a recursive drop inside the lock and aggregate counts via Seq.sumBy. - ArcpServer.registerHandler: replace the silent default-of JsonElement placeholder with one that raises if the inventory adapter is ever routed through, plus a comment naming the cycle the ref cell breaks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6cc69d8 commit d363332

4 files changed

Lines changed: 66 additions & 55 deletions

File tree

src/Arcp.Client/Transport/WebSocket.fs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,38 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) =
3232
socket.SendAsync(ArraySegment<byte>(bytes), WebSocketMessageType.Text, true, ct))
3333
} :> Task
3434

35-
let receiveOne (ct: CancellationToken) : Task<Envelope option> =
35+
let rec receiveOne (ct: CancellationToken) : Task<Envelope option> =
3636
task {
3737
let buffer = ArrayPool<byte>.Shared.Rent(8192)
38-
try
39-
use ms = new MemoryStream()
40-
let mutable endOfMessage = false
41-
let mutable closedRemotely = false
42-
while not endOfMessage && not closedRemotely do
43-
let! result = socket.ReceiveAsync(ArraySegment<byte>(buffer), ct)
44-
if result.MessageType = WebSocketMessageType.Close then
45-
closedRemotely <- true
46-
else
47-
ms.Write(buffer, 0, result.Count)
48-
endOfMessage <- result.EndOfMessage
49-
if closedRemotely then return None
50-
else
51-
let text = Encoding.UTF8.GetString(ms.ToArray())
52-
match Codec.readEnvelope text with
53-
| Ok env -> return Some env
54-
| Error _ -> return Some (Codec.readEnvelope text |> function Ok e -> e | _ -> Unchecked.defaultof<_>)
55-
// Fall back to ignoring malformed messages by returning a sentinel; we'll
56-
// re-loop on the caller side. Simpler: re-call receiveOne when malformed.
57-
finally
58-
ArrayPool<byte>.Shared.Return(buffer)
38+
let! frame =
39+
task {
40+
try
41+
use ms = new MemoryStream()
42+
// Mutation here drives WebSocket frame reassembly:
43+
// BCL `ReceiveAsync` returns one frame at a time and
44+
// there is no functional aggregator for it.
45+
let mutable endOfMessage = false
46+
let mutable closedRemotely = false
47+
while not endOfMessage && not closedRemotely do
48+
let! result = socket.ReceiveAsync(ArraySegment<byte>(buffer), ct)
49+
if result.MessageType = WebSocketMessageType.Close then
50+
closedRemotely <- true
51+
else
52+
ms.Write(buffer, 0, result.Count)
53+
endOfMessage <- result.EndOfMessage
54+
if closedRemotely then return Choice1Of2 ()
55+
else return Choice2Of2 (Encoding.UTF8.GetString(ms.ToArray()))
56+
finally
57+
ArrayPool<byte>.Shared.Return(buffer)
58+
}
59+
match frame with
60+
| Choice1Of2 () -> return None
61+
| Choice2Of2 text ->
62+
match Codec.readEnvelope text with
63+
| Ok env -> return Some env
64+
// Malformed envelopes are dropped at the transport boundary
65+
// per spec §4 — clients must not surface them to dispatch.
66+
| Error _ -> return! receiveOne ct
5967
}
6068

6169
interface ITransport with

src/Arcp.Core/Lease.fs

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,26 +46,15 @@ module Glob =
4646
/// `**` matches any path segment including `/`; `*` matches
4747
/// any character except `/`; `?` matches a single non-`/` char.
4848
let compile (pattern: string) : Regex =
49-
let sb = System.Text.StringBuilder()
50-
sb.Append "^" |> ignore
51-
let mutable i = 0
52-
let n = pattern.Length
53-
while i < n do
54-
let c = pattern.[i]
55-
if c = '*' && i + 1 < n && pattern.[i + 1] = '*' then
56-
sb.Append ".*" |> ignore
57-
i <- i + 2
58-
elif c = '*' then
59-
sb.Append "[^/]*" |> ignore
60-
i <- i + 1
61-
elif c = '?' then
62-
sb.Append "[^/]" |> ignore
63-
i <- i + 1
64-
else
65-
sb.Append(Regex.Escape(string c)) |> ignore
66-
i <- i + 1
67-
sb.Append "$" |> ignore
68-
Regex(sb.ToString(), RegexOptions.Compiled ||| RegexOptions.CultureInvariant)
49+
let rec translate (chars: char list) (acc: string list) : string list =
50+
match chars with
51+
| [] -> List.rev acc
52+
| '*' :: '*' :: rest -> translate rest (".*" :: acc)
53+
| '*' :: rest -> translate rest ("[^/]*" :: acc)
54+
| '?' :: rest -> translate rest ("[^/]" :: acc)
55+
| c :: rest -> translate rest (Regex.Escape(string c) :: acc)
56+
let body = pattern |> List.ofSeq |> (fun cs -> translate cs []) |> String.concat ""
57+
Regex("^" + body + "$", RegexOptions.Compiled ||| RegexOptions.CultureInvariant)
6958

7059
let isMatch (pattern: string) (target: string) : bool =
7160
// Amount strings used in `cost.budget` and any non-glob

src/Arcp.Runtime/ArcpServer.fs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ type ArcpServer(options: ArcpServerOptions) =
4949
let sessions = ConcurrentDictionary<string, ServerSessionContext>()
5050
let agentHandlers = ConcurrentDictionary<string, ArcpAgentHandler>()
5151

52-
// The outbox is built per-session and stored in a ref cell so
53-
// `JobManager` can route emits back through it without circular
54-
// construction.
52+
// `JobManager` and the real outbox are mutually dependent: the
53+
// outbox needs `jobs` (for Subscribers / Terminate) and `jobs`
54+
// needs the outbox. We break the cycle with a ref cell that
55+
// `BuildOutbox` assigns into; mutation is unavoidable here.
5556
let outbox : IJobOutbox ref = ref Unchecked.defaultof<IJobOutbox>
5657
let jobs =
5758
JobManager(
@@ -63,8 +64,16 @@ type ArcpServer(options: ArcpServerOptions) =
6364

6465
let registerHandler (name: string) (version: string) (h: ArcpAgentHandler) =
6566
agentHandlers.[name + "@" + version] <- h
66-
let adapter : AgentHandler = fun _ -> task { return Unchecked.defaultof<JsonElement> }
67-
inventory.Register(name, version, adapter)
67+
// The inventory stores an `AgentHandler` purely as a presence
68+
// marker — `JobSubmitFlow` discards it and dispatches via
69+
// `agentHandlers` keyed by `name@version`. The placeholder
70+
// raises so any regression that routes through it surfaces
71+
// loudly instead of returning a garbage JsonElement.
72+
let placeholder : AgentHandler =
73+
fun _ ->
74+
raise (InvalidOperationException
75+
(sprintf "ARCP runtime invariant: inventory placeholder invoked for %s@%s" name version))
76+
inventory.Register(name, version, placeholder)
6877

6978
/// Register an agent under the default version (`default`).
7079
member _.RegisterAgent(name: string, handler: ArcpAgentHandler) : unit =

src/Arcp.Runtime/Store/EventLog.fs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,16 @@ type internal EventLog(options: EventLogOptions) =
105105
member _.EvictExpired() : int =
106106
let now = options.TimeProvider.GetUtcNow()
107107
let cutoff = now.AddSeconds(-float options.ResumeWindowSec)
108-
let mutable evicted = 0
109-
for kvp in perSession do
110-
lock kvp.Value (fun () ->
111-
let mutable i = 0
112-
while i < kvp.Value.Count && kvp.Value.[i].Timestamp < cutoff do
113-
kvp.Value.RemoveAt 0
114-
evicted <- evicted + 1)
115-
evicted
108+
// The per-session buffer is a mutable `List<T>` from the BCL
109+
// (chosen for O(1) Add + RemoveAt 0 amortised semantics under
110+
// a lock); eviction has to mutate it in place.
111+
let evictOne (list: List<EventLogEntry>) : int =
112+
lock list (fun () ->
113+
let rec drop removed =
114+
if list.Count > 0 && list.[0].Timestamp < cutoff then
115+
list.RemoveAt 0
116+
drop (removed + 1)
117+
else removed
118+
drop 0)
119+
perSession
120+
|> Seq.sumBy (fun kvp -> evictOne kvp.Value)

0 commit comments

Comments
 (0)