Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .beads/issues.jsonl

Large diffs are not rendered by default.

78 changes: 75 additions & 3 deletions eventcore-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//! storage backend for EventCore integration tests and development.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use eventcore_types::{
Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader,
EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion, StreamWriteEntry,
StreamWrites,
CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamVersion,
StreamWriteEntry, StreamWrites,
};
use uuid::Uuid;

Expand Down Expand Up @@ -210,6 +211,77 @@ impl EventReader for InMemoryEventStore {
}
}

/// In-memory checkpoint store for tracking projection progress.
///
/// `InMemoryCheckpointStore` stores checkpoint positions in memory using a
/// thread-safe `Arc<RwLock<HashMap>>`. It is primarily useful for testing
/// and single-process deployments where persistence across restarts is not required.
///
/// For production deployments requiring durability, use a persistent
/// checkpoint store implementation.
///
/// # Example
///
/// ```ignore
/// use eventcore_memory::InMemoryCheckpointStore;
///
/// let checkpoint_store = InMemoryCheckpointStore::new();
/// // Use with ProjectionRunner
/// ```
#[derive(Debug, Clone, Default)]
pub struct InMemoryCheckpointStore {
checkpoints: Arc<RwLock<HashMap<String, StreamPosition>>>,
}

impl InMemoryCheckpointStore {
/// Create a new in-memory checkpoint store.
pub fn new() -> Self {
Self::default()
}
}

/// Error type for in-memory checkpoint store operations.
///
/// Since the in-memory store uses an `RwLock`, the only possible error
/// is a poisoned lock from a panic in another thread.
#[derive(Debug, Clone)]
pub struct InMemoryCheckpointError {
message: String,
}

impl std::fmt::Display for InMemoryCheckpointError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}

impl std::error::Error for InMemoryCheckpointError {}

impl CheckpointStore for InMemoryCheckpointStore {
type Error = InMemoryCheckpointError;

async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
let guard = self
.checkpoints
.read()
.map_err(|e| InMemoryCheckpointError {
message: format!("failed to acquire read lock: {}", e),
})?;
Ok(guard.get(name).copied())
}

async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
let mut guard = self
.checkpoints
.write()
.map_err(|e| InMemoryCheckpointError {
message: format!("failed to acquire write lock: {}", e),
})?;
let _ = guard.insert(name.to_string(), position);
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
11 changes: 11 additions & 0 deletions eventcore-postgres/migrations/0006_add_subscription_versions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Checkpoint positions for event subscriptions/projectors per ADR-026
CREATE TABLE IF NOT EXISTS eventcore_subscription_versions (
subscription_name TEXT PRIMARY KEY,
last_position UUID NOT NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS eventcore_subscription_versions_updated_at_idx
ON eventcore_subscription_versions (updated_at);

COMMENT ON TABLE eventcore_subscription_versions IS 'Tracks checkpoint positions for event subscriptions/projectors per ADR-026';
113 changes: 111 additions & 2 deletions eventcore-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use eventcore_types::{
Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError, EventStreamReader,
EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry, StreamWrites,
CheckpointStore, Event, EventFilter, EventPage, EventReader, EventStore, EventStoreError,
EventStreamReader, EventStreamSlice, Operation, StreamId, StreamPosition, StreamWriteEntry,
StreamWrites,
};
use nutype::nutype;
use serde_json::{Value, json};
Expand Down Expand Up @@ -333,3 +334,111 @@ fn map_sqlx_error(error: sqlx::Error, operation: Operation) -> EventStoreError {
);
EventStoreError::StoreFailure { operation }
}

