Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions backend/migrations/audit_log.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
-- Phase 6: append-only audit log.
--
-- Every LLM call and every tool invocation lands here. Required before paid
-- data connectors so we can prove who saw what data and which model handled
-- it. Updates and deletes are blocked at the DB layer — entries are immutable
-- once written.
--
-- user_email is denormalized for forensic retention: if a user row is later
-- deleted, the FK on user_id is set to NULL (ON DELETE SET NULL; the audit row
-- itself is never removed), but the email remains on the row so investigators
-- can still identify the actor.

-- Append-only audit trail: one row per LLM call, tool invocation, connector
-- fetch, or document transfer. Only id, event_type, status, and created_at are
-- mandatory; every other column is optional context for the event.
CREATE TABLE IF NOT EXISTS public.audit_log (
    id                     uuid        PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Actor. user_id is nulled if the auth user is deleted; user_email is a
    -- denormalized copy kept on the row for forensic retention.
    user_id                uuid        REFERENCES auth.users(id) ON DELETE SET NULL,
    user_email             text,

    -- What happened.
    event_type             text        NOT NULL CHECK (event_type IN (
                                           'llm_call',
                                           'tool_call',
                                           'connector_fetch',
                                           'document_upload',
                                           'document_download'
                                       )),
    model                  text,
    provider               text,
    tool_name              text,
    connector_id           text,

    -- Data touched by the event.
    project_id             uuid        REFERENCES public.projects(id) ON DELETE SET NULL,
    document_ids           uuid[],
    source_license_scopes  text[],
    routing_policy_applied jsonb,

    -- Content fingerprints and usage metrics.
    input_hash             text,
    output_hash            text,
    input_tokens           integer,
    output_tokens          integer,
    duration_ms            integer,

    -- Outcome.
    status                 text        NOT NULL CHECK (status IN ('success', 'error', 'blocked')),
    error_message          text,
    created_at             timestamptz NOT NULL DEFAULT now()
);

-- Per-user history, newest first.
CREATE INDEX IF NOT EXISTS idx_audit_log_user_created
    ON public.audit_log USING btree (user_id, created_at DESC);

-- Per-project history, newest first; partial so rows without a project are
-- not indexed.
CREATE INDEX IF NOT EXISTS idx_audit_log_project_created
    ON public.audit_log USING btree (project_id, created_at DESC)
    WHERE project_id IS NOT NULL;

-- Lookup by event category.
CREATE INDEX IF NOT EXISTS idx_audit_log_event_type
    ON public.audit_log USING btree (event_type);

-- Trigger function that enforces audit_log immutability.
--
-- Every UPDATE, DELETE (and TRUNCATE, if attached) is rejected, with one
-- exception: an UPDATE whose only effect is to set user_id and/or project_id
-- to NULL. That is exactly the statement the ON DELETE SET NULL foreign keys
-- issue when a referenced user or project is deleted; rejecting it would make
-- those deletions fail with 'audit_log entries are immutable'.
CREATE OR REPLACE FUNCTION public.prevent_audit_log_modification()
RETURNS TRIGGER AS $$
BEGIN
    -- NEW/OLD are only inspected for UPDATE; nested IFs keep them from being
    -- evaluated for DELETE or statement-level invocations.
    IF TG_OP = 'UPDATE' THEN
        IF  -- each FK column is either untouched or being nulled ...
            (NEW.user_id IS NULL OR NEW.user_id IS NOT DISTINCT FROM OLD.user_id)
            AND (NEW.project_id IS NULL OR NEW.project_id IS NOT DISTINCT FROM OLD.project_id)
            -- ... at least one of them actually changes (no-op updates stay blocked) ...
            AND (NEW.user_id IS DISTINCT FROM OLD.user_id
                 OR NEW.project_id IS DISTINCT FROM OLD.project_id)
            -- ... and every other column is identical.
            AND (to_jsonb(NEW) - 'user_id'::text - 'project_id'::text)
                = (to_jsonb(OLD) - 'user_id'::text - 'project_id'::text)
        THEN
            RETURN NEW;
        END IF;
    END IF;

    RAISE EXCEPTION 'audit_log entries are immutable';
END;
$$ LANGUAGE plpgsql;

-- Immutability guards. Each trigger is dropped first so the migration can be
-- re-run safely.

