@@ -32,6 +32,12 @@ type JobContext
3232 streamResultBegin: unit -> ResultId,
3333 onCostMetric: string * decimal -> unit
3434 ) =
35+ // Per-result_id chunk ordering state (§8.4): next expected chunk_seq
36+ // and whether the stream was closed by a `more=false` chunk.
37+ let chunkNext = System.Collections.Generic.Dictionary< string, int64>()
38+ let chunkClosed = System.Collections.Generic.HashSet< string>()
39+ let chunkLock = obj ()
40+
3541 member _.JobId : JobId = jobId
3642 member _.SessionId : SessionId = sessionId
3743 member _.ParentJobId : JobId option = parentJobId
@@ -63,10 +69,21 @@ type JobContext
6369 member _.RotateCredentialAsync ( credentialId : string , newValue : string , ct : CancellationToken ) : Task =
6470 rotateCredential ( credentialId, newValue, ct)
6571
72+ /// Emit a ` progress ` event (§8.2.1). ` current ` MUST be non-negative
73+ /// (rejected with INVALID_REQUEST otherwise); when ` total ` is present
74+ /// ` current ` is clamped to ` total ` so the wire invariant holds.
6675 member _.EmitProgressAsync
6776 ( current : decimal , total : decimal option , units : string option , message : string option , _ct : CancellationToken )
6877 : Task =
69- emit ( JobEventBody.Progress( current, total, units, message))
78+ if current < 0 m then
79+ raise ( ArcpException( ARCPError.InvalidRequest( " progress.current must be non-negative" , None)))
80+
81+ let clamped =
82+ match total with
83+ | Some t when current > t -> t
84+ | _ -> current
85+
86+ emit ( JobEventBody.Progress( clamped, total, units, message))
7087
7188 /// Emit a ` metric ` event. Names starting with ` cost. ` and a
7289 /// budgeted ` unit ` decrement the matching budget counter
@@ -80,7 +97,12 @@ type JobContext
8097 _ct : CancellationToken
8198 ) : Task =
8299 if value < 0 m then
83- Task.CompletedTask
100+ // §9.6: negative cost metrics are rejected; other negative
101+ // metrics are not governed by §9.6 and still flow through.
102+ if name.StartsWith( " cost." ) then
103+ raise ( ArcpException( ARCPError.InvalidRequest( " cost metric value must be non-negative" , None)))
104+ else
105+ emit ( JobEventBody.Metric( name, value, unit, dimensions))
84106 else
85107 // §9.6: `cost.budget.*` is budget telemetry (e.g.
86108 // `cost.budget.remaining`), not a charge — it must not
@@ -135,6 +157,36 @@ type JobContext
135157 more : bool ,
136158 _ct : CancellationToken
137159 ) : Task =
160+ // §8.4: chunk_seq is 0-based monotonic per result_id and chunks
161+ // MUST be emitted in order; nothing may follow a `more=false`
162+ // chunk. Enforce both before anything reaches the wire.
163+ lock chunkLock ( fun () ->
164+ if chunkClosed.Contains resultId.Value then
165+ raise (
166+ ArcpException(
167+ ARCPError.InternalError( sprintf " result_id %s already completed; no further chunks allowed" resultId.Value)
168+ )
169+ )
170+
171+ let expected =
172+ match chunkNext.TryGetValue resultId.Value with
173+ | true , n -> n
174+ | _ -> 0 L
175+
176+ if chunkSeq <> expected then
177+ raise (
178+ ArcpException(
179+ ARCPError.InternalError(
180+ sprintf " out-of-order chunk_seq %d (expected %d ) for result_id %s " chunkSeq expected resultId.Value
181+ )
182+ )
183+ )
184+
185+ chunkNext.[ resultId.Value] <- expected + 1 L
186+
187+ if not more then
188+ chunkClosed.Add resultId.Value |> ignore)
189+
138190 let encoded =
139191 match encoding with
140192 | ChunkEncoding.Utf8 -> Encoding.UTF8.GetString( data.Span)
0 commit comments