223 changes: 223 additions & 0 deletions docs/feature_units/completed/FU-112/FU-112_spec.md
@@ -0,0 +1,223 @@
# Feature Unit: FU-112 Storage Infrastructure

**Status:** In Progress
**Priority:** P0 (Critical)
**Risk Level:** Medium
**Target Release:** v0.2.0
**Owner:** Engineering Team
**Reviewers:** Tech Lead
**Created:** 2025-12-18
**Last Updated:** 2025-12-18

---

## Overview

**Brief Description:**
Build deterministic storage infrastructure for ingestion by persisting failed uploads into an `upload_queue` table, wiring the HTTP upload path to enqueue retries, and tracking per-user storage/interpretation usage via `storage_usage`. This lays the foundation for the async upload worker (FU-130) and strict quota enforcement (FU-136).

**User Value:**
Users can rely on uploads completing even when object storage is briefly unavailable, and they gain transparent quota management once usage surfaces are connected. This prevents silent failures and offers predictable enforcement before monetisation tiers roll out.

**Defensible Differentiation:**
Deterministic retry queues plus explicit quota tracking reinforce Neotoma’s privacy-first/deterministic positioning—every byte is accounted for with provenance, no background ingestion occurs without user intent, and cross-platform agents can trust consistent retry semantics.

**Technical Approach:**
- Extend the schema (per `docs/subsystems/schema.md` + payload model) with `upload_queue` and `storage_usage`, including helper functions for cumulative counters.
- Persist failed uploads to disk (`UPLOAD_QUEUE_DIR`) and mirror metadata in `upload_queue` so FU-130 can replay.
- Update `upload_file` HTTP action to hash files, enqueue failures, and increment usage on success.
- Emit metrics/logs to observe retry depth, failures, and quota hits.
- Keep logic user-scoped (default single-user UUID today) to remain compatible with upcoming multi-user/RLS work.

---

## Requirements

### Functional Requirements
1. **Upload Queue Persistence:** Create `upload_queue` rows with deterministic `content_hash`, `temp_file_path`, `bucket`, `object_path`, `byte_size`, `user_id`, retry bookkeeping fields, and metadata needed to rebuild payload context.
2. **HTTP Upload Integration:** `POST /upload_file` MUST enqueue a retry when Supabase Storage upload fails instead of returning a raw 500. Response must include queue ID and `storage_status: "pending_retry"`.
3. **Storage Usage Tracking:** Create `storage_usage` table plus `increment_storage_usage(p_user_id, p_bytes)` and `increment_interpretation_count(p_user_id)` functions.
4. **Automatic Usage Increment:** Successful upload + record creation MUST invoke `increment_storage_usage` with byte size to keep quotas authoritative. Failures log warnings without blocking ingestion.
5. **Quota Surfaces:** Provide RPC-level enforcement hooks so FU-136 can reject requests once `total_bytes` or `interpretation_count_month` exceed configured limits.
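The enforcement hook contract above can be sketched as a pure predicate over `storage_usage` counters. This is an illustrative sketch only: the column names mirror this spec, but the byte limit, function name, and return shape are assumptions until FU-136 defines them.

```typescript
// Hypothetical FU-136 enforcement hook consuming storage_usage counters.
// Field names mirror the storage_usage table in this spec; the byteLimit
// parameter and QuotaDecision shape are illustrative assumptions.
interface StorageUsageRow {
  total_bytes: number;
  interpretation_count_month: number;
  interpretation_limit_month: number;
}

interface QuotaDecision {
  allowed: boolean;
  reason?: "storage_bytes" | "interpretation_count";
}

function checkQuota(
  usage: StorageUsageRow,
  incomingBytes: number,
  byteLimit: number
): QuotaDecision {
  // Reject if accepting this upload would push total bytes past the limit.
  if (usage.total_bytes + incomingBytes > byteLimit) {
    return { allowed: false, reason: "storage_bytes" };
  }
  // Reject if the monthly interpretation allowance is already spent.
  if (usage.interpretation_count_month >= usage.interpretation_limit_month) {
    return { allowed: false, reason: "interpretation_count" };
  }
  return { allowed: true };
}
```

A rejection here would surface as the `STORAGE_QUOTA_EXCEEDED` error scenario once FU-136 plugs in.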

### Non-Functional Requirements
1. **Performance:** Queue insert ≤ 10 ms (local disk write + DB insert). Usage RPC ≤ 5 ms.
2. **Determinism:** `content_hash` uses SHA-256 of raw bytes; queue metadata derived solely from request payload.
3. **Consistency:** Strong consistency—queue rows and usage counters written in same request transaction scope; no eventual consistency.
4. **Reliability:** Queue directory must survive process restarts (path configurable). Files cleaned after successful retry.
5. **Security/Privacy:** Temp files stored locally with restrictive permissions; queue metadata contains no raw PII beyond IDs/hash.
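The determinism requirement (SHA-256 of raw bytes) reduces to a small helper; a minimal sketch using Node's built-in `crypto` module — the function name is illustrative:

```typescript
import { createHash } from "node:crypto";

// Deterministic content hash: SHA-256 over the raw upload bytes,
// hex-encoded. Identical bytes always yield an identical content_hash,
// so queue rows can be deduplicated and replayed safely.
function contentHash(raw: Uint8Array | string): string {
  return createHash("sha256").update(raw).digest("hex");
}
```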

### Invariants (MUST/MUST NOT)
**MUST**
- Hash every upload and store `content_hash` + byte size before enqueue.
- Attach `user_id` to every queue/usage row (default single-user UUID until FU-701 introduces true multi-user).
- Enable RLS on new tables with service-role only policies.
- Maintain sorted retry schedule via `next_retry_at`.

**MUST NOT**
- Delete original upload metadata after enqueue; the worker depends on it.
- Auto-retry uploads synchronously in the request path (queue + worker only).
- Mutate `storage_usage.total_bytes` outside stored procedures/RPC.
- Log file contents or queue temp-file paths in plaintext user-facing logs.
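The "sorted retry schedule via `next_retry_at`" invariant implies a scheduling function. The spec does not fix a backoff policy, so the exponential curve and 60-second base below are assumptions, not the mandated values:

```typescript
// Illustrative next_retry_at computation. Exponential backoff with a
// 60s base is an assumption; the spec only requires that next_retry_at
// values produce a sortable retry schedule.
function nextRetryAt(retryCount: number, now: Date, baseDelaySeconds = 60): Date {
  const delaySeconds = baseDelaySeconds * 2 ** retryCount; // 60s, 120s, 240s, ...
  return new Date(now.getTime() + delaySeconds * 1000);
}
```

The partial index `idx_upload_queue_next_retry` (WHERE `retry_count < max_retries`) then lets the FU-130 worker drain rows in `next_retry_at` order cheaply.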

---

## Affected Subsystems

- **Ingestion / Storage:** `upload_file` HTTP route, payload compilation (future), Supabase Storage buckets.
- **Database Schema:** New tables/functions per `docs/subsystems/schema.md`.
- **Observability:** Metrics + logs defined in `docs/architecture/ingestion/sources-first_ingestion_v12_final.md`.
- **Quota Enforcement:** `storage_usage` feeds FU-136 strict rejection logic.

**Dependencies:**
- FU-110 (Sources table / payload onboarding) supplies canonical payload metadata.
- FU-135 (interpretation timeout columns) ships in the same release batch but is otherwise independent.
- FU-130 (Upload Queue Processor) and FU-136 (Strict Quota Enforcement) depend on this FU.

**Documentation to Load:**
- `docs/subsystems/schema.md` (schema canon)
- `docs/subsystems/sources.md` (ingestion + quotas)
- `docs/architecture/ingestion/sources-first_ingestion_v12_final.md` (queue/worker flow)
- `docs/architecture/payload_model.md` (payload-first ingestion)
- `docs/testing/testing_standard.md` (required coverage)

---

## Schema Changes

### Tables

- **`upload_queue`**
  - `id UUID PRIMARY KEY DEFAULT gen_random_uuid()`
  - `temp_file_path TEXT NOT NULL` (absolute path in `UPLOAD_QUEUE_DIR`)
  - `bucket TEXT NOT NULL` (Supabase bucket name)
  - `object_path TEXT NOT NULL` (intended storage key)
  - `content_hash TEXT NOT NULL` (SHA-256)
  - `byte_size BIGINT NOT NULL`
  - `retry_count INTEGER NOT NULL DEFAULT 0`
  - `max_retries INTEGER NOT NULL DEFAULT 5`
  - `next_retry_at TIMESTAMPTZ NOT NULL DEFAULT NOW()`
  - `error_message TEXT`
  - `metadata JSONB NOT NULL DEFAULT '{}'` (payload + request context until `sources` table lands)
  - `user_id UUID NOT NULL`
  - `payload_submission_id UUID REFERENCES payload_submissions(id) ON DELETE SET NULL` (optional future linkage)
  - Indexes: `idx_upload_queue_next_retry` (partial) + `idx_upload_queue_user`.
  - RLS: enable; allow service_role full access only.

- **`storage_usage`**
  - `user_id UUID PRIMARY KEY`
  - `total_bytes BIGINT NOT NULL DEFAULT 0`
  - `total_sources INTEGER NOT NULL DEFAULT 0`
  - `last_calculated TIMESTAMPTZ NOT NULL DEFAULT NOW()`
  - `interpretation_count_month INTEGER NOT NULL DEFAULT 0`
  - `interpretation_limit_month INTEGER NOT NULL DEFAULT 100`
  - `billing_month TEXT NOT NULL DEFAULT to_char(NOW(), 'YYYY-MM')`
  - Index: primary key suffices (lookup by user).
  - RLS: enable; service_role full access (future user policies once auth lands).

### Functions
- `increment_storage_usage(p_user_id UUID, p_bytes BIGINT)`
  - Inserts or updates row atomically (`ON CONFLICT DO UPDATE`).
  - Increments `total_bytes`, `total_sources`, refreshes `last_calculated`.

- `increment_interpretation_count(p_user_id UUID)`
  - Maintains rolling monthly counter keyed by `billing_month`.
  - Resets count automatically when month changes.
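The monthly rollover rule can be expressed as a pure function, which is also how a unit test can pin the behavior without a database. This TypeScript mirror of the plpgsql `CASE` logic is a sketch; the function and type names are illustrative:

```typescript
// Pure mirror of increment_interpretation_count's rollover rule:
// same billing_month -> increment; new month -> reset counter to 1.
interface MonthlyCounter {
  interpretation_count_month: number;
  billing_month: string; // "YYYY-MM"
}

function rollInterpretationCount(
  row: MonthlyCounter,
  currentBillingMonth: string
): MonthlyCounter {
  if (row.billing_month === currentBillingMonth) {
    return {
      billing_month: currentBillingMonth,
      interpretation_count_month: row.interpretation_count_month + 1,
    };
  }
  // Month changed: the counter restarts at 1 for this call's increment.
  return { billing_month: currentBillingMonth, interpretation_count_month: 1 };
}
```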

---

## API / MCP Changes

- **`POST /upload_file` (Actions server)**
  - On storage success: call `increment_storage_usage`, log `storage.upload.success_total`.
  - On storage failure:
    - Persist buffer to `UPLOAD_QUEUE_DIR`.
    - Insert row into `upload_queue` with deterministic metadata + captured request context.
    - Return `202 Accepted` payload:

      ```json
      {
        "status": "pending_retry",
        "queue_id": "<uuid>",
        "storage_status": "pending_retry"
      }
      ```

  - Existing success payload remains unchanged.
  - Behavior is deterministic; queue ID surfaces for status tracking (future MCP tool).

No new MCP action is introduced, but `upload_file`’s contract is now explicit about eventual completion semantics and queue hand-offs.
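A minimal sketch of the failure path's data flow, assuming the row and response shapes defined in this spec. The function names, context type, and field plumbing are hypothetical; only the `202` body shape comes from the contract above:

```typescript
import { randomUUID } from "node:crypto";

// Captured request context for a failed upload (hypothetical shape).
interface FailedUploadContext {
  tempFilePath: string;
  bucket: string;
  objectPath: string;
  contentHash: string;
  byteSize: number;
  userId: string;
  errorMessage: string;
}

// Build the upload_queue row; retry bookkeeping starts at zero and the
// row is immediately eligible for the FU-130 worker (next_retry_at = now).
function buildQueueRow(ctx: FailedUploadContext) {
  return {
    id: randomUUID(),
    temp_file_path: ctx.tempFilePath,
    bucket: ctx.bucket,
    object_path: ctx.objectPath,
    content_hash: ctx.contentHash,
    byte_size: ctx.byteSize,
    retry_count: 0,
    max_retries: 5,
    next_retry_at: new Date().toISOString(),
    error_message: ctx.errorMessage,
    user_id: ctx.userId,
  };
}

// The 202 Accepted body defined by this spec's contract.
function pendingRetryResponse(queueId: string) {
  return {
    status: 202,
    body: { status: "pending_retry", queue_id: queueId, storage_status: "pending_retry" },
  };
}
```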

---

## Observability

- **Metrics:**
  - `storage.upload.success_total` (counter, labels: bucket)
  - `storage.upload.failure_total` (counter, labels: bucket, reason)
  - `storage.queue.depth` (gauge, pending rows)
  - `storage.queue.processed_total` (counter, labels: status)
  - `storage_usage.bytes` (gauge exported from table snapshots)
  - `quota.interpretation.exceeded_total` (counter, emitted when FU-136 rejects)

- **Logs:**
  - `UploadQueueEnqueue` (info) – fields: queue_id, bucket, object_path, byte_size, retry_count=0.
  - `UploadQueueEnqueueFailed` (error) – fields: bucket, error_code, trace_id.
  - `StorageUsageIncrementFailed` (warn) – fields: user_id, byte_size, reason.

- **Events:**
  - `storage.upload.enqueued`
  - `storage.upload.retry_failed` (after max retries, consumed by FU-130)
  - `quota.storage.exceeded` (future once FU-136 enforces)

---

## Testing Strategy

**Unit Tests**
1. `enqueueFailedUpload` writes file, inserts DB row, cleans temp file on insert failure.
2. `incrementStorageUsage` calls Supabase RPC with correct params and surfaces errors.
3. SHA-256 hashing helper produces deterministic output (100-run property test).

**Integration Tests** (extend `docs/releases/in_progress/v0.2.0/integration_tests.md`)
- **IT-007 Upload Queue Retry:** Force storage failure → queue row created → future worker drains row → source marked uploaded.
- **IT-010 Interpretation Quota Enforcement:** Use `storage_usage` counters to reject reinterpretation when limit reached (once FU-136 plugs in).
- Mock Supabase storage to simulate transient errors and confirm HTTP 202 path.

**E2E Tests**
- Browser-driven upload (Playwright) verifying user receives “processing” state when queue engaged.
- Bulk upload scenario verifying usage counters increase and appear in UI (future surface).
- Negative path verifying user sees actionable messaging when queue exhausted after max retries.

Coverage expectations: >85% line coverage for new services/routes, 100% of queue-logic branches, and a deterministic hashing property test (100 iterations).

---

## Error Scenarios

| Scenario | Error Code | Message | Recovery |
| --- | --- | --- | --- |
| Supabase Storage down | `STORAGE_PENDING_RETRY` (HTTP 202) | "Upload deferred and will retry automatically" | Worker retries; user can poll status |
| Queue insert fails | `STORAGE_QUEUE_FAILED` (HTTP 500) | "Unable to persist upload retry" | User retries upload manually |
| Usage RPC fails | `USAGE_TRACKING_FAILED` (hidden) | Logged warning only | Ops follow-up; upload succeeds |
| Quota exceeded (future) | `STORAGE_QUOTA_EXCEEDED` | "Monthly storage quota reached" | User upgrades plan or waits next month |

---

## Rollout & Deployment

- **Feature Flags:** None; behavior gates on whether storage upload succeeds. (Future: optional flag to bypass queue in dev).
- **Deployment Order:** Apply migrations → redeploy API server (new services + route behavior).
- **Rollback Plan:**
1. Revert API changes to previous commit (no queue/usage).
2. Leave DB tables in place (safe, additive).
3. Manually delete temp files in `UPLOAD_QUEUE_DIR` if rollback lasts >24h.
- **Monitoring:** Track `storage.queue.depth` and `storage.upload.failure_total` after deploy; alert if depth grows > threshold (e.g., >100 pending for >15 min).

---

## Notes & Open Questions

- `upload_queue.source_id` in older docs maps to `payload_submission_id` once payload model fully replaces legacy records. Until FU-110 lands, queue metadata stores ingestion context in `metadata` JSON.
- Worker implementation (FU-130) must delete temp files and update `payload_submissions`/records accordingly.
- Quota readouts (UI + MCP) will be handled in FU-136 / FU-305 dashboards.
- Need follow-up doc update in `docs/subsystems/sources.md` once payload terminology fully replaces `sources`.

133 changes: 133 additions & 0 deletions docs/feature_units/completed/FU-112/manifest.yaml
@@ -0,0 +1,133 @@
feature_id: "fu_112"
version: "1.0.0"
status: "in_progress"
priority: "p0"
risk_level: "medium"

tags:
  - "storage"
  - "ingestion"
  - "quota"
  - "supabase"

metadata:
  title: "Storage Infrastructure"
  description: "Introduce upload queue retries and per-user storage usage tracking for ingestion resiliency."
  owner: "[email protected]"
  reviewers:
    - "[email protected]"
  created_at: "2025-12-18"
  target_release: "v0.2.0"

subsystems:
  - name: "database"
    changes: "New upload_queue + storage_usage tables with helper functions"
  - name: "ingestion"
    changes: "upload_file route enqueues failed uploads and increments usage on success"
  - name: "observability"
    changes: "New metrics/logs for queue depth, upload failures, and quota tracking"

schema_changes:
  - table: "upload_queue"
    operation: "create_table"
    columns:
      - { name: "id", type: "UUID", primary_key: true, default_value: "gen_random_uuid()" }
      - { name: "temp_file_path", type: "TEXT", nullable: false }
      - { name: "bucket", type: "TEXT", nullable: false }
      - { name: "object_path", type: "TEXT", nullable: false }
      - { name: "content_hash", type: "TEXT", nullable: false }
      - { name: "byte_size", type: "BIGINT", nullable: false }
      - { name: "retry_count", type: "INTEGER", nullable: false, default_value: "0" }
      - { name: "max_retries", type: "INTEGER", nullable: false, default_value: "5" }
      - { name: "next_retry_at", type: "TIMESTAMPTZ", nullable: false, default_value: "NOW()" }
      - { name: "error_message", type: "TEXT", nullable: true }
      - { name: "metadata", type: "JSONB", nullable: false, default_value: "'{}'::jsonb" }
      - { name: "user_id", type: "UUID", nullable: false }
      - { name: "payload_submission_id", type: "UUID", nullable: true, foreign_key: "payload_submissions(id)" }
      - { name: "created_at", type: "TIMESTAMPTZ", nullable: false, default_value: "NOW()" }
    indexes:
      - { name: "idx_upload_queue_next_retry", columns: ["next_retry_at"], where_clause: "retry_count < max_retries" }
      - { name: "idx_upload_queue_user", columns: ["user_id"] }
    rls:
      - policy: "Service role full access"
        command: "ALL"
        role: "service_role"

  - table: "storage_usage"
    operation: "create_table"
    columns:
      - { name: "user_id", type: "UUID", primary_key: true }
      - { name: "total_bytes", type: "BIGINT", nullable: false, default_value: "0" }
      - { name: "total_sources", type: "INTEGER", nullable: false, default_value: "0" }
      - { name: "last_calculated", type: "TIMESTAMPTZ", nullable: false, default_value: "NOW()" }
      - { name: "interpretation_count_month", type: "INTEGER", nullable: false, default_value: "0" }
      - { name: "interpretation_limit_month", type: "INTEGER", nullable: false, default_value: "100" }
      - { name: "billing_month", type: "TEXT", nullable: false, default_value: "to_char(NOW(), 'YYYY-MM')" }
    rls:
      - policy: "Service role full access"
        command: "ALL"
        role: "service_role"

  - function: "increment_storage_usage"
    operation: "create_function"
    language: "plpgsql"
    body: |
      INSERT INTO storage_usage (user_id, total_bytes, total_sources, last_calculated)
      VALUES (p_user_id, p_bytes, 1, NOW())
      ON CONFLICT (user_id) DO UPDATE SET
        total_bytes = storage_usage.total_bytes + p_bytes,
        total_sources = storage_usage.total_sources + 1,
        last_calculated = NOW();

  - function: "increment_interpretation_count"
    operation: "create_function"
    language: "plpgsql"
    body: |
      INSERT INTO storage_usage (user_id, interpretation_count_month, billing_month)
      VALUES (p_user_id, 1, current_billing_month)
      ON CONFLICT (user_id) DO UPDATE SET
        interpretation_count_month = CASE
          WHEN storage_usage.billing_month = current_billing_month
            THEN storage_usage.interpretation_count_month + 1
          ELSE 1
        END,
        billing_month = current_billing_month;

api_changes:
  endpoints:
    - path: "/upload_file"
      method: "POST"
      change_type: "modified"
      description: "Enqueue failed uploads (202 response) and increment storage usage on success."

observability:
  metrics:
    - { name: "storage.upload.success_total", type: "counter", labels: ["bucket"] }
    - { name: "storage.upload.failure_total", type: "counter", labels: ["bucket", "reason"] }
    - { name: "storage.queue.depth", type: "gauge" }
    - { name: "storage.queue.processed_total", type: "counter", labels: ["status"] }
    - { name: "storage_usage.bytes", type: "gauge" }
  logs:
    - { level: "info", event: "UploadQueueEnqueue", fields: ["queue_id", "bucket", "object_path", "byte_size"] }
    - { level: "warn", event: "StorageUsageIncrementFailed", fields: ["user_id", "byte_size", "reason"] }
  events:
    - { type: "storage.upload.enqueued", emitted_when: "Upload added to queue" }
    - { type: "storage.upload.retry_failed", emitted_when: "Retry count exceeded max" }

testing:
  unit_tests:
    - { name: "upload_queue.test.ts", description: "Enqueue writes file and DB row, cleans up on error." }
    - { name: "storage_usage.test.ts", description: "RPC invoked with correct params and propagates failures." }
  integration_tests:
    - { id: "IT-007", description: "Upload queue retry flow (docs/releases/in_progress/v0.2.0/integration_tests.md)." }
    - { id: "IT-010", description: "Interpretation quota enforcement using storage_usage." }
  e2e_tests:
    - { name: "Playwright Upload Pending", description: "UI reflects pending retry state when queue engaged." }
    - { name: "Bulk upload quota surfaces", description: "Usage counters visible after imports." }

dependencies:
  requires:
    - "FU-110"
  blocks:
    - "FU-130"
    - "FU-136"