-- Reject row updates.
DROP TRIGGER IF EXISTS audit_log_no_update ON public.audit_log;
CREATE TRIGGER audit_log_no_update
    BEFORE UPDATE ON public.audit_log
    FOR EACH ROW EXECUTE PROCEDURE public.prevent_audit_log_modification();

-- Reject row deletes.
DROP TRIGGER IF EXISTS audit_log_no_delete ON public.audit_log;
CREATE TRIGGER audit_log_no_delete
    BEFORE DELETE ON public.audit_log
    FOR EACH ROW EXECUTE PROCEDURE public.prevent_audit_log_modification();

-- TRUNCATE does not fire row-level triggers, so without a statement-level
-- guard the whole log could still be emptied in one statement.
DROP TRIGGER IF EXISTS audit_log_no_truncate ON public.audit_log;
CREATE TRIGGER audit_log_no_truncate
    BEFORE TRUNCATE ON public.audit_log
    FOR EACH STATEMENT EXECUTE PROCEDURE public.prevent_audit_log_modification();

-- Client-facing roles get no direct access to the audit trail.
REVOKE ALL PRIVILEGES ON TABLE public.audit_log FROM anon, authenticated;
64 changes: 64 additions & 0 deletions backend/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,67 @@ revoke all on public.tabular_cells from anon, authenticated;
revoke all on public.tabular_review_chats from anon, authenticated;
revoke all on public.tabular_review_chat_messages from anon, authenticated;
revoke all on public.user_api_keys from anon, authenticated;

-- ---------------------------------------------------------------------------
-- Audit log (Phase 6)
-- ---------------------------------------------------------------------------
--
-- Append-only record of LLM calls, tool invocations, and connector fetches.
-- See backend/migrations/audit_log.sql for the canonical migration and the
-- rationale for immutability + denormalized user_email.

-- Append-only audit trail: one row per LLM call, tool invocation, connector
-- fetch, or document transfer. Only id, event_type, status, and created_at are
-- mandatory; every other column is optional context for the event.
create table if not exists public.audit_log (
    id                     uuid        primary key default gen_random_uuid(),

    -- Actor. user_id is nulled if the auth user is deleted; user_email is a
    -- denormalized copy kept on the row for forensic retention.
    user_id                uuid        references auth.users(id) on delete set null,
    user_email             text,

    -- What happened.
    event_type             text        not null check (event_type in (
                                           'llm_call',
                                           'tool_call',
                                           'connector_fetch',
                                           'document_upload',
                                           'document_download'
                                       )),
    model                  text,
    provider               text,
    tool_name              text,
    connector_id           text,

    -- Data touched by the event.
    project_id             uuid        references public.projects(id) on delete set null,
    document_ids           uuid[],
    source_license_scopes  text[],
    routing_policy_applied jsonb,

    -- Content fingerprints and usage metrics.
    input_hash             text,
    output_hash            text,
    input_tokens           integer,
    output_tokens          integer,
    duration_ms            integer,

    -- Outcome.
    status                 text        not null check (status in ('success', 'error', 'blocked')),
    error_message          text,
    created_at             timestamptz not null default now()
);

-- Per-user history, newest first.
create index if not exists idx_audit_log_user_created
    on public.audit_log using btree (user_id, created_at desc);

-- Per-project history, newest first; partial so rows without a project are
-- not indexed.
create index if not exists idx_audit_log_project_created
    on public.audit_log using btree (project_id, created_at desc)
    where project_id is not null;

-- Lookup by event category.
create index if not exists idx_audit_log_event_type
    on public.audit_log using btree (event_type);

-- Trigger function that enforces audit_log immutability.
--
-- Every update, delete (and truncate, if attached) is rejected, with one
-- exception: an update whose only effect is to set user_id and/or project_id
-- to null. That is exactly the statement the "on delete set null" foreign keys
-- issue when a referenced user or project is deleted; rejecting it would make
-- those deletions fail with 'audit_log entries are immutable'.
create or replace function public.prevent_audit_log_modification()
returns trigger as $$
begin
    -- new/old are only inspected for UPDATE; nested ifs keep them from being
    -- evaluated for DELETE or statement-level invocations.
    if tg_op = 'UPDATE' then
        if  -- each FK column is either untouched or being nulled ...
            (new.user_id is null or new.user_id is not distinct from old.user_id)
            and (new.project_id is null or new.project_id is not distinct from old.project_id)
            -- ... at least one of them actually changes (no-op updates stay blocked) ...
            and (new.user_id is distinct from old.user_id
                 or new.project_id is distinct from old.project_id)
            -- ... and every other column is identical.
            and (to_jsonb(new) - 'user_id'::text - 'project_id'::text)
                = (to_jsonb(old) - 'user_id'::text - 'project_id'::text)
        then
            return new;
        end if;
    end if;

    raise exception 'audit_log entries are immutable';
