Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/policy-audit-denominator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'nexus-agents': minor
---

feat(observability): durable per-evaluation policy-audit summary + would-block rate (#3727)

The pipeline policy gate's durable audit log recorded only per-violation records (the numerator): a clean/allowed evaluation wrote nothing, so the would-block RATE had no denominator and could not be computed from the durable log. `evaluatePipelinePolicy` now appends ONE per-evaluation summary record (`recordKind: 'summary'`, carrying `violationCount`) on EVERY evaluation, including clean ones, in addition to the existing per-violation records (`recordKind: 'violation'`), which preserves the #3710 count-parity invariant. A new `computePolicyWouldBlockRate(events)` helper computes the rate as the number of summary records with `violationCount > 0` (the numerator) divided by the total number of summary records (the denominator); it reports a rate of 0 when there are no summary records. Both the `recordKind` discriminator and `violationCount` round-trip through the audit bridge into the persisted record. Decided by a 7/7 higher_order vote (capture-now, since warn-mode soak data is non-backfillable). Live-routing use of the rate stays gated on the #3769-enforce readiness gate.
22 changes: 16 additions & 6 deletions packages/nexus-agents/src/pipeline/dev-pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,11 +791,17 @@ describe('runDevPipeline — durable policy-audit persistence (#3710)', () => {
expect(result.completed).toBe(true);
const events = storage.getAll();
const policyGate = events.filter((e) => e.action === 'security.policy_gate');
// Exactly one durable policy_gate record persisted, with mode/ruleIds/stageType.
expect(policyGate).toHaveLength(1);
expect(policyGate[0]!.metadata?.['mode']).toBe('warn');
expect(policyGate[0]!.metadata?.['ruleIds']).toEqual(['trust-tier']);
expect(policyGate[0]!.metadata?.['stageType']).toBe('execute');
// #3727: one per-violation record (#3710) + one per-evaluation summary record.
const violationRec = policyGate.filter((e) => e.metadata?.['recordKind'] === 'violation');
const summaryRec = policyGate.filter((e) => e.metadata?.['recordKind'] === 'summary');
expect(violationRec).toHaveLength(1);
expect(summaryRec).toHaveLength(1);
// The per-violation record carries mode/ruleIds/stageType.
expect(violationRec[0]!.metadata?.['mode']).toBe('warn');
expect(violationRec[0]!.metadata?.['ruleIds']).toEqual(['trust-tier']);
expect(violationRec[0]!.metadata?.['stageType']).toBe('execute');
// The summary record carries the per-evaluation denominator signal.
expect(summaryRec[0]!.metadata?.['violationCount']).toBe(1);
// The persisted chain verifies.
expect(verifyChain(events).ok).toBe(true);

Expand All @@ -818,7 +824,11 @@ describe('runDevPipeline — durable policy-audit persistence (#3710)', () => {

const events = storage.getAll();
const policyGate = events.filter((e) => e.action === 'security.policy_gate');
expect(policyGate).toHaveLength(6); // one per run, no drops or dupes
// #3727: each run now appends one violation record + one summary record.
const violationRec = policyGate.filter((e) => e.metadata?.['recordKind'] === 'violation');
const summaryRec = policyGate.filter((e) => e.metadata?.['recordKind'] === 'summary');
expect(violationRec).toHaveLength(6); // one per run, no drops or dupes
expect(summaryRec).toHaveLength(6); // one summary per run
expect(verifyChain(events).ok).toBe(true);

await auditLogger.close();
Expand Down
111 changes: 101 additions & 10 deletions packages/nexus-agents/src/pipeline/policy-evaluator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { PolicyEngine } from './policy-engine.js';
import { EventBus } from './event-bus.js';
import { evaluatePipelinePolicy, getPolicyMode } from './policy-evaluator.js';
import type { PolicyContext, PolicyRule } from './policy-engine.js';
import { createAuditTrail } from '../security/audit-trail.js';
import { createAuditTrail, computePolicyWouldBlockRate } from '../security/audit-trail.js';
import { securityAuditEventToInput } from '../security/audit-bridge.js';
import type { AuditEvent as SecurityAuditEvent } from '../security/audit-trail.js';

Expand Down Expand Up @@ -215,9 +215,13 @@ describe('evaluatePipelinePolicy — durable dual-emit (#3710)', () => {

// Bus emit unchanged (back-compat).
expect(eventBus.query({ type: 'policy.evaluated' })).toHaveLength(1);
// Durable sink also received exactly one policy_gate event.
expect(events).toHaveLength(1);
// Durable sink: the per-violation record (#3710) + the #3727 summary record.
const violationRecs = events.filter(
(e) => e.type === 'policy_gate' && e.recordKind === 'violation'
);
expect(violationRecs).toHaveLength(1);
expect(events[0]!.type).toBe('policy_gate');
expect(events).toHaveLength(2); // 1 violation + 1 summary (#3727)
});

it('mode/ruleIds/stageType ROUND-TRIP into the persisted durable AuditEvent (warn)', () => {
Expand Down Expand Up @@ -264,8 +268,16 @@ describe('evaluatePipelinePolicy — durable dual-emit (#3710)', () => {

const busCount = eventBus.query({ type: 'policy.evaluated' }).length;
expect(busCount).toBe(3);
expect(events).toHaveLength(3); // exactly one durable record per violation
expect(trail.size).toBe(3); // no duplicate appends
// #3710 parity is now scoped to the per-violation records (recordKind).
const violationRecs = events.filter(
(e) => e.type === 'policy_gate' && e.recordKind === 'violation'
);
expect(violationRecs).toHaveLength(3); // exactly one durable record per violation
const summaryRecs = events.filter(
(e) => e.type === 'policy_gate' && e.recordKind === 'summary'
);
expect(summaryRecs).toHaveLength(1); // #3727: one per-evaluation summary
expect(trail.size).toBe(4); // 3 violations + 1 summary; no duplicate appends
});

it('no-sink path is byte-identical: omitting auditTrail produces no durable side effect', () => {
Expand All @@ -281,16 +293,95 @@ describe('evaluatePipelinePolicy — durable dual-emit (#3710)', () => {

// Returned result is identical regardless of the sink.
expect(noTrail).toEqual(withTrail);
// The no-sink run produced no durable events at all.
expect(events).toHaveLength(1); // only the with-trail run appended
// The no-sink run produced no durable events at all; the with-trail run
// appended 1 violation + 1 summary (#3727).
expect(events).toHaveLength(2); // only the with-trail run appended
});

it('no violations: durable trail receives nothing', () => {
it('no violations: durable trail receives ONE summary record (#3727 denominator), no violation records', () => {
engine.registerRule(createPassingRule('ok'));
const { trail, events } = captureTrail();
evaluatePipelinePolicy({ engine, eventBus, mode: 'warn', auditTrail: trail }, ctx);
expect(events).toHaveLength(0);
expect(trail.size).toBe(0);
// #3727: a CLEAN evaluation now writes exactly one per-evaluation SUMMARY
// record (the denominator) where it previously wrote nothing.
expect(events).toHaveLength(1);
const rec = events[0]!;
if (rec.type !== 'policy_gate') throw new Error('unreachable');
expect(rec.recordKind).toBe('summary');
expect(rec.violationCount).toBe(0);
expect(rec.allowed).toBe(true);
expect(trail.size).toBe(1);
});
});

describe('evaluatePipelinePolicy — per-evaluation summary + would-block rate (#3727)', () => {
  // Every evaluation in this suite guards the same execute stage.
  const ctx = createContext({ stageType: 'execute', stageId: 'consensus-to-execute' });

  type Trail = ReturnType<typeof createAuditTrail>;

  /** Builds a trail whose callback pushes each event it receives into `events`. */
  function captureTrail(): { trail: Trail; events: SecurityAuditEvent[] } {
    const captured: SecurityAuditEvent[] = [];
    const trail = createAuditTrail((event) => captured.push(event));
    return { trail, events: captured };
  }

  /** Runs one warn-mode evaluation of `engine` against `ctx` with a fresh EventBus. */
  function evaluateWarn(engine: PolicyEngine, trail: Trail): void {
    evaluatePipelinePolicy(
      { engine, eventBus: new EventBus(), mode: 'warn', auditTrail: trail },
      ctx
    );
  }

  /** Picks the `policy_gate` events carrying the given `recordKind`. */
  function policyGateOfKind(
    all: readonly SecurityAuditEvent[],
    kind: 'summary' | 'violation'
  ): SecurityAuditEvent[] {
    return all.filter((event) => event.type === 'policy_gate' && event.recordKind === kind);
  }

  it('an N-violation evaluation writes N violation records + 1 summary(violationCount=N)', () => {
    const engine = new PolicyEngine();
    for (const ruleId of ['a', 'b']) {
      engine.registerRule(createBlockingRule(ruleId));
    }
    const { trail, events } = captureTrail();

    evaluateWarn(engine, trail);

    const violations = policyGateOfKind(events, 'violation');
    const summaries = policyGateOfKind(events, 'summary');
    expect(violations).toHaveLength(2); // #3710 parity preserved
    expect(summaries).toHaveLength(1);

    const summary = summaries[0]!;
    // filter() does not narrow the union, so narrow explicitly before reading fields.
    if (summary.type !== 'policy_gate') throw new Error('unreachable');
    expect(summary.violationCount).toBe(2);
    expect(summary.allowed).toBe(true); // warn mode continues

    // The discriminator + count must survive the bridge into the PERSISTED record
    // (the readiness gate reads persisted records, not in-memory events).
    const durable = securityAuditEventToInput(summary);
    expect(durable.metadata?.['recordKind']).toBe('summary');
    expect(durable.metadata?.['violationCount']).toBe(2);
  });

  it('computePolicyWouldBlockRate counts the denominator from summary records only', () => {
    const { trail, events } = captureTrail();
    const clean = new PolicyEngine();
    clean.registerRule(createPassingRule('ok'));
    const blocking = new PolicyEngine();
    blocking.registerRule(createBlockingRule('x'));

    // Two clean evaluations followed by one evaluation with a violation.
    for (const engine of [clean, clean, blocking]) {
      evaluateWarn(engine, trail);
    }

    const { evaluations, wouldBlock, rate } = computePolicyWouldBlockRate(events);
    expect(evaluations).toBe(3); // 3 summary records (the denominator) — NOT the violation record
    expect(wouldBlock).toBe(1); // one evaluation had a violation
    expect(rate).toBeCloseTo(1 / 3);
  });

  it('rate is 0 with no evaluations (no spurious signal)', () => {
    const emptyWindow = computePolicyWouldBlockRate([]);
    expect(emptyWindow).toEqual({ evaluations: 0, wouldBlock: 0, rate: 0 });
  });
});

Expand Down
39 changes: 39 additions & 0 deletions packages/nexus-agents/src/pipeline/policy-evaluator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,12 @@ export function evaluatePipelinePolicy(
logViolations(context, violations, mode);
}

// #3727: ALWAYS append one durable per-evaluation summary record (the
// denominator), including clean evaluations, so the would-block rate is
// computable from the durable log over the soak window. Supplement to the
// violation-gated emits above — the in-memory bus emit stays violation-only.
emitDurablePolicyEvaluationSummary(options.auditTrail, context, violations.length, mode);

const allowed = mode === 'warn' || violations.length === 0;
return { allowed, violations, mode };
}
Expand Down Expand Up @@ -308,10 +314,43 @@ function emitDurablePolicyEvents(
mode,
ruleIds: [v.ruleId],
stageType: context.stageType,
// #3727: the existing per-violation records (the NUMERATOR detail). The
// #3710 count-parity assertion scopes to this kind.
recordKind: 'violation',
});
}
}

/**
 * Writes the single per-EVALUATION summary record for one pipeline policy
 * evaluation (#3727), whether or not the evaluation produced violations.
 *
 * The summary is emitted with `recordKind: 'summary'` and carries
 * `violationCount`, so a reader can use count(summary records) as the
 * would-block denominator and the summaries with `violationCount > 0` as the
 * numerator. It supplements the per-violation records rather than replacing
 * them, which keeps the #3710 parity invariant scoped to `'violation'` records.
 *
 * @param auditTrail - Durable sink; when `undefined` nothing is emitted.
 * @param context - Evaluation context; only `stageType` is copied to the record.
 * @param violationCount - Number of violations found by this evaluation.
 * @param mode - Policy mode; `'warn'` always yields an allowed verdict.
 */
function emitDurablePolicyEvaluationSummary(
  auditTrail: AuditTrail | undefined,
  context: PolicyContext,
  violationCount: number,
  mode: PolicyMode
): void {
  if (auditTrail !== undefined) {
    // Run-level verdict: warn always continues; otherwise allowed iff no violations.
    const runAllowed = mode === 'warn' || violationCount === 0;
    emitPipelinePolicyEvent(auditTrail, {
      allowed: runAllowed,
      requiresApproval: false,
      // NOTE(review): fixed tier literal; its meaning is defined outside this block.
      inputTrustTier: '4',
      // The summary carries no per-rule detail, only the count below.
      violationRules: [],
      mode,
      ruleIds: [],
      stageType: context.stageType,
      recordKind: 'summary',
      violationCount,
    });
  }
}

/** Logs violations at appropriate level based on mode. */
function logViolations(
context: PolicyContext,
Expand Down
31 changes: 23 additions & 8 deletions packages/nexus-agents/src/security/audit-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,30 @@ function mapTrust(e: TrustEvent): AuditEventInput {
};
}

/**
 * Collects the optional pipeline-policy fields of a policy event into a metadata
 * object, skipping every field that is `undefined` (as they are on the security
 * path). Kept separate so {@link mapPolicyGate} stays under the complexity cap.
 *
 * `actionType`/`mode`/`ruleIds`/`stageType` are the #3710 round-trip fields;
 * `recordKind`/`violationCount` are the #3727 summary/violation discriminator and
 * per-evaluation count needed for the durable would-block rate.
 *
 * @param e - The policy event to read the optional fields from.
 * @returns A fresh object holding only the defined fields, in the order listed above.
 */
function pipelinePolicyMetadata(e: PolicyEvent): Record<string, unknown> {
  // Order matters only for key insertion order of the resulting object.
  const optionalKeys = [
    'actionType',
    'mode',
    'ruleIds',
    'stageType',
    'recordKind',
    'violationCount',
  ] as const;

  const metadata: Record<string, unknown> = {};
  for (const key of optionalKeys) {
    const value = e[key];
    // Only `undefined` is dropped: falsy values such as 0 or [] are kept.
    if (value !== undefined) {
      metadata[key] = value;
    }
  }
  return metadata;
}

function mapPolicyGate(e: PolicyEvent): AuditEventInput {
const outcome: AuditOutcome = e.allowed ? 'success' : 'denied';
// #3710: the pipeline-policy path carries `mode`/`ruleIds`/`stageType` and no
// `actionType`; these MUST round-trip into the durable metadata so the
// persisted record distinguishes soak(warn) from enforce(block). The security
// path leaves them undefined and keeps its `actionType`-keyed shape.
// #3710/#3727: the pipeline-policy path carries mode/ruleIds/stageType +
// recordKind/violationCount and no actionType; these MUST round-trip into the
// durable metadata (see pipelinePolicyMetadata). The security path leaves them
// undefined and keeps its actionType-keyed shape.
return {
category: 'authorization',
severity: e.allowed ? 'info' : 'warning',
Expand All @@ -79,13 +97,10 @@ function mapPolicyGate(e: PolicyEvent): AuditEventInput {
policyDecision: e.allowed ? 'allow' : 'deny',
...(e.violationRules.length > 0 ? { violationType: e.violationRules.join(',') } : {}),
metadata: {
...(e.actionType !== undefined ? { actionType: e.actionType } : {}),
requiresApproval: e.requiresApproval,
inputTrustTier: e.inputTrustTier,
violationRules: e.violationRules,
...(e.mode !== undefined ? { mode: e.mode } : {}),
...(e.ruleIds !== undefined ? { ruleIds: e.ruleIds } : {}),
...(e.stageType !== undefined ? { stageType: e.stageType } : {}),
...pipelinePolicyMetadata(e),
},
};
}
Expand Down
41 changes: 41 additions & 0 deletions packages/nexus-agents/src/security/audit-trail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ export interface PolicyGateEvent extends AuditEventBase {
readonly ruleIds?: readonly string[];
/** Type of the stage the gate guarded (e.g. `execute`). */
readonly stageType?: string;
/**
* #3727: discriminates a per-EVALUATION SUMMARY record (`'summary'` — emitted
* once per pipeline policy evaluation INCLUDING clean ones, the DENOMINATOR for
* the would-block rate) from a per-VIOLATION record (`'violation'` — the
* existing #3710 per-violation records). Absent for the security policy-gate
* path. Denominator = count(recordKind==='summary'); numerator = summaries with
* `violationCount > 0`. Scope the #3710 count-parity assertion to `'violation'`.
*/
readonly recordKind?: 'summary' | 'violation';
/** #3727: number of violations in THIS evaluation (set on the summary record). */
readonly violationCount?: number;
}

/** Corroboration validation result. */
Expand Down Expand Up @@ -369,6 +380,36 @@ export function emitPipelinePolicyEvent(
});
}

/**
 * The pipeline-policy would-block rate over a window (#3727).
 *
 * All three fields describe the same set of per-evaluation summary records:
 * `rate` is derived from `wouldBlock` and `evaluations`.
 */
export interface PolicyWouldBlockRate {
  /** Total pipeline-policy evaluations (the denominator — summary records). */
  readonly evaluations: number;
  /** Evaluations that had ≥1 violation (what enforce mode WOULD block). */
  readonly wouldBlock: number;
  /** `wouldBlock / evaluations`; 0 when there are no evaluations. */
  readonly rate: number;
}

/**
 * Derives the pipeline-policy would-block RATE from durable audit events (#3727).
 * This is the read side of the per-evaluation summary records and the signal the
 * #3769-enforce soak-readiness gate needs.
 *
 * Only `policy_gate` events with `recordKind === 'summary'` are considered: each
 * one counts as an evaluation (denominator), and it also counts as would-block
 * (numerator) when its `violationCount` is greater than 0; a missing count is
 * treated as 0. Per-violation records (`recordKind === 'violation'`) and all other
 * events are ignored on purpose — counting them would double-count evaluations.
 *
 * @param events - Audit events to scan; the array is not modified.
 * @returns The evaluation count, would-block count, and their ratio (0 when no
 *   summary records are present).
 */
export function computePolicyWouldBlockRate(events: readonly AuditEvent[]): PolicyWouldBlockRate {
  let summaryTotal = 0;
  let violatingTotal = 0;

  events.forEach((event) => {
    if (event.type === 'policy_gate' && event.recordKind === 'summary') {
      summaryTotal += 1;
      const violationsInEvaluation = event.violationCount ?? 0;
      if (violationsInEvaluation > 0) {
        violatingTotal += 1;
      }
    }
  });

  // No summary records: report a zero rate instead of dividing by zero.
  if (summaryTotal === 0) {
    return { evaluations: 0, wouldBlock: 0, rate: 0 };
  }
  return {
    evaluations: summaryTotal,
    wouldBlock: violatingTotal,
    rate: violatingTotal / summaryTotal,
  };
}

/**
* Records a corroboration validation.
*/
Expand Down
Loading