diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 87e7428..3bb5dd0 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -40,7 +40,7 @@ {"id":"eventcore-0r4","title":"Refactor: Improve mutex error context","status":"tombstone","priority":4,"issue_type":"task","created_at":"2025-12-16T14:49:22.844527718-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-0r4","depends_on_id":"eventcore-017","type":"parent-child","created_at":"2025-12-16T15:29:14.434936008-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} {"id":"eventcore-1","title":"EventStore Contract Test Suite","description":"Provide reusable test suite that EventStore implementors can run against their implementation to verify contract compliance (version conflict detection, basic read/write, etc.) without copy/pasting tests.","design":"**Public Test API**: New `eventcore::testing` module with public test functions\n**Contract Tests**: test_concurrent_version_conflicts, test_basic_read_write, test_stream_isolation\n**Test Domain**: Minimal ContractTestEvent type for generic testing\n**Usage Pattern**: Implementors call test functions with their EventStore instance\n**Documentation**: Clear examples showing how to use in separate crates\n**Optional Enhancement**: Macro-generated test suite for ergonomics","acceptance_criteria":"Feature: EventStore implementor uses contract test suite\n\nScenario 1: Developer tests InMemoryEventStore with contract suite\n- Imports eventcore::testing::event_store_contract_tests\n- Creates InMemoryEventStore instance\n- Calls test_concurrent_version_conflicts(store)\n- Test passes (verifies version conflict detection works)\n- All other contract tests available and documented\n\nScenario 2: Developer discovers their custom EventStore violates contract\n- Implements custom EventStore with naive append (no version checking)\n- Runs test_concurrent_version_conflicts(store)\n- Test FAILS with clear error message\n- Error explains version conflict wasn't detected\n- Developer fixes implementation, test passes\n\nScenario 3: Future PostgreSQL implementor uses contract suite\n- In eventcore-postgres crate tests\n- Imports public test functions\n- Runs full contract suite against PostgreSQL backend\n- All tests pass, contract verified\n- No need to copy/paste or rewrite tests","status":"tombstone","priority":3,"issue_type":"feature","created_at":"2025-10-29T12:30:04.264036084-07:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"feature"} {"id":"eventcore-2","title":"Single-Stream Command End-to-End","description":"Enable library consumer to create and execute a complete single-stream command with validated domain types, proper error handling, and in-memory event storage. Provides working, testable command execution system.","design":"**Domain Types**: StreamId, EventId, CorrelationId, CausationId (nutype)\n**Error Handling**: Structured hierarchy (EventStoreError, CommandError, ValidationError, ConcurrencyError)\n**Storage**: InMemoryEventStore with optimistic concurrency\n**Command System**: CommandStreams and CommandLogic traits (manual, no macro)\n**Executor**: CommandExecutor orchestrating read → apply → handle → write (NO retry)","acceptance_criteria":"Feature: Developer executes complete single-stream command end-to-end\n\nScenario 1: Developer implements and executes bank account command\n- Creates BankAccount command with StreamId using nutype\n- Implements CommandLogic with apply() and handle()\n- Creates InMemoryEventStore\n- Executes Deposit(account_id, amount: 100)\n- Command succeeds\n- AccountDeposited event is stored with correct metadata\n- Developer can read events from the stream\n- Event contains amount of 100\n\nScenario 2: Developer handles business rule violations with proper errors\n- Account has balance of 50\n- Executes Withdraw command with amount 100\n- CommandError::BusinessRuleViolation is returned\n- Error message explains insufficient funds: balance 50, withdrawal 100\n- Error includes context (account_id, current balance, attempted withdrawal)\n- State is reconstructed via apply() to determine current balance\n\nScenario 3: Developer handles version conflict manually\n- Executes two concurrent Deposit commands on same account\n- Both commands read account at version 0\n- First command writes event, advancing to version 1\n- Second command attempts write expecting version 1\n- ConcurrencyError is returned to developer\n- Developer must handle retry manually (or wait for eventcore-002)\n- No automatic retry occurs\n- Developer can inspect error details (expected vs actual version)","status":"tombstone","priority":1,"issue_type":"feature","assignee":"jwilger","created_at":"2025-10-29T14:18:50.661348377-07:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"feature"} -{"id":"eventcore-20d","title":"Add database triggers to enforce event log immutability","description":"**Background:** Event sourcing requires the event log to be immutable - events are facts that have already occurred and must never be modified or deleted. Currently, this constraint is enforced only at the application level through the EventStore API.\n\n**Goal:** Add PostgreSQL database-level enforcement using triggers/rules to prevent UPDATE and DELETE operations on event log tables. This provides defense-in-depth against:\n- Bugs in application code\n- Direct database access (accidental or malicious)\n- SQL injection vulnerabilities\n- Human error during operations/debugging\n\n**Implementation:** The eventcore-postgres library should create database triggers that:\n1. PREVENT UPDATE operations on event log table(s)\n2. PREVENT DELETE operations on event log table(s)\n3. Raise clear, descriptive errors when violations are attempted\n4. Are created automatically during schema initialization\n\n**References:**\n- Event sourcing immutability principle\n- Defense-in-depth security practice","acceptance_criteria":"Feature: Database enforces event log immutability\n\nScenario: Database prevents UPDATE on event log\n Given PostgresEventStore has been initialized with schema\n And events have been appended to a stream\n When user attempts UPDATE on events table via SQL\n Then database raises error preventing the update\n And error message clearly indicates immutability violation\n\nScenario: Database prevents DELETE on event log\n Given PostgresEventStore has been initialized with schema\n And events have been appended to a stream\n When user attempts DELETE on events table via SQL\n Then database raises error preventing the deletion\n And error message clearly indicates immutability violation\n\nScenario: Triggers are created during schema initialization\n Given fresh PostgreSQL database\n When PostgresEventStore initializes schema\n Then immutability triggers exist on events table\n And trigger metadata indicates purpose (prevent UPDATE/DELETE)\n\nScenario: Application-level appends still work\n Given database with immutability triggers enabled\n When EventStore.append() is called with new events\n Then events are successfully inserted\n And no immutability violations occur","status":"open","priority":1,"issue_type":"feature","created_at":"2025-12-27T10:59:50.366209484-08:00","updated_at":"2025-12-27T11:00:10.209928022-08:00"} +{"id":"eventcore-20d","title":"Add database triggers to enforce event log immutability","description":"**Background:** Event sourcing requires the event log to be immutable - events are facts that have already occurred and must never be modified or deleted. Currently, this constraint is enforced only at the application level through the EventStore API.\n\n**Goal:** Add PostgreSQL database-level enforcement using triggers/rules to prevent UPDATE and DELETE operations on event log tables. This provides defense-in-depth against:\n- Bugs in application code\n- Direct database access (accidental or malicious)\n- SQL injection vulnerabilities\n- Human error during operations/debugging\n\n**Implementation:** The eventcore-postgres library should create database triggers that:\n1. PREVENT UPDATE operations on event log table(s)\n2. PREVENT DELETE operations on event log table(s)\n3. Raise clear, descriptive errors when violations are attempted\n4. Are created automatically during schema initialization\n\n**References:**\n- Event sourcing immutability principle\n- Defense-in-depth security practice","acceptance_criteria":"Feature: Database enforces event log immutability\n\nScenario: Database prevents UPDATE on event log\n Given PostgresEventStore has been initialized with schema\n And events have been appended to a stream\n When user attempts UPDATE on events table via SQL\n Then database raises error preventing the update\n And error message clearly indicates immutability violation\n\nScenario: Database prevents DELETE on event log\n Given PostgresEventStore has been initialized with schema\n And events have been appended to a stream\n When user attempts DELETE on events table via SQL\n Then database raises error preventing the deletion\n And error message clearly indicates immutability violation\n\nScenario: Triggers are created during schema initialization\n Given fresh PostgreSQL database\n When PostgresEventStore initializes schema\n Then immutability triggers exist on events table\n And trigger metadata indicates purpose (prevent UPDATE/DELETE)\n\nScenario: Application-level appends still work\n Given database with immutability triggers enabled\n When EventStore.append() is called with new events\n Then events are successfully inserted\n And no immutability violations occur","status":"closed","priority":1,"issue_type":"feature","created_at":"2025-12-27T10:59:50.366209484-08:00","updated_at":"2025-12-28T11:35:39.33705651-08:00","closed_at":"2025-12-28T11:35:39.33705651-08:00","close_reason":"Implemented PostgreSQL triggers to enforce event log immutability. Added UPDATE and DELETE prevention triggers with tests and documentation."} {"id":"eventcore-26v","title":"Refactor: Extract PostgreSQL query builder","status":"tombstone","priority":3,"issue_type":"task","created_at":"2025-12-16T14:49:15.26678424-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-26v","depends_on_id":"eventcore-017","type":"parent-child","created_at":"2025-12-16T15:29:13.939109498-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} {"id":"eventcore-2n5","title":"Heartbeat and Liveness Detection","description":"Implement heartbeat-based liveness detection per ADR-024: projectors send heartbeats to maintain leadership, and stale leaders are detected.\n\n## Core Components\n\n- `HeartbeatConfig` with heartbeat_interval and heartbeat_timeout\n- `guard.heartbeat()` method for leadership renewal\n- `guard.is_valid()` incorporates heartbeat validity\n- Integration with ProjectorCoordinator implementations\n\n## Acceptance Criteria\n\nFeature: Developer detects and recovers from hung projectors\n\nScenario: Healthy projector sends heartbeats\n Given projector holds leadership with heartbeat_interval of 5 seconds\n When projector processes events normally\n Then runner calls guard.heartbeat() at least every heartbeat_interval\n And guard.is_valid() continues to return true\n\nScenario: Hung projector loses leadership\n Given projector holds leadership with heartbeat_timeout of 30 seconds\n When projector hangs (infinite loop, deadlock) and stops calling heartbeat()\n Then after heartbeat_timeout, guard.is_valid() returns false\n And runner detects invalid guard and stops processing\n And leadership becomes available for other instances\n\nScenario: New instance takes over from hung projector\n Given first instance is hung and missed heartbeat timeout\n When second instance calls coordinator.try_acquire()\n Then second instance acquires leadership\n And second instance resumes processing from last checkpoint\n And no events are skipped or duplicated\n\nScenario: Developer configures heartbeat interval\n Given developer creates HeartbeatConfig with heartbeat_interval of 10 seconds\n When projector runs\n Then runner automatically calls guard.heartbeat() every 10 seconds\n And interval is respected by the runner loop\n\nScenario: Developer configures heartbeat timeout\n Given developer creates HeartbeatConfig with heartbeat_timeout of 60 seconds\n When projector stops sending heartbeats\n Then leadership is not revoked until 60 seconds elapse\n And timeout provides grace period for slow processing\n\nScenario: Invalid heartbeat configuration is rejected\n Given developer creates HeartbeatConfig\n When heartbeat_timeout is less than or equal to heartbeat_interval\n Then builder returns descriptive error\n And message explains timeout must exceed interval\n When heartbeat_interval is zero or negative\n Then builder returns descriptive error\n\nScenario: PostgresCoordinator implements heartbeat\n Given PostgresCoordinator with heartbeat configuration\n When guard.heartbeat() is called\n Then coordinator updates last-heartbeat timestamp in database\n When guard.is_valid() is called\n Then coordinator checks if heartbeat is within timeout window\n\nScenario: Runner handles slow event processing\n Given event processing may take longer than heartbeat_interval\n When apply() takes 20 seconds but heartbeat_interval is 5 seconds\n Then runner sends heartbeat before calling apply()\n And runner sends heartbeat after apply() completes\n And long-running events do not cause spurious leadership loss","status":"tombstone","priority":2,"issue_type":"feature","created_at":"2025-12-20T22:07:29.365694095-08:00","updated_at":"2025-12-28T11:07:13.983518123-08:00","dependencies":[{"issue_id":"eventcore-2n5","depends_on_id":"eventcore-dvp","type":"blocks","created_at":"2025-12-20T22:22:59.820669376-08:00","created_by":"daemon","metadata":"{}"},{"issue_id":"eventcore-2n5","depends_on_id":"eventcore-5ku","type":"blocks","created_at":"2025-12-20T22:22:59.931463458-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T11:07:13.983518123-08:00","deleted_by":"daemon","delete_reason":"delete","original_type":"feature"} {"id":"eventcore-2tq","title":"Set up crates.io account ownership and CI token","description":"Prerequisites for automated publishing: (1) Verify ownership of all 5 crate names on crates.io, (2) Generate CRATES_IO_TOKEN with publish permissions, (3) Add token to GitHub repository secrets, (4) Document credential rotation procedures in RELEASE_PROCESS.md","status":"tombstone","priority":2,"issue_type":"task","created_at":"2025-12-23T15:55:27.329170906-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-2tq","depends_on_id":"eventcore-3if","type":"blocks","created_at":"2025-12-23T15:56:10.040440007-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} @@ -63,7 +63,7 @@ {"id":"eventcore-6v0","title":"Move InMemoryEventStore to separate eventcore-memory crate","description":"Extract InMemoryEventStore to eventcore-memory crate for consistency with eventcore-postgres organization pattern.","status":"tombstone","priority":2,"issue_type":"task","created_at":"2025-12-24T14:29:13.224767878-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} {"id":"eventcore-6ze","title":"Align all workspace crate versions to 0.2.0 baseline","description":"Current state: eventcore at 0.1.8, all other crates (eventcore-types, eventcore-postgres, eventcore-macros, eventcore-testing) at 0.1.0. This violates the lockstep versioning requirement. All crates must be aligned to a common baseline version (recommend 0.2.0) before automated release tooling can be configured.","status":"tombstone","priority":1,"issue_type":"task","created_at":"2025-12-23T15:55:27.064536585-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-6ze","depends_on_id":"eventcore-3if","type":"blocks","created_at":"2025-12-23T15:56:09.835004801-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} {"id":"eventcore-6zm","title":"CRITICAL: Add logging for silent database poll errors in Postgres subscription","status":"tombstone","priority":0,"issue_type":"bug","created_at":"2025-12-17T10:41:50.185808638-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-6zm","depends_on_id":"eventcore-017","type":"discovered-from","created_at":"2025-12-17T10:41:50.186726064-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"bug"} -{"id":"eventcore-74y","title":"Unify contract test macros into single backend_contract_tests! macro","description":"## Problem\n\nContract tests are fragmented across multiple macros:\n- `event_store_contract_tests!` - EventStore trait only\n- `event_reader_contract_tests!` - EventReader trait only\n- `event_store_suite!` - combines both but lists tests explicitly\n\nBackend test files must invoke multiple macros and be updated whenever new contract test categories are added. This violates the principle that adding a new contract test should automatically run for all implementations.\n\n## Current State\n\n`eventcore-postgres/tests/i010_contract_suite_test.rs`:\n```rust\nevent_store_contract_tests! { suite = postgres_contract, ... }\nevent_reader_contract_tests! { suite = postgres_reader_contract, ... }\n```\n\n## Desired State\n\nONE macro that runs ALL contract tests:\n```rust\nbackend_contract_tests! {\n suite = postgres,\n make_store = || { ... },\n}\n```\n\nWhen new contract tests are added (e.g., CheckpointStore), they automatically run for all backends without touching backend test files.\n\n## Acceptance Criteria\n\n1. Single `backend_contract_tests!` macro exported from eventcore-testing\n2. Macro runs ALL contract tests (EventStore, EventReader, future CheckpointStore)\n3. Both InMemory and Postgres test files use only this one macro\n4. Adding a new test function to eventcore-testing automatically runs for both backends\n5. Deprecated macros removed or made private","status":"in_progress","priority":0,"issue_type":"task","created_at":"2025-12-28T12:55:49.699057854-08:00","created_by":"jwilger","updated_at":"2025-12-28T12:56:29.713389522-08:00"} +{"id":"eventcore-74y","title":"Unify contract test macros into single backend_contract_tests! macro","description":"## Problem\n\nContract tests are fragmented across multiple macros:\n- `event_store_contract_tests!` - EventStore trait only\n- `event_reader_contract_tests!` - EventReader trait only\n- `event_store_suite!` - combines both but lists tests explicitly\n\nBackend test files must invoke multiple macros and be updated whenever new contract test categories are added. This violates the principle that adding a new contract test should automatically run for all implementations.\n\n## Current State\n\n`eventcore-postgres/tests/i010_contract_suite_test.rs`:\n```rust\nevent_store_contract_tests! { suite = postgres_contract, ... }\nevent_reader_contract_tests! { suite = postgres_reader_contract, ... }\n```\n\n## Desired State\n\nONE macro that runs ALL contract tests:\n```rust\nbackend_contract_tests! {\n suite = postgres,\n make_store = || { ... },\n}\n```\n\nWhen new contract tests are added (e.g., CheckpointStore), they automatically run for all backends without touching backend test files.\n\n## Acceptance Criteria\n\n1. Single `backend_contract_tests!` macro exported from eventcore-testing\n2. Macro runs ALL contract tests (EventStore, EventReader, future CheckpointStore)\n3. Both InMemory and Postgres test files use only this one macro\n4. Adding a new test function to eventcore-testing automatically runs for both backends\n5. Deprecated macros removed or made private","status":"closed","priority":0,"issue_type":"task","created_at":"2025-12-28T12:55:49.699057854-08:00","created_by":"jwilger","updated_at":"2025-12-28T13:02:49.842784617-08:00","closed_at":"2025-12-28T13:02:49.842784617-08:00","close_reason":"PR #233 created - unified backend_contract_tests! macro"} {"id":"eventcore-765","title":"Fix README subscription example error handling","description":"**Location:** `README.md:171-178`\n\n**Issue:** The subscription example uses `.expect()` for error handling:\n\n```rust\nlet balance = subscription\n .map(|result| result.expect(\"event should deserialize\"))\n .fold(...)\n```\n\nThis contradicts ADR-018/ADR-019's error handling guidance about Fatal/Skip/Retry strategies.\n\n**Fix:** Either:\n1. Show proper error handling pattern, or\n2. Add a comment acknowledging this is simplified for demonstration\n\nExample with proper handling:\n```rust\nlet balance = subscription\n .filter_map(|result| async {\n match result {\n Ok(event) =\u003e Some(event),\n Err(e) =\u003e {\n tracing::warn\\!(error = %e, \"skipping event\");\n None\n }\n }\n })\n .fold(...)\n```","status":"tombstone","priority":2,"issue_type":"bug","created_at":"2025-12-17T13:59:51.907740777-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-765","depends_on_id":"eventcore-017","type":"parent-child","created_at":"2025-12-17T13:59:51.908567528-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"bug"} {"id":"eventcore-7ci","title":"Add test for Batch mode single-pass exit to eliminate mutation timeout","status":"tombstone","priority":1,"issue_type":"task","created_at":"2025-12-22T10:06:32.354835838-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"task"} {"id":"eventcore-7kr","title":"CRITICAL: Guarantee zero event loss under broadcast channel lag","description":"**Location:** `src/store.rs:963-969` and `eventcore-postgres/src/lib.rs:545-551`\n\n**Issue:** When a subscriber falls behind and the tokio broadcast channel overflows (buffer size 1024), events are DROPPED. The current code logs a warning and continues, leaving the subscriber with a gap in their event stream.\n\n**Current behavior:**\n```rust\nSome(Err(RecvError::Lagged(skipped))) =\u003e {\n tracing::warn!(skipped_events = skipped, \"broadcast receiver lagged behind, events may be lost\");\n continue; // Events are GONE\n}\n```\n\n**This is unacceptable for projections.** A projection that misses events will have incorrect state with no way to detect or recover.\n\n**Required behavior:** Zero event loss, even under lag. Options:\n1. **Database catchup on lag:** When lag detected, note the last delivered sequence, then re-query the database to fetch missed events before resuming from broadcast\n2. **Unbounded channel:** Memory concerns, but guarantees delivery\n3. **Backpressure:** Slow down publishers (inappropriate for event store)\n4. **Hybrid:** Small broadcast buffer for fast path, automatic database catchup when lag detected\n\n**Architectural consideration:** This may require fundamental changes to how subscriptions work. The current architecture assumes broadcast channel is sufficient, but it is not for guaranteed delivery.\n\n**Acceptance criteria:**\n- A slow subscriber MUST receive all events in order\n- No events may be silently dropped\n- Lag recovery must be automatic (subscriber does not need special handling)","status":"tombstone","priority":0,"issue_type":"bug","created_at":"2025-12-17T13:59:44.368096232-08:00","updated_at":"2025-12-28T10:49:58.260318957-08:00","dependencies":[{"issue_id":"eventcore-7kr","depends_on_id":"eventcore-017","type":"parent-child","created_at":"2025-12-17T13:59:44.378225273-08:00","created_by":"daemon","metadata":"{}"}],"deleted_at":"2025-12-28T10:49:58.260318957-08:00","deleted_by":"batch delete","delete_reason":"batch delete","original_type":"bug"} diff --git a/eventcore-memory/src/lib.rs b/eventcore-memory/src/lib.rs index aaadc75..061678a 100644 --- a/eventcore-memory/src/lib.rs +++ b/eventcore-memory/src/lib.rs @@ -4,11 +4,12 @@ //! storage backend for EventCore integration tests and development. use std::collections::HashMap; +use std::sync::{Arc, RwLock}; use eventcore_types::{ - Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader, - EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion, StreamWriteEntry, - StreamWrites, + CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, + EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion, + StreamWriteEntry, StreamWrites, }; use uuid::Uuid; @@ -210,6 +211,77 @@ impl EventReader for InMemoryEventStore { } } +/// In-memory checkpoint store for tracking projection progress. +/// +/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a +/// thread-safe `Arc>`. It is primarily useful for testing +/// and single-process deployments where persistence across restarts is not required. +/// +/// For production deployments requiring durability, use a persistent +/// checkpoint store implementation. +/// +/// # Example +/// +/// ```ignore +/// use eventcore_memory::InMemoryCheckpointStore; +/// +/// let checkpoint_store = InMemoryCheckpointStore::new(); +/// // Use with ProjectionRunner +/// ``` +#[derive(Debug, Clone, Default)] +pub struct InMemoryCheckpointStore { + checkpoints: Arc>>, +} + +impl InMemoryCheckpointStore { + /// Create a new in-memory checkpoint store. + pub fn new() -> Self { + Self::default() + } +} + +/// Error type for in-memory checkpoint store operations. +/// +/// Since the in-memory store uses an `RwLock`, the only possible error +/// is a poisoned lock from a panic in another thread. +#[derive(Debug, Clone)] +pub struct InMemoryCheckpointError { + message: String, +} + +impl std::fmt::Display for InMemoryCheckpointError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for InMemoryCheckpointError {} + +impl CheckpointStore for InMemoryCheckpointStore { + type Error = InMemoryCheckpointError; + + async fn load(&self, name: &str) -> Result, Self::Error> { + let guard = self + .checkpoints + .read() + .map_err(|e| InMemoryCheckpointError { + message: format!("failed to acquire read lock: {}", e), + })?; + Ok(guard.get(name).copied()) + } + + async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> { + let mut guard = self + .checkpoints + .write() + .map_err(|e| InMemoryCheckpointError { + message: format!("failed to acquire write lock: {}", e), + })?; + let _ = guard.insert(name.to_string(), position); + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/eventcore-postgres/migrations/0006_add_subscription_versions.sql b/eventcore-postgres/migrations/0006_add_subscription_versions.sql new file mode 100644 index 0000000..9cb9c5c --- /dev/null +++ b/eventcore-postgres/migrations/0006_add_subscription_versions.sql @@ -0,0 +1,11 @@ +-- Checkpoint positions for event subscriptions/projectors per ADR-026 +CREATE TABLE IF NOT EXISTS eventcore_subscription_versions ( + subscription_name TEXT PRIMARY KEY, + last_position UUID NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS eventcore_subscription_versions_updated_at_idx + ON eventcore_subscription_versions (updated_at); + +COMMENT ON TABLE eventcore_subscription_versions IS 'Tracks checkpoint positions for event subscriptions/projectors per ADR-026'; diff --git a/eventcore-postgres/src/lib.rs b/eventcore-postgres/src/lib.rs index d830040..aa397c4 100644 --- a/eventcore-postgres/src/lib.rs +++ b/eventcore-postgres/src/lib.rs @@ -1,8 +1,9 @@ use std::time::Duration; use eventcore_types::{ - Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader, - EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry, StreamWrites, + CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, + EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry, + StreamWrites, }; use nutype::nutype; use serde_json::{Value, json}; @@ -333,3 +334,111 @@ fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError { ); EventStoreError::StoreFailure { operation } } + +/// Error type for PostgresCheckpointStore operations. +#[derive(Debug, Error)] +pub enum PostgresCheckpointError { + /// Failed to create connection pool. + #[error("failed to create postgres connection pool")] + ConnectionFailed(#[source] sqlx::Error), + + /// Database operation failed. + #[error("database operation failed: {0}")] + DatabaseError(#[source] sqlx::Error), +} + +/// Postgres-backed checkpoint store for tracking projection progress. +/// +/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table, +/// providing durability across process restarts. It implements the `CheckpointStore` +/// trait from eventcore-types. +/// +/// # Schema +/// +/// The store uses the `eventcore_subscription_versions` table with: +/// - `subscription_name`: Unique identifier for each projector/subscription +/// - `last_position`: UUID7 representing the global stream position +/// - `updated_at`: Timestamp of the last checkpoint update +#[derive(Debug, Clone)] +pub struct PostgresCheckpointStore { + pool: Pool, +} + +impl PostgresCheckpointStore { + /// Create a new PostgresCheckpointStore with default configuration. + pub async fn new>( + connection_string: S, + ) -> Result { + Self::with_config(connection_string, PostgresConfig::default()).await + } + + /// Create a new PostgresCheckpointStore with custom configuration. + pub async fn with_config>( + connection_string: S, + config: PostgresConfig, + ) -> Result { + let connection_string = connection_string.into(); + let max_connections: std::num::NonZeroU32 = config.max_connections.into(); + let pool = PgPoolOptions::new() + .max_connections(max_connections.get()) + .acquire_timeout(config.acquire_timeout) + .idle_timeout(config.idle_timeout) + .connect(&connection_string) + .await + .map_err(PostgresCheckpointError::ConnectionFailed)?; + + // Run migrations to ensure table exists + sqlx::migrate!("./migrations") + .run(&pool) + .await + .map_err(|e| { + PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e))) + })?; + + Ok(Self { pool }) + } + + /// Create a PostgresCheckpointStore from an existing connection pool. + /// + /// Use this when you need full control over pool configuration or want to + /// share a pool across multiple components. + pub fn from_pool(pool: Pool) -> Self { + Self { pool } + } +} + +impl CheckpointStore for PostgresCheckpointStore { + type Error = PostgresCheckpointError; + + async fn load(&self, name: &str) -> Result, Self::Error> { + let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1") + .bind(name) + .fetch_optional(&self.pool) + .await + .map_err(PostgresCheckpointError::DatabaseError)?; + + match row { + Some(row) => { + let position: Uuid = row.get("last_position"); + Ok(Some(StreamPosition::new(position))) + } + None => Ok(None), + } + } + + async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> { + let position_uuid: Uuid = position.into_inner(); + query( + "INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at) + VALUES ($1, $2, NOW()) + ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()", + ) + .bind(name) + .bind(position_uuid) + .execute(&self.pool) + .await + .map_err(PostgresCheckpointError::DatabaseError)?; + + Ok(()) + } +} diff --git a/eventcore-postgres/tests/i010_contract_suite_test.rs b/eventcore-postgres/tests/i010_contract_suite_test.rs index f0df8db..cc03188 100644 --- a/eventcore-postgres/tests/i010_contract_suite_test.rs +++ b/eventcore-postgres/tests/i010_contract_suite_test.rs @@ -6,7 +6,7 @@ mod common; mod postgres_contract_suite { - use eventcore_postgres::PostgresEventStore; + use eventcore_postgres::{PostgresCheckpointStore, PostgresEventStore}; use eventcore_testing::contract::backend_contract_tests; use crate::common::IsolatedPostgresFixture; @@ -25,10 +25,25 @@ mod postgres_contract_suite { }) } + fn make_checkpoint_store() -> PostgresCheckpointStore { + // Use block_in_place to allow blocking within multi-threaded tokio runtime + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let fixture = IsolatedPostgresFixture::new().await; + PostgresCheckpointStore::new(fixture.connection_string) + .await + .expect("should connect to isolated test database") + }) + }) + } + backend_contract_tests! { suite = postgres, make_store = || { crate::postgres_contract_suite::make_store() }, + make_checkpoint_store = || { + crate::postgres_contract_suite::make_checkpoint_store() + }, } } diff --git a/eventcore-testing/src/contract.rs b/eventcore-testing/src/contract.rs index 4edb791..e48ae34 100644 --- a/eventcore-testing/src/contract.rs +++ b/eventcore-testing/src/contract.rs @@ -1,6 +1,6 @@ use eventcore_types::{ - BatchSize, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, StreamId, - StreamPrefix, StreamVersion, StreamWrites, + BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, + EventStoreError, StreamId, StreamPosition, StreamPrefix, StreamVersion, StreamWrites, }; use std::fmt; @@ -418,19 +418,24 @@ where /// backend_contract_tests! { /// suite = my_backend, /// make_store = || MyEventStore::new(), +/// make_checkpoint_store = || MyCheckpointStore::new(), /// } /// ``` /// /// # Requirements /// /// The store type must implement both `EventStore` and `EventReader` traits. +/// The checkpoint store type must implement `CheckpointStore` trait. #[macro_export] macro_rules! backend_contract_tests { - (suite = $suite:ident, make_store = $make_store:expr $(,)?) => { + (suite = $suite:ident, make_store = $make_store:expr, make_checkpoint_store = $make_checkpoint_store:expr $(,)?) => { #[allow(non_snake_case)] mod $suite { use $crate::contract::{ - test_basic_read_write, test_batch_limiting, test_concurrent_version_conflicts, + test_basic_read_write, test_batch_limiting, + test_checkpoint_independent_subscriptions, + test_checkpoint_load_missing_returns_none, test_checkpoint_save_and_load, + test_checkpoint_update_overwrites, test_concurrent_version_conflicts, test_conflict_preserves_atomicity, test_event_ordering_across_streams, test_missing_stream_reads, test_position_based_resumption, test_stream_isolation, test_stream_prefix_filtering, test_stream_prefix_requires_prefix_match, @@ -505,6 +510,35 @@ macro_rules! backend_contract_tests { .await .expect("event reader contract failed"); } + + // CheckpointStore contract tests + #[tokio::test(flavor = "multi_thread")] + async fn checkpoint_save_and_load_contract() { + test_checkpoint_save_and_load($make_checkpoint_store) + .await + .expect("checkpoint store contract failed"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn checkpoint_update_overwrites_contract() { + test_checkpoint_update_overwrites($make_checkpoint_store) + .await + .expect("checkpoint store contract failed"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn checkpoint_load_missing_returns_none_contract() { + test_checkpoint_load_missing_returns_none($make_checkpoint_store) + .await + .expect("checkpoint store contract failed"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn checkpoint_independent_subscriptions_contract() { + test_checkpoint_independent_subscriptions($make_checkpoint_store) + .await + .expect("checkpoint store contract failed"); + } } }; } @@ -952,3 +986,193 @@ where Ok(()) } + +// ============================================================================ +// CheckpointStore Contract Tests +// ============================================================================ + +/// Contract test: Save a checkpoint and load it back +pub async fn test_checkpoint_save_and_load(make_checkpoint_store: F) -> ContractTestResult +where + F: Fn() -> CS + Send + Sync + Clone + 'static, + CS: CheckpointStore + Send + Sync + 'static, +{ + const SCENARIO: &str = "checkpoint_save_and_load"; + + let store = make_checkpoint_store(); + + // Given: A subscription name and position + let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7()); + let position = StreamPosition::new(Uuid::now_v7()); + + // When: Saving the checkpoint + store + .save(&subscription_name, position) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save failed"))?; + + // Then: Loading returns the saved position + let loaded = store + .load(&subscription_name) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?; + + if loaded != Some(position) { + return Err(ContractTestFailure::assertion( + SCENARIO, + format!( + "expected loaded position {:?} but got {:?}", + Some(position), + loaded + ), + )); + } + + Ok(()) +} + +/// Contract test: Saving a checkpoint overwrites the previous value +pub async fn test_checkpoint_update_overwrites( + make_checkpoint_store: F, +) -> ContractTestResult +where + F: Fn() -> CS + Send + Sync + Clone + 'static, + CS: CheckpointStore + Send + Sync + 'static, +{ + const SCENARIO: &str = "checkpoint_update_overwrites"; + + let store = make_checkpoint_store(); + + // Given: A subscription with an initial checkpoint + let subscription_name = format!("contract::{}::{}", SCENARIO, Uuid::now_v7()); + let first_position = StreamPosition::new(Uuid::now_v7()); + + store + .save(&subscription_name, first_position) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "first save failed"))?; + + // When: Saving a new position + let second_position = StreamPosition::new(Uuid::now_v7()); + store + .save(&subscription_name, second_position) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "second save failed"))?; + + // Then: Loading returns the new position, not the old one + let loaded = store + .load(&subscription_name) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?; + + if loaded != Some(second_position) { + return Err(ContractTestFailure::assertion( + SCENARIO, + format!( + "expected updated position {:?} but got {:?}", + Some(second_position), + loaded + ), + )); + } + + Ok(()) +} + +/// Contract test: Loading a non-existent checkpoint returns None +pub async fn test_checkpoint_load_missing_returns_none( + make_checkpoint_store: F, +) -> ContractTestResult +where + F: Fn() -> CS + Send + Sync + Clone + 'static, + CS: CheckpointStore + Send + Sync + 'static, +{ + const SCENARIO: &str = "checkpoint_load_missing_returns_none"; + + let store = make_checkpoint_store(); + + // Given: A subscription name that has never been saved + let subscription_name = format!("contract::{}::ghost::{}", SCENARIO, Uuid::now_v7()); + + // When: Loading the checkpoint + let loaded = store + .load(&subscription_name) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load failed"))?; + + // Then: None is returned + if loaded.is_some() { + return Err(ContractTestFailure::assertion( + SCENARIO, + format!("expected None for missing checkpoint but got {:?}", loaded), + )); + } + + Ok(()) +} + +/// Contract test: Different subscription names have independent checkpoints +pub async fn test_checkpoint_independent_subscriptions( + make_checkpoint_store: F, +) -> ContractTestResult +where + F: Fn() -> CS + Send + Sync + Clone + 'static, + CS: CheckpointStore + Send + Sync + 'static, +{ + const SCENARIO: &str = "checkpoint_independent_subscriptions"; + + let store = make_checkpoint_store(); + + // Given: Two subscription names + let subscription_a = format!("contract::{}::sub-a::{}", SCENARIO, Uuid::now_v7()); + let subscription_b = format!("contract::{}::sub-b::{}", SCENARIO, Uuid::now_v7()); + + let position_a = StreamPosition::new(Uuid::now_v7()); + let position_b = StreamPosition::new(Uuid::now_v7()); + + // When: Saving different positions for each + store + .save(&subscription_a, position_a) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save A failed"))?; + + store + .save(&subscription_b, position_b) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "save B failed"))?; + + // Then: Each subscription loads its own position + let loaded_a = store + .load(&subscription_a) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load A failed"))?; + + let loaded_b = store + .load(&subscription_b) + .await + .map_err(|_| ContractTestFailure::assertion(SCENARIO, "load B failed"))?; + + if loaded_a != Some(position_a) { + return Err(ContractTestFailure::assertion( + SCENARIO, + format!( + "subscription A: expected {:?} but got {:?}", + Some(position_a), + loaded_a + ), + )); + } + + if loaded_b != Some(position_b) { + return Err(ContractTestFailure::assertion( + SCENARIO, + format!( + "subscription B: expected {:?} but got {:?}", + Some(position_b), + loaded_b + ), + )); + } + + Ok(()) +} diff --git a/eventcore-testing/tests/event_store_contract_suite.rs b/eventcore-testing/tests/event_store_contract_suite.rs index 628d48d..db9a9f4 100644 --- a/eventcore-testing/tests/event_store_contract_suite.rs +++ b/eventcore-testing/tests/event_store_contract_suite.rs @@ -8,4 +8,5 @@ use eventcore_testing::contract::backend_contract_tests; backend_contract_tests! { suite = in_memory, make_store = eventcore_memory::InMemoryEventStore::new, + make_checkpoint_store = eventcore_memory::InMemoryCheckpointStore::new, } diff --git a/eventcore-types/src/lib.rs b/eventcore-types/src/lib.rs index 3fb2e39..6d76f77 100644 --- a/eventcore-types/src/lib.rs +++ b/eventcore-types/src/lib.rs @@ -61,8 +61,8 @@ pub use command::{ }; pub use errors::CommandError; pub use projection::{ - AttemptNumber, BackoffMultiplier, BatchSize, DelayMilliseconds, EventFilter, EventPage, - EventReader, FailureContext, FailureStrategy, MaxConsecutiveFailures, MaxRetries, + AttemptNumber, BackoffMultiplier, BatchSize, CheckpointStore, DelayMilliseconds, EventFilter, + EventPage, EventReader, FailureContext, FailureStrategy, MaxConsecutiveFailures, MaxRetries, MaxRetryAttempts, Projector, RetryCount, StreamPosition, }; pub use store::{ diff --git a/eventcore-types/src/projection.rs b/eventcore-types/src/projection.rs index 8e82b25..a393a32 100644 --- a/eventcore-types/src/projection.rs +++ b/eventcore-types/src/projection.rs @@ -621,6 +621,75 @@ impl EventReader for &T { } } +/// Trait for persisting and retrieving projection checkpoints. +/// +/// CheckpointStore provides durable storage for projection progress, enabling +/// projections to resume from their last known position after restarts or failures. +/// +/// # Type Parameters +/// +/// - `Error`: The error type returned by checkpoint operations +/// +/// # Required Methods +/// +/// - `load`: Retrieve the last saved position for a subscription +/// - `save`: Persist the current position for a subscription +/// +/// # Example +/// +/// ```ignore +/// impl CheckpointStore for MyCheckpointStore { +/// type Error = MyError; +/// +/// async fn load(&self, name: &str) -> Result, Self::Error> { +/// // Load from database, file, etc. +/// } +/// +/// async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> { +/// // Persist to database, file, etc. +/// } +/// } +/// ``` +pub trait CheckpointStore: Send + Sync { + /// Error type returned by checkpoint operations. + type Error: std::error::Error + Send + Sync + 'static; + + /// Load the last saved checkpoint position for a subscription. + /// + /// # Parameters + /// + /// - `name`: The unique name identifying the subscription/projector + /// + /// # Returns + /// + /// - `Ok(Some(position))`: The last saved position + /// - `Ok(None)`: No checkpoint exists for this subscription + /// - `Err(Self::Error)`: If the load operation fails + fn load( + &self, + name: &str, + ) -> impl Future, Self::Error>> + Send; + + /// Save a checkpoint position for a subscription. + /// + /// This overwrites any previously saved position for the same subscription. + /// + /// # Parameters + /// + /// - `name`: The unique name identifying the subscription/projector + /// - `position`: The stream position to save + /// + /// # Returns + /// + /// - `Ok(())`: The checkpoint was saved successfully + /// - `Err(Self::Error)`: If the save operation fails + fn save( + &self, + name: &str, + position: StreamPosition, + ) -> impl Future> + Send; +} + #[cfg(test)] mod tests { use super::*; diff --git a/eventcore/src/lib.rs b/eventcore/src/lib.rs index c3d2f12..f63e66a 100644 --- a/eventcore/src/lib.rs +++ b/eventcore/src/lib.rs @@ -41,18 +41,18 @@ mod projection; // Re-export all types from eventcore-types for backward compatibility pub use eventcore_types::{ - AttemptNumber, BackoffMultiplier, BatchSize, CommandError, CommandLogic, CommandStreams, - DelayMilliseconds, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, - EventStreamReader, EventStreamSlice, FailureContext, FailureStrategy, MaxConsecutiveFailures, - MaxRetries, MaxRetryAttempts, NewEvents, Operation, Projector, RetryCount, StreamDeclarations, - StreamDeclarationsError, StreamId, StreamPosition, StreamPrefix, StreamResolver, StreamVersion, - StreamWriteEntry, StreamWrites, + AttemptNumber, BackoffMultiplier, BatchSize, CheckpointStore, CommandError, CommandLogic, + CommandStreams, DelayMilliseconds, Event, EventFilter, EventPage, EventReader, EventStore, + EventStoreError, EventStreamReader, EventStreamSlice, FailureContext, FailureStrategy, + MaxConsecutiveFailures, MaxRetries, MaxRetryAttempts, NewEvents, Operation, Projector, + RetryCount, StreamDeclarations, StreamDeclarationsError, StreamId, StreamPosition, + StreamPrefix, StreamResolver, StreamVersion, StreamWriteEntry, StreamWrites, }; // Re-export projection runtime components pub use projection::{ - CoordinatorGuard, EventRetryConfig, InMemoryCheckpointStore, LocalCoordinator, PollConfig, - PollMode, ProjectionRunner, + CoordinatorGuard, EventRetryConfig, LocalCoordinator, NoCheckpointStore, PollConfig, PollMode, + ProjectionError, ProjectionRunner, }; // Re-export Command derive macro when the "macros" feature is enabled (default) diff --git a/eventcore/src/projection.rs b/eventcore/src/projection.rs index 6f04e5b..10fcae6 100644 --- a/eventcore/src/projection.rs +++ b/eventcore/src/projection.rs @@ -5,8 +5,8 @@ //! - `ProjectionRunner`: Orchestrates projector execution with event polling use crate::{ - BackoffMultiplier, BatchSize, Event, EventFilter, EventPage, EventReader, FailureStrategy, - MaxConsecutiveFailures, MaxRetryAttempts, Projector, StreamPosition, + BackoffMultiplier, BatchSize, CheckpointStore, Event, EventFilter, EventPage, EventReader, + FailureStrategy, MaxConsecutiveFailures, MaxRetryAttempts, Projector, StreamPosition, }; use std::time::Duration; @@ -106,60 +106,6 @@ pub enum PollMode { Continuous, } -/// In-memory checkpoint store for tracking projection progress. -/// -/// `InMemoryCheckpointStore` stores checkpoint positions in memory. It is -/// primarily useful for testing and single-process deployments where -/// persistence across restarts is not required. -/// -/// For production deployments requiring durability, use a persistent -/// checkpoint store implementation. -/// -/// # Example -/// -/// ```ignore -/// let checkpoint_store = InMemoryCheckpointStore::new(); -/// let runner = ProjectionRunner::new(projector, coordinator, &store) -/// .with_checkpoint_store(checkpoint_store); -/// ``` -#[derive(Debug, Clone, Default)] -pub struct InMemoryCheckpointStore { - checkpoints: - std::sync::Arc>>, -} - -impl InMemoryCheckpointStore { - /// Create a new in-memory checkpoint store. - pub fn new() -> Self { - Self::default() - } - - /// Load checkpoint for the given projector name. - pub fn load(&self, projector_name: &str) -> Option { - self.checkpoints - .lock() - .ok() - .and_then(|guard| guard.get(projector_name).copied()) - } - - /// Save checkpoint for the given projector name. - pub fn save(&self, projector_name: &str, position: StreamPosition) { - match self.checkpoints.lock() { - Ok(mut guard) => { - let _ = guard.insert(projector_name.to_string(), position); - } - Err(e) => { - tracing::warn!( - projector = projector_name, - position = %position, - error = %e, - "Failed to save checkpoint due to poisoned mutex" - ); - } - } - } -} - /// Guard representing acquired leadership from a coordinator. /// /// `CoordinatorGuard` uses RAII pattern to automatically release leadership @@ -270,9 +216,10 @@ impl Default for LocalCoordinator { /// /// # Type Parameters /// +/// - `E`: The event type implementing [`Event`] +/// - `R`: The event reader type implementing [`EventReader`] /// - `P`: The projector type implementing [`Projector`] -/// - `C`: The coordinator type (e.g., `LocalCoordinator`) -/// - `S`: The event store type implementing [`EventReader`] +/// - `C`: The checkpoint store type implementing [`CheckpointStore`] /// /// # Example /// @@ -287,28 +234,61 @@ impl Default for LocalCoordinator { /// let runner = ProjectionRunner::new(projector, coordinator, &store); /// runner.run().await?; /// ``` -pub struct ProjectionRunner +pub struct ProjectionRunner where - P: Projector, - S: EventReader, + E: Event, + R: EventReader, + P: Projector, + C: CheckpointStore, { projector: P, - _coordinator: C, - store: S, - checkpoint_store: Option, + _coordinator: LocalCoordinator, + store: R, + checkpoint_store: Option, poll_mode: PollMode, poll_config: PollConfig, event_retry_config: EventRetryConfig, + _event: std::marker::PhantomData, } -impl ProjectionRunner +/// A no-op checkpoint store that never saves or loads checkpoints. +/// +/// Used as the default checkpoint store type when no checkpoint store is configured. +#[derive(Debug, Clone, Copy, Default)] +pub struct NoCheckpointStore; + +/// Error type for NoCheckpointStore (never actually returned). +#[derive(Debug, Clone, Copy)] +pub struct NoCheckpointError; + +impl std::fmt::Display for NoCheckpointError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "no checkpoint store configured") + } +} + +impl std::error::Error for NoCheckpointError {} + +impl CheckpointStore for NoCheckpointStore { + type Error = NoCheckpointError; + + async fn load(&self, _name: &str) -> Result, Self::Error> { + Ok(None) + } + + async fn save(&self, _name: &str, _position: StreamPosition) -> Result<(), Self::Error> { + Ok(()) + } +} + +impl ProjectionRunner where P: Projector, P::Event: Event + Clone, P::Context: Default, - S: EventReader, + R: EventReader, { - /// Create a new projection runner. + /// Create a new projection runner without checkpoint support. /// /// # Parameters /// @@ -319,7 +299,7 @@ where /// # Returns /// /// A new `ProjectionRunner` ready to be started with `run()`. - pub fn new(projector: P, coordinator: C, store: S) -> Self { + pub fn new(projector: P, coordinator: LocalCoordinator, store: R) -> Self { Self { projector, _coordinator: coordinator, @@ -328,6 +308,7 @@ where poll_mode: PollMode::Batch, poll_config: PollConfig::default(), event_retry_config: EventRetryConfig::default(), + _event: std::marker::PhantomData, } } @@ -344,12 +325,32 @@ where /// /// # Returns /// - /// Self for method chaining. - pub fn with_checkpoint_store(mut self, checkpoint_store: InMemoryCheckpointStore) -> Self { - self.checkpoint_store = Some(checkpoint_store); - self + /// A new runner with the checkpoint store configured. + pub fn with_checkpoint_store( + self, + checkpoint_store: C, + ) -> ProjectionRunner { + ProjectionRunner { + projector: self.projector, + _coordinator: self._coordinator, + store: self.store, + checkpoint_store: Some(checkpoint_store), + poll_mode: self.poll_mode, + poll_config: self.poll_config, + event_retry_config: self.event_retry_config, + _event: std::marker::PhantomData, + } } +} +impl ProjectionRunner +where + E: Event + Clone, + R: EventReader, + P: Projector, + P::Context: Default, + C: CheckpointStore, +{ /// Configure the polling mode for event processing. /// /// Controls whether the runner processes events once (batch mode) or @@ -456,10 +457,10 @@ where P::Error: std::fmt::Debug, { // Load checkpoint if checkpoint store is configured - let mut last_checkpoint = self - .checkpoint_store - .as_ref() - .and_then(|cs| cs.load(self.projector.name())); + let mut last_checkpoint = match &self.checkpoint_store { + Some(cs) => cs.load(self.projector.name()).await.ok().flatten(), + None => None, + }; let mut ctx = P::Context::default(); let mut consecutive_failures = 0u32; @@ -515,7 +516,8 @@ where // Event processed successfully - update and save checkpoint last_checkpoint = Some(position); if let Some(cs) = &self.checkpoint_store { - cs.save(self.projector.name(), position); + // Ignore checkpoint save errors - checkpoint is best-effort + let _ = cs.save(self.projector.name(), position).await; } break; // Move to next event } @@ -553,7 +555,8 @@ where // be retried (e.g., malformed data, unrecoverable errors). last_checkpoint = Some(position); if let Some(cs) = &self.checkpoint_store { - cs.save(self.projector.name(), position); + // Ignore checkpoint save errors - checkpoint is best-effort + let _ = cs.save(self.projector.name(), position).await; } break; // Move to next event } diff --git a/eventcore/tests/projection_poll_retry_test.rs b/eventcore/tests/projection_poll_retry_test.rs index 4ab0f6c..c0ee127 100644 --- a/eventcore/tests/projection_poll_retry_test.rs +++ b/eventcore/tests/projection_poll_retry_test.rs @@ -8,11 +8,10 @@ //! - And caller can decide recovery strategy (restart, alert, etc) use eventcore::{ - Event, EventFilter, EventPage, EventReader, EventStore, InMemoryCheckpointStore, - LocalCoordinator, PollConfig, PollMode, ProjectionRunner, Projector, StreamId, StreamPosition, - StreamVersion, StreamWrites, + Event, EventFilter, EventPage, EventReader, EventStore, LocalCoordinator, PollConfig, PollMode, + ProjectionRunner, Projector, StreamId, StreamPosition, StreamVersion, StreamWrites, }; -use eventcore_memory::InMemoryEventStore; +use eventcore_memory::{InMemoryCheckpointStore, InMemoryEventStore}; use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; diff --git a/eventcore/tests/projection_runner_test.rs b/eventcore/tests/projection_runner_test.rs index 2b25e1f..9cf581a 100644 --- a/eventcore/tests/projection_runner_test.rs +++ b/eventcore/tests/projection_runner_test.rs @@ -12,10 +12,10 @@ use eventcore::{ BatchSize, Event, EventFilter, EventPage, EventReader, EventStore, FailureContext, - FailureStrategy, InMemoryCheckpointStore, LocalCoordinator, PollConfig, PollMode, - ProjectionRunner, Projector, StreamId, StreamPosition, StreamVersion, StreamWrites, + FailureStrategy, LocalCoordinator, PollConfig, PollMode, ProjectionRunner, Projector, StreamId, + StreamPosition, StreamVersion, StreamWrites, }; -use eventcore_memory::InMemoryEventStore; +use eventcore_memory::{InMemoryCheckpointStore, InMemoryEventStore}; use serde::{Deserialize, Serialize}; use std::future::Future; use std::sync::Arc;