end;
$$ language plpgsql;

-- Immutability guards. Each trigger is dropped first so the script can be
-- re-run safely.

-- Reject row updates.
drop trigger if exists audit_log_no_update on public.audit_log;
create trigger audit_log_no_update
    before update on public.audit_log
    for each row execute procedure public.prevent_audit_log_modification();

-- Reject row deletes.
drop trigger if exists audit_log_no_delete on public.audit_log;
create trigger audit_log_no_delete
    before delete on public.audit_log
    for each row execute procedure public.prevent_audit_log_modification();

-- Truncate does not fire row-level triggers, so without a statement-level
-- guard the whole log could still be emptied in one statement.
drop trigger if exists audit_log_no_truncate on public.audit_log;
create trigger audit_log_no_truncate
    before truncate on public.audit_log
    for each statement execute procedure public.prevent_audit_log_modification();

-- Client-facing roles get no direct access to the audit trail.
revoke all privileges on table public.audit_log from anon, authenticated;
2 changes: 2 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { tabularRouter } from "./routes/tabular";
import { workflowsRouter } from "./routes/workflows";
import { userRouter } from "./routes/user";
import { downloadsRouter } from "./routes/downloads";
import { auditRouter } from "./routes/audit";

try {
encryptionKey();
Expand Down Expand Up @@ -126,6 +127,7 @@ app.use("/workflows", workflowsRouter);
app.use("/user", userRouter);
app.use("/users", userRouter);
app.use("/download", downloadsRouter);
app.use("/audit-log", auditRouter);

app.get("/health", (_req, res) => res.json({ ok: true }));

Expand Down
90 changes: 90 additions & 0 deletions backend/src/lib/audit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { createHash } from "crypto";
import type { createServerSupabase } from "./supabase";

/** Categories of auditable events; mirrors the audit_log.event_type CHECK list. */
export type AuditEventType =
  "llm_call" | "tool_call" | "connector_fetch" | "document_upload" | "document_download";

/** Outcome of an audited event; mirrors the audit_log.status CHECK list. */
export type AuditStatus =
  | "success"
  | "error"
  | "blocked";

/** License classification attached to the sources an event touched. */
export type LicenseScope =
  | "public"
  | "licensed"
  | "internal";

/**
 * Application-side shape of one audit event. recordAudit maps each camelCase
 * field onto the matching snake_case audit_log column; omitted optional
 * fields are stored as NULL.
 */
export interface AuditEntry {
  // --- required ---
  eventType: AuditEventType;
  userId: string;
  status: AuditStatus;

  // --- actor / model context ---
  /** Denormalized copy of the actor's email, stored alongside userId. */
  userEmail?: string;
  model?: string;
  provider?: string;
  toolName?: string;
  connectorId?: string;

  // --- data touched ---
  projectId?: string | null;
  documentIds?: string[];
  sourceLicenseScopes?: LicenseScope[];
  routingPolicyApplied?: Record<string, unknown>;

  // --- fingerprints and metrics ---
  inputHash?: string;
  outputHash?: string;
  inputTokens?: number;
  outputTokens?: number;
  durationMs?: number;

  // --- failure detail ---
  errorMessage?: string;
}

/** Server-side Supabase client type, derived from its factory function. */
type Db = ReturnType<typeof createServerSupabase>;

/**
 * Fingerprint a string for the audit log.
 *
 * @param content Text to hash; encoded as UTF-8 before hashing.
 * @returns Lowercase hex SHA-256 digest of the content.
 */
export function hashContent(content: string): string {
  const hasher = createHash("sha256");
  hasher.update(content, "utf8");
  const hexDigest = hasher.digest("hex");
  return hexDigest;
}

/**
 * Read the AUDIT_LOG_ENABLED switch from the environment.
 *
 * Auditing is on unless the variable is set to 0 / false / no / off
 * (case-insensitive, surrounding whitespace ignored). An unset variable or
 * any other value leaves auditing enabled.
 */
function isAuditEnabled(): boolean {
  const flag = process.env.AUDIT_LOG_ENABLED;
  if (flag === undefined) {
    return true;
  }

  const explicitlyDisabled = /^(0|false|no|off)$/i.test(flag.trim());
  return !explicitlyDisabled;
}

/**
 * Write one row to audit_log.
 *
 * Best-effort by design: both a returned insert error and a thrown exception
 * are reported via console.error and then swallowed, so an audit failure can
 * never fail the user's request. Does nothing when auditing is disabled via
 * the environment. Callers may `await` the promise to keep rows ordered
 * within a turn.
 *
 * @param entry Event to record; omitted optional fields and empty arrays are
 *              written as NULL.
 * @param db    Supabase client used for the insert.
 */
export async function recordAudit(entry: AuditEntry, db: Db): Promise<void> {
  if (!isAuditEnabled()) {
    return;
  }

  try {
    // Empty arrays are stored as NULL rather than '{}'.
    const documentIds = entry.documentIds?.length ? entry.documentIds : null;
    const licenseScopes = entry.sourceLicenseScopes?.length
      ? entry.sourceLicenseScopes
      : null;

    const payload = {
      user_id: entry.userId,
      user_email: entry.userEmail ?? null,
      event_type: entry.eventType,
      model: entry.model ?? null,
      provider: entry.provider ?? null,
      tool_name: entry.toolName ?? null,
      connector_id: entry.connectorId ?? null,
      project_id: entry.projectId ?? null,
      document_ids: documentIds,
      source_license_scopes: licenseScopes,
      routing_policy_applied: entry.routingPolicyApplied ?? null,
      input_hash: entry.inputHash ?? null,
      output_hash: entry.outputHash ?? null,
      input_tokens: entry.inputTokens ?? null,
      output_tokens: entry.outputTokens ?? null,
      duration_ms: entry.durationMs ?? null,
      status: entry.status,
      error_message: entry.errorMessage ?? null,
    };

    const insertResult = await db.from("audit_log").insert(payload);
    if (insertResult.error) {
      console.error("[audit] insert failed:", insertResult.error.message);
    }
  } catch (failure) {
    const detail = failure instanceof Error ? failure.message : String(failure);
    console.error("[audit] unexpected failure:", detail);
  }
}
9 changes: 9 additions & 0 deletions backend/src/lib/chatTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ export async function runLLMStream(params: {
docStore: DocStore;
docIndex: DocIndex;
userId: string;
userEmail?: string;
db: ReturnType<typeof createServerSupabase>;
write: (s: string) => void;
workflowStore?: WorkflowStore;
Expand All @@ -435,6 +436,7 @@ export async function runLLMStream(params: {
docStore,
docIndex,
userId,
userEmail,
db,
write,
workflowStore,
Expand All @@ -453,6 +455,7 @@ export async function runLLMStream(params: {

const toolCtx: ToolContext = {
userId,
userEmail,
db,
docStore,
docIndex,
Expand Down Expand Up @@ -550,6 +553,12 @@ export async function runLLMStream(params: {
maxIterations: 10,
apiKeys,
enableThinking: true,
audit: {
userId,
userEmail,
projectId: projectId ?? null,
db,
},
callbacks: {
onContentDelta: (delta) => {
iterText += delta;
Expand Down
59 changes: 56 additions & 3 deletions backend/src/lib/llm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,70 @@ import { streamGemini, completeGeminiText } from "./gemini";
import { streamOpenAI, completeOpenAIText } from "./openai";
import { providerForModel } from "./models";
import type { StreamChatParams, StreamChatResult, UserApiKeys } from "./types";
import { recordAudit, hashContent } from "../audit";

export * from "./types";
export * from "./models";

/**
 * Flatten a chat request into one string for hashing: the system prompt
 * followed by every message rendered as `role:content`, with all sections
 * separated by a `---` line.
 */
function summarizeInputForAudit(params: StreamChatParams): string {
  const SEPARATOR = "\n---\n";

  const renderedMessages: string[] = [];
  for (const message of params.messages) {
    renderedMessages.push(`${message.role}:${message.content}`);
  }

  const systemSection = `${params.systemPrompt}`;
  return [systemSection, renderedMessages.join(SEPARATOR)].join(SEPARATOR);
}

/** Result of one provider call, as seen by the audit helper. */
type LlmCallOutcome =
  | { status: "success"; outputText: string }
  | { status: "error"; error: unknown };

/**
 * Record one llm_call audit row for a finished provider call.
 *
 * No-op when the caller supplied no audit context. Never throws: hashing and
 * the insert are both guarded so an audit problem cannot replace the real
 * result (or the real provider error) of the LLM call.
 */
async function recordLlmCallAudit(
  params: StreamChatParams,
  provider: string,
  startedAt: number,
  outcome: LlmCallOutcome,
): Promise<void> {
  const audit = params.audit;
  if (!audit) return;

  try {
    // Success rows carry the output hash; error rows carry the message.
    const outcomeFields =
      outcome.status === "success"
        ? {
            status: "success" as const,
            outputHash: hashContent(outcome.outputText),
          }
        : {
            status: "error" as const,
            errorMessage:
              outcome.error instanceof Error
                ? outcome.error.message
                : String(outcome.error),
          };

    await recordAudit(
      {
        eventType: "llm_call",
        userId: audit.userId,
        userEmail: audit.userEmail,
        projectId: audit.projectId ?? null,
        model: params.model,
        provider,
        inputHash: hashContent(summarizeInputForAudit(params)),
        durationMs: Date.now() - startedAt,
        ...outcomeFields,
      },
      audit.db,
    );
  } catch (auditErr) {
    console.error(
      "[audit] failed to record llm_call:",
      auditErr instanceof Error ? auditErr.message : String(auditErr),
    );
  }
}

/**
 * Run a streaming chat completion against the provider that serves
 * `params.model` (Claude, OpenAI, or Gemini as the fallback).
 *
 * When `params.audit` is set, exactly one llm_call audit row is written per
 * call: status "success" with a hash of the full output text, or status
 * "error" with the error message. Provider errors are always re-thrown to
 * the caller after being audited.
 *
 * @param params Chat request, including the optional audit context.
 * @returns The provider's stream result.
 * @throws Whatever the selected provider adapter throws.
 */
export async function streamChatWithTools(
  params: StreamChatParams,
): Promise<StreamChatResult> {
  const provider = providerForModel(params.model);
  const startedAt = Date.now();

  let result: StreamChatResult;
  try {
    result =
      provider === "claude"
        ? await streamClaude(params)
        : provider === "openai"
          ? await streamOpenAI(params)
          : await streamGemini(params);
  } catch (err) {
    await recordLlmCallAudit(params, provider, startedAt, {
      status: "error",
      error: err,
    });
    throw err;
  }

  await recordLlmCallAudit(params, provider, startedAt, {
    status: "success",
    outputText: result.fullText ?? "",
  });
  return result;
}

export async function completeText(params: {
Expand Down
17 changes: 17 additions & 0 deletions backend/src/lib/llm/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { createServerSupabase } from "../supabase";

// Shared types for the LLM provider adapter.
// Callers always speak OpenAI-style tools + { role, content } messages; each
// provider translates internally.
Expand Down Expand Up @@ -65,6 +67,21 @@ export type StreamChatParams = {
* the reintroduction plan).
*/
documentFilenames?: string[];
/**
* Optional audit metadata. When provided, the LLM adapter records one
* audit_log row per streamChatWithTools call (success or error). Omit in
* call sites that have no user context (rare — most callers should pass
* it through). The tool dispatcher logs its own tool_call rows
* independently, so missing this only suppresses the llm_call row.
*/
audit?: LlmAuditContext;
};

/**
 * Who and where an LLM call should be attributed to in the audit log, plus
 * the database client used to write the row.
 */
export type LlmAuditContext = {
  /** Id of the user on whose behalf the call is made. */
  userId: string;
  /** Email of that user, if known. */
  userEmail?: string;
  /** Project the call belongs to, if any. */
  projectId?: string | null;
  /** Supabase client used to insert the audit row. */
  db: ReturnType<typeof createServerSupabase>;
};

export type StreamChatResult = {
Expand Down
Loading
Loading