Skip to content

Introduce generalized DataStore #544

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{read_node_metrics, write_node_metrics};
use crate::io::vss_store::VssStore;
use crate::io::{
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::liquidity::{
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
};
use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
OnionMessenger, PeerManager,
OnionMessenger, PaymentStore, PeerManager,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
use crate::{io, Node, NodeMetrics};
use crate::{Node, NodeMetrics};

use lightning::chain::{chainmonitor, BestBlock, Watch};
use lightning::io::Cursor;
Expand Down Expand Up @@ -1015,9 +1017,13 @@ fn build_with_store_internal(
let fee_estimator = Arc::new(OnchainFeeEstimator::new());

let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(payments) => {
Arc::new(PaymentStore::new(payments, Arc::clone(&kv_store), Arc::clone(&logger)))
},
Ok(payments) => Arc::new(PaymentStore::new(
payments,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(_) => {
return Err(BuildError::ReadFailed);
},
Expand Down
287 changes: 287 additions & 0 deletions src/data_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use crate::logger::{log_error, LdkLogger};
use crate::types::DynStore;
use crate::Error;

use lightning::util::ser::{Readable, Writeable};

use std::collections::hash_map;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

pub(crate) trait StorableObject: Clone + Readable + Writeable {
type Id: StorableObjectId;
type Update: StorableObjectUpdate<Self>;

fn id(&self) -> Self::Id;
fn update(&mut self, update: &Self::Update) -> bool;
fn to_update(&self) -> Self::Update;
}

pub(crate) trait StorableObjectId: std::hash::Hash + PartialEq + Eq {
fn encode_to_hex_str(&self) -> String;
}

pub(crate) trait StorableObjectUpdate<SO: StorableObject> {
fn id(&self) -> SO::Id;
}

#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum DataStoreUpdateResult {
Updated,
Unchanged,
NotFound,
}

pub(crate) struct DataStore<SO: StorableObject, L: Deref>
where
L::Target: LdkLogger,
{
objects: Mutex<HashMap<SO::Id, SO>>,
primary_namespace: String,
secondary_namespace: String,
kv_store: Arc<DynStore>,
logger: L,
}

impl<SO: StorableObject, L: Deref> DataStore<SO, L>
where
L::Target: LdkLogger,
{
pub(crate) fn new(
objects: Vec<SO>, primary_namespace: String, secondary_namespace: String,
kv_store: Arc<DynStore>, logger: L,
) -> Self {
let objects =
Mutex::new(HashMap::from_iter(objects.into_iter().map(|obj| (obj.id(), obj))));
Self { objects, primary_namespace, secondary_namespace, kv_store, logger }
}

pub(crate) fn insert(&self, object: SO) -> Result<bool, Error> {
let mut locked_objects = self.objects.lock().unwrap();

self.persist(&object)?;
let updated = locked_objects.insert(object.id(), object).is_some();
Comment on lines +70 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the order matter here? The opposite order is used in insert_or_update, so there we could have the object in memory but fail persisting.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this one used to be the other way around, but I changed it here (actually should be the only behavioral diff I snuck in, IIRC), exactly because it would be preferable to not update the in-memory version if persistence failed. However, for insert_or_update/update we won't know whether the update is necessary at all until StorableObject::update returns, which is why we unfortunately can't first persist.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For insert_or_update/update, it seems we log an error but continue. I suppose this predates the PR, but I wonder if we should do something else like have a queue of ids that still need persistence. Though if we eventually crash that information would be loss. Maybe this is more of a storage implementation concern.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could do that eventually, although it probably would somehow need to fit into a "halt everything on (remote) persistence failure, and resume/restart once we reestablish connectivity" scheme.

Ok(updated)
}

pub(crate) fn insert_or_update(&self, object: SO) -> Result<bool, Error> {
let mut locked_objects = self.objects.lock().unwrap();

let updated;
match locked_objects.entry(object.id()) {
hash_map::Entry::Occupied(mut e) => {
let update = object.to_update();
updated = e.get_mut().update(&update);
if updated {
self.persist(&e.get())?;
}
},
hash_map::Entry::Vacant(e) => {
e.insert(object.clone());
self.persist(&object)?;
updated = true;
},
}

Ok(updated)
}

pub(crate) fn remove(&self, id: &SO::Id) -> Result<(), Error> {
let removed = self.objects.lock().unwrap().remove(id).is_some();
if removed {
let store_key = id.encode_to_hex_str();
self.kv_store
.remove(&self.primary_namespace, &self.secondary_namespace, &store_key, false)
.map_err(|e| {
log_error!(
self.logger,
"Removing object data for key {}/{}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
store_key,
e
);
Error::PersistenceFailed
})?;
}
Ok(())
}

pub(crate) fn get(&self, id: &SO::Id) -> Option<SO> {
self.objects.lock().unwrap().get(id).cloned()
}

pub(crate) fn update(&self, update: &SO::Update) -> Result<DataStoreUpdateResult, Error> {
let mut locked_objects = self.objects.lock().unwrap();

if let Some(object) = locked_objects.get_mut(&update.id()) {
let updated = object.update(update);
if updated {
self.persist(&object)?;
Ok(DataStoreUpdateResult::Updated)
} else {
Ok(DataStoreUpdateResult::Unchanged)
}
} else {
Ok(DataStoreUpdateResult::NotFound)
}
}

pub(crate) fn list_filter<F: FnMut(&&SO) -> bool>(&self, f: F) -> Vec<SO> {
self.objects.lock().unwrap().values().filter(f).cloned().collect::<Vec<SO>>()
}

fn persist(&self, object: &SO) -> Result<(), Error> {
let store_key = object.id().encode_to_hex_str();
let data = object.encode();
self.kv_store
.write(&self.primary_namespace, &self.secondary_namespace, &store_key, &data)
.map_err(|e| {
log_error!(
self.logger,
"Write for key {}/{}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
store_key,
e
);
Error::PersistenceFailed
})?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use lightning::impl_writeable_tlv_based;
use lightning::util::test_utils::{TestLogger, TestStore};

use crate::hex_utils;

use super::*;

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct TestObjectId {
id: [u8; 4],
}

impl StorableObjectId for TestObjectId {
fn encode_to_hex_str(&self) -> String {
hex_utils::to_string(&self.id)
}
}
impl_writeable_tlv_based!(TestObjectId, { (0, id, required) });

struct TestObjectUpdate {
id: TestObjectId,
data: [u8; 3],
}
impl StorableObjectUpdate<TestObject> for TestObjectUpdate {
fn id(&self) -> TestObjectId {
self.id
}
}

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct TestObject {
id: TestObjectId,
data: [u8; 3],
}

impl StorableObject for TestObject {
type Id = TestObjectId;
type Update = TestObjectUpdate;

fn id(&self) -> Self::Id {
self.id
}

fn update(&mut self, update: &Self::Update) -> bool {
if self.data != update.data {
self.data = update.data;
true
} else {
false
}
}

fn to_update(&self) -> Self::Update {
Self::Update { id: self.id, data: self.data }
}
}

impl_writeable_tlv_based!(TestObject, {
(0, id, required),
(2, data, required),
});

#[test]
fn data_is_persisted() {
let store: Arc<DynStore> = Arc::new(TestStore::new(false));
let logger = Arc::new(TestLogger::new());
let primary_namespace = "datastore_test_primary".to_string();
let secondary_namespace = "datastore_test_secondary".to_string();
let data_store: DataStore<TestObject, Arc<TestLogger>> = DataStore::new(
Vec::new(),
primary_namespace.clone(),
secondary_namespace.clone(),
Arc::clone(&store),
logger,
);

let id = TestObjectId { id: [42u8; 4] };
assert!(data_store.get(&id).is_none());

let store_key = id.encode_to_hex_str();

// Check we start empty.
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_err());

// Check we successfully store an object and return `false`
let object = TestObject { id, data: [23u8; 3] };
assert_eq!(Ok(false), data_store.insert(object.clone()));
assert_eq!(Some(object), data_store.get(&id));
assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_ok());

// Test re-insertion returns `true`
let mut override_object = object.clone();
override_object.data = [24u8; 3];
assert_eq!(Ok(true), data_store.insert(override_object));
assert_eq!(Some(override_object), data_store.get(&id));

// Check update returns `Updated`
let update = TestObjectUpdate { id, data: [25u8; 3] };
assert_eq!(Ok(DataStoreUpdateResult::Updated), data_store.update(&update));
assert_eq!(data_store.get(&id).unwrap().data, [25u8; 3]);

// Check no-op update yields `Unchanged`
let update = TestObjectUpdate { id, data: [25u8; 3] };
assert_eq!(Ok(DataStoreUpdateResult::Unchanged), data_store.update(&update));

// Check bogus update yields `NotFound`
let bogus_id = TestObjectId { id: [84u8; 4] };
let update = TestObjectUpdate { id: bogus_id, data: [12u8; 3] };
assert_eq!(Ok(DataStoreUpdateResult::NotFound), data_store.update(&update));

// Check `insert_or_update` inserts unknown objects
let iou_id = TestObjectId { id: [55u8; 4] };
let iou_object = TestObject { id: iou_id, data: [34u8; 3] };
assert_eq!(Ok(true), data_store.insert_or_update(iou_object.clone()));

// Check `insert_or_update` doesn't update the same object
assert_eq!(Ok(false), data_store.insert_or_update(iou_object.clone()));

// Check `insert_or_update` updates if object changed
let mut new_iou_object = iou_object;
new_iou_object.data[0] += 1;
assert_eq!(Ok(true), data_store.insert_or_update(new_iou_object));
}
}
13 changes: 6 additions & 7 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use crate::types::{CustomTlvRecord, DynStore, Sweeper, Wallet};
use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet};

use crate::{
hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore,
Expand All @@ -14,13 +14,13 @@ use crate::{

use crate::config::{may_announce_channel, Config};
use crate::connection::ConnectionManager;
use crate::data_store::DataStoreUpdateResult;
use crate::fee_estimator::ConfirmationTarget;
use crate::liquidity::LiquiditySource;
use crate::logger::Logger;

use crate::payment::store::{
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
PaymentStore, PaymentStoreUpdateResult,
};

use crate::io::{
Expand Down Expand Up @@ -449,7 +449,7 @@ where
output_sweeper: Arc<Sweeper>,
network_graph: Arc<Graph>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
payment_store: Arc<PaymentStore<L>>,
payment_store: Arc<PaymentStore>,
peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
logger: L,
Expand All @@ -466,7 +466,7 @@ where
channel_manager: Arc<ChannelManager>, connection_manager: Arc<ConnectionManager<L>>,
output_sweeper: Arc<Sweeper>, network_graph: Arc<Graph>,
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
payment_store: Arc<PaymentStore<L>>, peer_store: Arc<PeerStore<L>>,
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: L, config: Arc<Config>,
) -> Self {
Self {
Expand Down Expand Up @@ -906,12 +906,11 @@ where
};

match self.payment_store.update(&update) {
Ok(PaymentStoreUpdateResult::Updated)
| Ok(PaymentStoreUpdateResult::Unchanged) => (
Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (
// No need to do anything if the idempotent update was applied, which might
// be the result of a replayed event.
),
Ok(PaymentStoreUpdateResult::NotFound) => {
Ok(DataStoreUpdateResult::NotFound) => {
log_error!(
self.logger,
"Claimed payment with ID {} couldn't be found in store",
Expand Down
Loading
Loading