Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
55f9a6e
feat: make redb storage optional with feature-flagged dual-mode archi…
seriouscoderone Feb 18, 2026
661e4dc
feat: add postgres storage backend with SQLx support
zdravko61 Mar 26, 2026
ab20aac
refactor: postgres tests to use helper functions and reduce boilerplate
zdravko61 Mar 26, 2026
121b195
fix: cleanup todos
zdravko61 Mar 26, 2026
f8254e4
fix: double ref
zdravko61 Mar 26, 2026
34cd32f
refactor: remove unused ksn log methods and PostgresWriteTxnMode usage
zdravko61 Mar 26, 2026
7aa536a
feat: postgres config for customizable connection pool size
zdravko61 Mar 26, 2026
d7fb1ce
feat: add Postgres OOBI storage backend and generalize OobiManager
zdravko61 Mar 27, 2026
bb5b6af
fix: unrawps on deserialize
zdravko61 Mar 31, 2026
7f26ba7
feat: generalize OobiManager storage and fix Postgres error handling
zdravko61 Mar 31, 2026
4176be7
fix: align PostgresOobiStorage save_oobi with redb backend
zdravko61 Mar 31, 2026
00e1a45
feat(teliox): abstract storage layer and add PostgreSQL backend
zdravko61 Mar 27, 2026
fff59ce
fix: propagate errors in Postgres TEL storage and escrow processors
zdravko61 Mar 31, 2026
ea91446
refactor(controller): make KnownEvents, Communication, Controller and
zdravko61 Mar 30, 2026
dd2aa44
feat(controller): add PostgreSQL storage backend and integration tests
zdravko61 Mar 30, 2026
82d4aec
fix: add RedbIdentifier and PostgresIdentifier type aliases
zdravko61 Mar 30, 2026
b351833
refactor: add raw_db accessor to RedbDatabase and update usage
zdravko61 Mar 31, 2026
0211da1
feat: add postgres and redb controller implementations
zdravko61 Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions components/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["storage-redb"]
storage-redb = ["keri-core/storage-redb", "teliox/storage-redb"]
storage-postgres = ["keri-core/storage-postgres", "teliox/storage-postgres"]
query_cache = ["rusqlite"]

[dependencies]
Expand All @@ -30,6 +33,23 @@ async-trait = "0.1.57"
[dev-dependencies]
witness = { path = "../witness" }
tempfile = { version = "3.1" }
sqlx = { version = "0.8", features = ["postgres", "runtime-tokio"] }

[[test]]
name = "test_kel_managing_postgres"
required-features = ["storage-postgres"]

[[test]]
name = "test_tel_managing_postgres"
required-features = ["storage-postgres"]

[[test]]
name = "test_group_incept_postgres"
required-features = ["storage-postgres"]

[[test]]
name = "test_delegated_incept_postgres"
required-features = ["storage-postgres", "query_cache"]

[package.metadata.release]
pre-release-hook = ["ls"]
Expand Down
22 changes: 17 additions & 5 deletions components/controller/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ use std::sync::Arc;
use futures::future::join_all;
use keri_core::{
actor::{error::ActorError, parse_event_stream, possible_response::PossibleResponse},
database::{EscrowCreator, EventDatabase},
event_message::signed_event_message::{Message, Notice, Op, SignedEventMessage},
oobi::{EndRole, LocationScheme, Oobi, Scheme},
oobi_manager::storage::OobiStorageBackend,
prefix::{BasicPrefix, IdentifierPrefix},
query::{
mailbox::SignedMailboxQuery,
query_event::{SignedKelQuery, SignedQueryMessage},
},
transport::{Transport, TransportError},
};
use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery};
use teliox::{database::TelEventDatabase, event::verifiable_event::VerifiableEvent, query::SignedTelQuery};

use crate::{
error::ControllerError,
Expand Down Expand Up @@ -53,15 +55,25 @@ impl From<TransportError> for SendingError {
}
}