/// Error type for PostgresCheckpointStore operations.
#[derive(Debug, Error)]
pub enum PostgresCheckpointError {
/// Failed to create connection pool.
#[error("failed to create postgres connection pool")]
ConnectionFailed(#[source] sqlx::Error),

/// Database operation failed.
#[error("database operation failed: {0}")]
DatabaseError(#[source] sqlx::Error),
}

/// Postgres-backed checkpoint store for tracking projection progress.
///
/// `PostgresCheckpointStore` stores checkpoint positions in a PostgreSQL table,
/// providing durability across process restarts. It implements the `CheckpointStore`
/// trait from eventcore-types.
///
/// # Schema
///
/// The store uses the `eventcore_subscription_versions` table with:
/// - `subscription_name`: Unique identifier for each projector/subscription
/// - `last_position`: UUID7 representing the global stream position
/// - `updated_at`: Timestamp of the last checkpoint update
#[derive(Debug, Clone)]
pub struct PostgresCheckpointStore {
pool: Pool<Postgres>,
}

impl PostgresCheckpointStore {
/// Create a new PostgresCheckpointStore with default configuration.
pub async fn new<S: Into<String>>(
connection_string: S,
) -> Result<Self, PostgresCheckpointError> {
Self::with_config(connection_string, PostgresConfig::default()).await
}

/// Create a new PostgresCheckpointStore with custom configuration.
pub async fn with_config<S: Into<String>>(
connection_string: S,
config: PostgresConfig,
) -> Result<Self, PostgresCheckpointError> {
let connection_string = connection_string.into();
let max_connections: std::num::NonZeroU32 = config.max_connections.into();
let pool = PgPoolOptions::new()
.max_connections(max_connections.get())
.acquire_timeout(config.acquire_timeout)
.idle_timeout(config.idle_timeout)
.connect(&connection_string)
.await
.map_err(PostgresCheckpointError::ConnectionFailed)?;

// Run migrations to ensure table exists
sqlx::migrate!("./migrations")
.run(&pool)
.await
.map_err(|e| {
PostgresCheckpointError::DatabaseError(sqlx::Error::Migrate(Box::new(e)))
})?;

Ok(Self { pool })
}

/// Create a PostgresCheckpointStore from an existing connection pool.
///
/// Use this when you need full control over pool configuration or want to
/// share a pool across multiple components.
pub fn from_pool(pool: Pool<Postgres>) -> Self {
Self { pool }
}
}

impl CheckpointStore for PostgresCheckpointStore {
type Error = PostgresCheckpointError;

async fn load(&self, name: &str) -> Result<Option<StreamPosition>, Self::Error> {
let row = query("SELECT last_position FROM eventcore_subscription_versions WHERE subscription_name = $1")
.bind(name)
.fetch_optional(&self.pool)
.await
.map_err(PostgresCheckpointError::DatabaseError)?;

match row {
Some(row) => {
let position: Uuid = row.get("last_position");
Ok(Some(StreamPosition::new(position)))
}
None => Ok(None),
}
}

async fn save(&self, name: &str, position: StreamPosition) -> Result<(), Self::Error> {
let position_uuid: Uuid = position.into_inner();
query(
"INSERT INTO eventcore_subscription_versions (subscription_name, last_position, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (subscription_name) DO UPDATE SET last_position = $2, updated_at = NOW()",
)
.bind(name)
.bind(position_uuid)
.execute(&self.pool)
.await
.map_err(PostgresCheckpointError::DatabaseError)?;

Ok(())
}
}
17 changes: 16 additions & 1 deletion eventcore-postgres/tests/i010_contract_suite_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
mod common;

mod postgres_contract_suite {
use eventcore_postgres::PostgresEventStore;
use eventcore_postgres::{PostgresCheckpointStore, PostgresEventStore};
use eventcore_testing::contract::backend_contract_tests;

use crate::common::IsolatedPostgresFixture;
Expand All @@ -25,10 +25,25 @@ mod postgres_contract_suite {
})
}

fn make_checkpoint_store() -> PostgresCheckpointStore {
// Use block_in_place to allow blocking within multi-threaded tokio runtime
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let fixture = IsolatedPostgresFixture::new().await;
PostgresCheckpointStore::new(fixture.connection_string)
.await
.expect("should connect to isolated test database")
})
})
}

backend_contract_tests! {
suite = postgres,
make_store = || {
crate::postgres_contract_suite::make_store()
},
make_checkpoint_store = || {
crate::postgres_contract_suite::make_checkpoint_store()
},
}
}
Loading
Loading