pub struct Communication {
pub events: Arc<KnownEvents>,
pub struct Communication<D, T, S>
where
D: EventDatabase + EscrowCreator + 'static,
T: TelEventDatabase + 'static,
S: OobiStorageBackend,
{
pub events: Arc<KnownEvents<D, T, S>>,
pub transport: Box<dyn Transport + Send + Sync>,
pub tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
}

impl Communication {
impl<D, T, S> Communication<D, T, S>
where
D: EventDatabase + EscrowCreator + Send + Sync + 'static,
T: TelEventDatabase + Send + Sync + 'static,
S: OobiStorageBackend,
{
pub fn new(
known_events: Arc<KnownEvents>,
known_events: Arc<KnownEvents<D, T, S>>,
transport: Box<dyn Transport<ActorError> + Send + Sync>,
tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
) -> Self {
Expand Down
67 changes: 28 additions & 39 deletions components/controller/src/controller/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::sync::Arc;

use keri_core::{
database::{EscrowCreator, EventDatabase},
event_message::signature::Signature,
oobi::LocationScheme,
oobi_manager::storage::OobiStorageBackend,
prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
processor::validator::VerificationError,
state::IdentifierState,
};
use teliox::database::TelEventDatabase;

#[cfg(feature = "query_cache")]
use crate::identifier::mechanics::cache::IdentifierCache;
Expand All @@ -19,48 +22,24 @@ use crate::{
};
pub mod verifying;

pub struct Controller {
pub known_events: Arc<KnownEvents>,
pub communication: Arc<Communication>,
pub struct Controller<D, T, S>
where
D: EventDatabase + EscrowCreator + 'static,
T: TelEventDatabase + 'static,
S: OobiStorageBackend,
{
pub known_events: Arc<KnownEvents<D, T, S>>,
pub communication: Arc<Communication<D, T, S>>,
#[cfg(feature = "query_cache")]
pub cache: Arc<IdentifierCache>,
}

impl Controller {
pub fn new(config: ControllerConfig) -> Result<Self, ControllerError> {
let ControllerConfig {
db_path,
initial_oobis,
escrow_config,
transport,
tel_transport,
} = config;
std::fs::create_dir_all(&db_path).unwrap();
let mut query_db_path = db_path.clone();
query_db_path.push("query_cache");

let events = Arc::new(KnownEvents::new(db_path, escrow_config)?);

#[cfg(feature = "query_cache")]
let query_cache = Arc::new(IdentifierCache::new(&query_db_path)?);
let comm = Arc::new(Communication {
events: events.clone(),
transport,
tel_transport,
});

let controller = Self {
known_events: events.clone(),
communication: comm,
#[cfg(feature = "query_cache")]
cache: query_cache,
};
if !initial_oobis.is_empty() {
async_std::task::block_on(controller.setup_witnesses(&initial_oobis)).unwrap();
}
Ok(controller)
}

impl<D, T, S> Controller<D, T, S>
where
D: EventDatabase + EscrowCreator + Send + Sync + 'static,
T: TelEventDatabase + Send + Sync + 'static,
S: OobiStorageBackend,
{
pub async fn incept(
&self,
public_keys: Vec<BasicPrefix>,
Expand All @@ -77,7 +56,7 @@ impl Controller {
&self,
event: &[u8],
sig: &SelfSigningPrefix,
) -> Result<Identifier, ControllerError> {
) -> Result<Identifier<D, T, S>, ControllerError> {
let initialized_id = self.known_events.finalize_inception(event, sig).unwrap();
Ok(Identifier::new(
initialized_id,
Expand Down Expand Up @@ -111,3 +90,13 @@ impl Controller {
self.known_events.get_state(id)
}
}

#[cfg(feature = "storage-redb")]
mod redb;
#[cfg(feature = "storage-redb")]
pub use redb::{RedbController, RedbIdentifier};

#[cfg(feature = "storage-postgres")]
mod postgres;
#[cfg(feature = "storage-postgres")]
pub use postgres::{PostgresController, PostgresIdentifier};
64 changes: 64 additions & 0 deletions components/controller/src/controller/postgres.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::sync::Arc;

use keri_core::database::postgres::PostgresDatabase;
use keri_core::database::postgres::oobi_storage::PostgresOobiStorage;
use teliox::database::postgres::PostgresTelDatabase;

#[cfg(feature = "query_cache")]
use crate::identifier::mechanics::cache::IdentifierCache;
use crate::{
communication::Communication,
config::ControllerConfig,
error::ControllerError,
identifier::Identifier,
known_events::PostgresKnownEvents,
};

use super::Controller;

pub type PostgresController = Controller<PostgresDatabase, PostgresTelDatabase, PostgresOobiStorage>;
pub type PostgresIdentifier = Identifier<PostgresDatabase, PostgresTelDatabase, PostgresOobiStorage>;

impl PostgresController {
pub async fn new_postgres(
database_url: &str,
config: ControllerConfig,
) -> Result<Self, ControllerError> {
let ControllerConfig {
db_path: _db_path,
initial_oobis,
escrow_config,
transport,
tel_transport,
} = config;

#[cfg(feature = "query_cache")]
let mut query_db_path = _db_path.clone();
#[cfg(feature = "query_cache")]
query_db_path.push("query_cache");

let events = Arc::new(
PostgresKnownEvents::with_postgres(database_url, escrow_config).await?,
);

#[cfg(feature = "query_cache")]
let query_cache = Arc::new(IdentifierCache::new(&query_db_path)?);

let comm = Arc::new(Communication {
events: events.clone(),
transport,
tel_transport,
});

let controller = Self {
known_events: events.clone(),
communication: comm,
#[cfg(feature = "query_cache")]
cache: query_cache,
};
if !initial_oobis.is_empty() {
controller.setup_witnesses(&initial_oobis).await.unwrap();
}
Ok(controller)
}
}
58 changes: 58 additions & 0 deletions components/controller/src/controller/redb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::sync::Arc;

use keri_core::database::redb::RedbDatabase;
use keri_core::oobi_manager::storage::RedbOobiStorage;
use teliox::database::redb::RedbTelDatabase;

#[cfg(feature = "query_cache")]
use crate::identifier::mechanics::cache::IdentifierCache;
use crate::{
communication::Communication,
config::ControllerConfig,
error::ControllerError,
identifier::Identifier,
known_events::RedbKnownEvents,
};

use super::Controller;

pub type RedbController = Controller<RedbDatabase, RedbTelDatabase, RedbOobiStorage>;
pub type RedbIdentifier = Identifier<RedbDatabase, RedbTelDatabase, RedbOobiStorage>;

impl RedbController {
pub fn new(config: ControllerConfig) -> Result<Self, ControllerError> {
let ControllerConfig {
db_path,
initial_oobis,
escrow_config,
transport,
tel_transport,
} = config;
std::fs::create_dir_all(&db_path).unwrap();
#[cfg(feature = "query_cache")]
let mut query_db_path = db_path.clone();
#[cfg(feature = "query_cache")]
query_db_path.push("query_cache");

let events = Arc::new(RedbKnownEvents::with_redb(db_path, escrow_config)?);

#[cfg(feature = "query_cache")]
let query_cache = Arc::new(IdentifierCache::new(&query_db_path)?);
let comm = Arc::new(Communication {
events: events.clone(),
transport,
tel_transport,
});

let controller = Self {
known_events: events.clone(),
communication: comm,
#[cfg(feature = "query_cache")]
cache: query_cache,
};
if !initial_oobis.is_empty() {
async_std::task::block_on(controller.setup_witnesses(&initial_oobis)).unwrap();
}
Ok(controller)
}
}
10 changes: 9 additions & 1 deletion components/controller/src/controller/verifying.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use cesrox::{parse_many, payload::Payload, ParsedData};
use itertools::Itertools;
use keri_core::{
database::{EscrowCreator, EventDatabase},
event_message::signature::{get_signatures, Signature},
oobi::Oobi,
oobi_manager::storage::OobiStorageBackend,
processor::validator::{EventValidator, VerificationError},
};
use teliox::database::TelEventDatabase;

use crate::{error::ControllerError, known_events::KnownEvents};

impl KnownEvents {
impl<D, T, S> KnownEvents<D, T, S>
where
D: EventDatabase + EscrowCreator + Send + Sync + 'static,
T: TelEventDatabase + Send + Sync + 'static,
S: OobiStorageBackend,
{
pub fn verify(&self, data: &[u8], signature: &Signature) -> Result<(), VerificationError> {
let verifier = EventValidator::new(self.storage.events_db.clone());
verifier.verify(data, signature)
Expand Down
19 changes: 14 additions & 5 deletions components/controller/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use keri_core::{
actor::prelude::VersionError, database::redb::RedbError,
event_message::cesr_adapter::ParseError, oobi::Scheme, prefix::IdentifierPrefix,
processor::validator::VerificationError,
actor::prelude::VersionError,
event_message::cesr_adapter::ParseError, oobi::Scheme, oobi::error::OobiError,
prefix::IdentifierPrefix, processor::validator::VerificationError,
};
use thiserror::Error;

Expand All @@ -12,8 +12,8 @@ use crate::{

#[derive(Error, Debug)]
pub enum ControllerError {
#[error("Redb error: {0}")]
RedbError(#[from] RedbError),
#[error("Database error: {0}")]
DatabaseError(String),

#[cfg(feature = "query_cache")]
#[error("SQL error: {0}")]
Expand Down Expand Up @@ -58,9 +58,18 @@ pub enum ControllerError {
#[error("Error: {0}")]
OtherError(String),

#[error("Oobi error: {0}")]
OobiError(String),

#[error(transparent)]
Mechanic(#[from] MechanicsError),

#[error("Watcher response error: {0}")]
WatcherResponseError(#[from] WatcherResponseError),
}

impl From<OobiError> for ControllerError {
fn from(e: OobiError) -> Self {
ControllerError::OobiError(e.to_string())
}
}
Loading