From 93622f7468e6deb89978afca210b64505efb761b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 20 May 2025 15:26:24 +0200 Subject: [PATCH 1/2] Add generic `DataStore` We increasingly feature different data stores that essentially do the same thing, mod different data types. Here we add a generalized `DataStore` that will be used to DRY up our logic. --- src/data_store.rs | 287 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 288 insertions(+) create mode 100644 src/data_store.rs diff --git a/src/data_store.rs b/src/data_store.rs new file mode 100644 index 000000000..78e3e7870 --- /dev/null +++ b/src/data_store.rs @@ -0,0 +1,287 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::logger::{log_error, LdkLogger}; +use crate::types::DynStore; +use crate::Error; + +use lightning::util::ser::{Readable, Writeable}; + +use std::collections::hash_map; +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::{Arc, Mutex}; + +pub(crate) trait StorableObject: Clone + Readable + Writeable { + type Id: StorableObjectId; + type Update: StorableObjectUpdate; + + fn id(&self) -> Self::Id; + fn update(&mut self, update: &Self::Update) -> bool; + fn to_update(&self) -> Self::Update; +} + +pub(crate) trait StorableObjectId: std::hash::Hash + PartialEq + Eq { + fn encode_to_hex_str(&self) -> String; +} + +pub(crate) trait StorableObjectUpdate { + fn id(&self) -> SO::Id; +} + +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub(crate) enum DataStoreUpdateResult { + Updated, + Unchanged, + NotFound, +} + +pub(crate) struct DataStore +where + L::Target: LdkLogger, +{ + objects: Mutex>, + primary_namespace: String, + secondary_namespace: String, + kv_store: Arc, + logger: L, +} + +impl DataStore +where + L::Target: LdkLogger, +{ + pub(crate) fn new( + objects: Vec, primary_namespace: String, secondary_namespace: String, + kv_store: Arc, logger: L, + ) -> Self { + let objects = + Mutex::new(HashMap::from_iter(objects.into_iter().map(|obj| (obj.id(), obj)))); + Self { objects, primary_namespace, secondary_namespace, kv_store, logger } + } + + pub(crate) fn insert(&self, object: SO) -> Result { + let mut locked_objects = self.objects.lock().unwrap(); + + self.persist(&object)?; + let updated = locked_objects.insert(object.id(), object).is_some(); + Ok(updated) + } + + pub(crate) fn insert_or_update(&self, object: SO) -> Result { + let mut locked_objects = self.objects.lock().unwrap(); + + let updated; + match locked_objects.entry(object.id()) { + hash_map::Entry::Occupied(mut e) => { + let update = object.to_update(); + updated = e.get_mut().update(&update); + if updated { + self.persist(&e.get())?; + } + }, + hash_map::Entry::Vacant(e) => { + e.insert(object.clone()); + self.persist(&object)?; + updated = true; + }, + } + + Ok(updated) + } + + pub(crate) fn remove(&self, id: &SO::Id) -> Result<(), Error> { + let removed = self.objects.lock().unwrap().remove(id).is_some(); + if removed { + let store_key = id.encode_to_hex_str(); + self.kv_store + .remove(&self.primary_namespace, &self.secondary_namespace, &store_key, false) + .map_err(|e| { + log_error!( + self.logger, + "Removing object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + store_key, + e + ); + Error::PersistenceFailed + })?; + } + Ok(()) + } + + pub(crate) fn get(&self, id: &SO::Id) -> Option { + self.objects.lock().unwrap().get(id).cloned() + } + + pub(crate) fn update(&self, update: &SO::Update) -> Result { + let mut locked_objects = self.objects.lock().unwrap(); + + if let Some(object) = locked_objects.get_mut(&update.id()) { + let updated = object.update(update); + if updated { + self.persist(&object)?; + Ok(DataStoreUpdateResult::Updated) + } else { + Ok(DataStoreUpdateResult::Unchanged) + } + } else { + Ok(DataStoreUpdateResult::NotFound) + } + } + + pub(crate) fn list_filter bool>(&self, f: F) -> Vec { + self.objects.lock().unwrap().values().filter(f).cloned().collect::>() + } + + fn persist(&self, object: &SO) -> Result<(), Error> { + let store_key = object.id().encode_to_hex_str(); + let data = object.encode(); + self.kv_store + .write(&self.primary_namespace, &self.secondary_namespace, &store_key, &data) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + store_key, + e + ); + Error::PersistenceFailed + })?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use lightning::impl_writeable_tlv_based; + use lightning::util::test_utils::{TestLogger, TestStore}; + + use crate::hex_utils; + + use super::*; + + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] + struct TestObjectId { + id: [u8; 4], + } + + impl StorableObjectId for TestObjectId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.id) + } + } + impl_writeable_tlv_based!(TestObjectId, { (0, id, required) }); + + struct TestObjectUpdate { + id: TestObjectId, + data: [u8; 3], + } + impl StorableObjectUpdate for TestObjectUpdate { + fn id(&self) -> TestObjectId { + self.id + } + } + + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] + struct TestObject { + id: TestObjectId, + data: [u8; 3], + } + + impl StorableObject for TestObject { + type Id = TestObjectId; + type Update = TestObjectUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: &Self::Update) -> bool { + if self.data != update.data { + self.data = update.data; + true + } else { + false + } + } + + fn to_update(&self) -> Self::Update { + Self::Update { id: self.id, data: self.data } + } + } + + impl_writeable_tlv_based!(TestObject, { + (0, id, required), + (2, data, required), + }); + + #[test] + fn data_is_persisted() { + let store: Arc = Arc::new(TestStore::new(false)); + let logger = Arc::new(TestLogger::new()); + let primary_namespace = "datastore_test_primary".to_string(); + let secondary_namespace = "datastore_test_secondary".to_string(); + let data_store: DataStore> = DataStore::new( + Vec::new(), + primary_namespace.clone(), + secondary_namespace.clone(), + Arc::clone(&store), + logger, + ); + + let id = TestObjectId { id: [42u8; 4] }; + assert!(data_store.get(&id).is_none()); + + let store_key = id.encode_to_hex_str(); + + // Check we start empty. + assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_err()); + + // Check we successfully store an object and return `false` + let object = TestObject { id, data: [23u8; 3] }; + assert_eq!(Ok(false), data_store.insert(object.clone())); + assert_eq!(Some(object), data_store.get(&id)); + assert!(store.read(&primary_namespace, &secondary_namespace, &store_key).is_ok()); + + // Test re-insertion returns `true` + let mut override_object = object.clone(); + override_object.data = [24u8; 3]; + assert_eq!(Ok(true), data_store.insert(override_object)); + assert_eq!(Some(override_object), data_store.get(&id)); + + // Check update returns `Updated` + let update = TestObjectUpdate { id, data: [25u8; 3] }; + assert_eq!(Ok(DataStoreUpdateResult::Updated), data_store.update(&update)); + assert_eq!(data_store.get(&id).unwrap().data, [25u8; 3]); + + // Check no-op update yields `Unchanged` + let update = TestObjectUpdate { id, data: [25u8; 3] }; + assert_eq!(Ok(DataStoreUpdateResult::Unchanged), data_store.update(&update)); + + // Check bogus update yields `NotFound` + let bogus_id = TestObjectId { id: [84u8; 4] }; + let update = TestObjectUpdate { id: bogus_id, data: [12u8; 3] }; + assert_eq!(Ok(DataStoreUpdateResult::NotFound), data_store.update(&update)); + + // Check `insert_or_update` inserts unknown objects + let iou_id = TestObjectId { id: [55u8; 4] }; + let iou_object = TestObject { id: iou_id, data: [34u8; 3] }; + assert_eq!(Ok(true), data_store.insert_or_update(iou_object.clone())); + + // Check `insert_or_update` doesn't update the same object + assert_eq!(Ok(false), data_store.insert_or_update(iou_object.clone())); + + // Check `insert_or_update` updates if object changed + let mut new_iou_object = iou_object; + new_iou_object.data[0] += 1; + assert_eq!(Ok(true), data_store.insert_or_update(new_iou_object)); + } +} diff --git a/src/lib.rs b/src/lib.rs index 7859a092e..7334eacd5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -80,6 +80,7 @@ mod builder; mod chain; pub mod config; mod connection; +mod data_store; mod error; mod event; mod fee_estimator; From 75aa06963ba1c952c51a6de932cc7e5111768afa Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 20 May 2025 16:32:07 +0200 Subject: [PATCH 2/2] Refactor `PaymentStore` to use `DataStore` .. we utilize the just-introduced generalized `DataStore` for our `PaymentStore`. --- src/builder.rs | 18 +- src/event.rs | 13 +- src/lib.rs | 5 +- src/payment/bolt11.rs | 13 +- src/payment/bolt12.rs | 10 +- src/payment/spontaneous.rs | 10 +- src/payment/store.rs | 428 +++++++++++-------------------------- src/types.rs | 4 + src/wallet/mod.rs | 11 +- 9 files changed, 165 insertions(+), 347 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index d48b71c7c..31a0fee45 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -18,21 +18,23 @@ use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{read_node_metrics, write_node_metrics}; use crate::io::vss_store::VssStore; +use crate::io::{ + self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::liquidity::{ LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, }; use crate::logger::{log_error, log_info, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; -use crate::payment::store::PaymentStore; use crate::peer_store::PeerStore; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter, - OnionMessenger, PeerManager, + OnionMessenger, PaymentStore, PeerManager, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; -use crate::{io, Node, NodeMetrics}; +use crate::{Node, NodeMetrics}; use lightning::chain::{chainmonitor, BestBlock, Watch}; use lightning::io::Cursor; @@ -1015,9 +1017,13 @@ fn build_with_store_internal( let fee_estimator = Arc::new(OnchainFeeEstimator::new()); let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { - Ok(payments) => { - Arc::new(PaymentStore::new(payments, Arc::clone(&kv_store), Arc::clone(&logger))) - }, + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), Err(_) => { return Err(BuildError::ReadFailed); }, diff --git a/src/event.rs b/src/event.rs index 00d8441e5..e95983710 100644 --- a/src/event.rs +++ b/src/event.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::types::{CustomTlvRecord, DynStore, Sweeper, Wallet}; +use crate::types::{CustomTlvRecord, DynStore, PaymentStore, Sweeper, Wallet}; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -14,13 +14,13 @@ use crate::{ use crate::config::{may_announce_channel, Config}; use crate::connection::ConnectionManager; +use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; use crate::liquidity::LiquiditySource; use crate::logger::Logger; use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, - PaymentStore, PaymentStoreUpdateResult, }; use crate::io::{ @@ -449,7 +449,7 @@ where output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc>, + payment_store: Arc, peer_store: Arc>, runtime: Arc>>>, logger: L, @@ -466,7 +466,7 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc>, peer_store: Arc>, + payment_store: Arc, peer_store: Arc>, runtime: Arc>>>, logger: L, config: Arc, ) -> Self { Self { @@ -906,12 +906,11 @@ where }; match self.payment_store.update(&update) { - Ok(PaymentStoreUpdateResult::Updated) - | Ok(PaymentStoreUpdateResult::Unchanged) => ( + Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => ( // No need to do anything if the idempotent update was applied, which might // be the result of a replayed event. ), - Ok(PaymentStoreUpdateResult::NotFound) => { + Ok(DataStoreUpdateResult::NotFound) => { log_error!( self.logger, "Claimed payment with ID {} couldn't be found in store", diff --git a/src/lib.rs b/src/lib.rs index 7334eacd5..c3bfe16d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -136,7 +136,6 @@ use gossip::GossipSource; use graph::NetworkGraph; use io::utils::write_node_metrics; use liquidity::{LSPS1Liquidity, LiquiditySource}; -use payment::store::PaymentStore; use payment::{ Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, UnifiedQrPayment, @@ -144,7 +143,7 @@ use payment::{ use peer_store::{PeerInfo, PeerStore}; use types::{ Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - KeysManager, OnionMessenger, PeerManager, Router, Scorer, Sweeper, Wallet, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; @@ -200,7 +199,7 @@ pub struct Node { _router: Arc, scorer: Arc>, peer_store: Arc>>, - payment_store: Arc>>, + payment_store: Arc, is_listening: Arc, node_metrics: Arc>, } diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index 22e12681f..052571818 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -11,16 +11,17 @@ use crate::config::{Config, LDK_PAYMENT_RETRY_TIMEOUT}; use crate::connection::ConnectionManager; +use crate::data_store::DataStoreUpdateResult; use crate::error::Error; use crate::liquidity::LiquiditySource; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{ LSPFeeLimits, PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, - PaymentStatus, PaymentStore, PaymentStoreUpdateResult, + PaymentStatus, }; use crate::payment::SendingParameters; use crate::peer_store::{PeerInfo, PeerStore}; -use crate::types::ChannelManager; +use crate::types::{ChannelManager, PaymentStore}; use lightning::ln::bolt11_payment; use lightning::ln::channelmanager::{ @@ -90,7 +91,7 @@ pub struct Bolt11Payment { channel_manager: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, - payment_store: Arc>>, + payment_store: Arc, peer_store: Arc>>, config: Arc, logger: Arc, @@ -102,7 +103,7 @@ impl Bolt11Payment { channel_manager: Arc, connection_manager: Arc>>, liquidity_source: Option>>>, - payment_store: Arc>>, peer_store: Arc>>, + payment_store: Arc, peer_store: Arc>>, config: Arc, logger: Arc, ) -> Self { Self { @@ -434,8 +435,8 @@ impl Bolt11Payment { }; match self.payment_store.update(&update) { - Ok(PaymentStoreUpdateResult::Updated) | Ok(PaymentStoreUpdateResult::Unchanged) => (), - Ok(PaymentStoreUpdateResult::NotFound) => { + Ok(DataStoreUpdateResult::Updated) | Ok(DataStoreUpdateResult::Unchanged) => (), + Ok(DataStoreUpdateResult::NotFound) => { log_error!( self.logger, "Failed to manually fail unknown payment with hash {}", diff --git a/src/payment/bolt12.rs b/src/payment/bolt12.rs index dbeee0ab8..8006f4bb9 100644 --- a/src/payment/bolt12.rs +++ b/src/payment/bolt12.rs @@ -12,10 +12,8 @@ use crate::config::LDK_PAYMENT_RETRY_TIMEOUT; use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; -use crate::payment::store::{ - PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PaymentStore, -}; -use crate::types::ChannelManager; +use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; +use crate::types::{ChannelManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, Retry}; use lightning::offers::invoice::Bolt12Invoice; @@ -39,14 +37,14 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub struct Bolt12Payment { runtime: Arc>>>, channel_manager: Arc, - payment_store: Arc>>, + payment_store: Arc, logger: Arc, } impl Bolt12Payment { pub(crate) fn new( runtime: Arc>>>, - channel_manager: Arc, payment_store: Arc>>, + channel_manager: Arc, payment_store: Arc, logger: Arc, ) -> Self { Self { runtime, channel_manager, payment_store, logger } diff --git a/src/payment/spontaneous.rs b/src/payment/spontaneous.rs index f33ea15cc..1508b6cd8 100644 --- a/src/payment/spontaneous.rs +++ b/src/payment/spontaneous.rs @@ -10,11 +10,9 @@ use crate::config::{Config, LDK_PAYMENT_RETRY_TIMEOUT}; use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; -use crate::payment::store::{ - PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PaymentStore, -}; +use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::payment::SendingParameters; -use crate::types::{ChannelManager, CustomTlvRecord, KeysManager}; +use crate::types::{ChannelManager, CustomTlvRecord, KeysManager, PaymentStore}; use lightning::ln::channelmanager::{PaymentId, RecipientOnionFields, Retry, RetryableSendFailure}; use lightning::routing::router::{PaymentParameters, RouteParameters}; @@ -38,7 +36,7 @@ pub struct SpontaneousPayment { runtime: Arc>>>, channel_manager: Arc, keys_manager: Arc, - payment_store: Arc>>, + payment_store: Arc, config: Arc, logger: Arc, } @@ -47,7 +45,7 @@ impl SpontaneousPayment { pub(crate) fn new( runtime: Arc>>>, channel_manager: Arc, keys_manager: Arc, - payment_store: Arc>>, config: Arc, logger: Arc, + payment_store: Arc, config: Arc, logger: Arc, ) -> Self { Self { runtime, channel_manager, keys_manager, payment_store, config, logger } } diff --git a/src/payment/store.rs b/src/payment/store.rs index 2a074031c..75b2b1b2a 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -5,14 +5,6 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use crate::hex_utils; -use crate::io::{ - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, -}; -use crate::logger::{log_error, LdkLogger}; -use crate::types::DynStore; -use crate::Error; - use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; use lightning::offers::offer::OfferId; @@ -27,12 +19,11 @@ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; use bitcoin::{BlockHash, Txid}; -use std::collections::hash_map; -use std::collections::HashMap; -use std::ops::Deref; -use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; +use crate::hex_utils; + /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] pub struct PaymentDetails { @@ -70,8 +61,118 @@ impl PaymentDetails { .as_secs(); Self { id, kind, amount_msat, fee_paid_msat, direction, status, latest_update_timestamp } } +} + +impl Writeable for PaymentDetails { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + write_tlv_fields!(writer, { + (0, self.id, required), // Used to be `hash` for v0.2.1 and prior + // 1 briefly used to be lsp_fee_limits, could probably be reused at some point in the future. + // 2 used to be `preimage` before it was moved to `kind` in v0.3.0 + (2, None::>, required), + (3, self.kind, required), + // 4 used to be `secret` before it was moved to `kind` in v0.3.0 + (4, None::>, required), + (5, self.latest_update_timestamp, required), + (6, self.amount_msat, required), + (7, self.fee_paid_msat, option), + (8, self.direction, required), + (10, self.status, required) + }); + Ok(()) + } +} + +impl Readable for PaymentDetails { + fn read(reader: &mut R) -> Result { + let unix_time_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + _init_and_read_len_prefixed_tlv_fields!(reader, { + (0, id, required), // Used to be `hash` + (1, lsp_fee_limits, option), + (2, preimage, required), + (3, kind_opt, option), + (4, secret, required), + (5, latest_update_timestamp, (default_value, unix_time_secs)), + (6, amount_msat, required), + (7, fee_paid_msat, option), + (8, direction, required), + (10, status, required) + }); - pub(crate) fn update(&mut self, update: &PaymentDetailsUpdate) -> bool { + let id: PaymentId = id.0.ok_or(DecodeError::InvalidValue)?; + let preimage: Option = preimage.0.ok_or(DecodeError::InvalidValue)?; + let secret: Option = secret.0.ok_or(DecodeError::InvalidValue)?; + let latest_update_timestamp: u64 = + latest_update_timestamp.0.ok_or(DecodeError::InvalidValue)?; + let amount_msat: Option = amount_msat.0.ok_or(DecodeError::InvalidValue)?; + let direction: PaymentDirection = direction.0.ok_or(DecodeError::InvalidValue)?; + let status: PaymentStatus = status.0.ok_or(DecodeError::InvalidValue)?; + + let kind = if let Some(kind) = kind_opt { + // If we serialized the payment kind, use it. + // This will always be the case for any version after v0.2.1. + kind + } else { + // Otherwise we persisted with v0.2.1 or before, and puzzle together the kind from the + // provided fields. + + // We used to track everything by hash, but switched to track everything by id + // post-v0.2.1. As both are serialized identically, we just switched the `0`-type field above + // from `PaymentHash` to `PaymentId` and serialize a separate `PaymentHash` in + // `PaymentKind` when needed. Here, for backwards compat, we can just re-create the + // `PaymentHash` from the id, as 'back then' `payment_hash == payment_id` was always + // true. + let hash = PaymentHash(id.0); + + if secret.is_some() { + if let Some(lsp_fee_limits) = lsp_fee_limits { + let counterparty_skimmed_fee_msat = None; + PaymentKind::Bolt11Jit { + hash, + preimage, + secret, + counterparty_skimmed_fee_msat, + lsp_fee_limits, + } + } else { + PaymentKind::Bolt11 { hash, preimage, secret } + } + } else { + PaymentKind::Spontaneous { hash, preimage } + } + }; + + Ok(PaymentDetails { + id, + kind, + amount_msat, + fee_paid_msat, + direction, + status, + latest_update_timestamp, + }) + } +} + +impl StorableObjectId for PaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} +impl StorableObject for PaymentDetails { + type Id = PaymentId; + type Update = PaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, update: &Self::Update) -> bool { debug_assert_eq!( self.id, update.id, "We should only ever override payment data for the same payment id" @@ -201,101 +302,9 @@ impl PaymentDetails { updated } -} -impl Writeable for PaymentDetails { - fn write( - &self, writer: &mut W, - ) -> Result<(), lightning::io::Error> { - write_tlv_fields!(writer, { - (0, self.id, required), // Used to be `hash` for v0.2.1 and prior - // 1 briefly used to be lsp_fee_limits, could probably be reused at some point in the future. - // 2 used to be `preimage` before it was moved to `kind` in v0.3.0 - (2, None::>, required), - (3, self.kind, required), - // 4 used to be `secret` before it was moved to `kind` in v0.3.0 - (4, None::>, required), - (5, self.latest_update_timestamp, required), - (6, self.amount_msat, required), - (7, self.fee_paid_msat, option), - (8, self.direction, required), - (10, self.status, required) - }); - Ok(()) - } -} - -impl Readable for PaymentDetails { - fn read(reader: &mut R) -> Result { - let unix_time_secs = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or(Duration::from_secs(0)) - .as_secs(); - _init_and_read_len_prefixed_tlv_fields!(reader, { - (0, id, required), // Used to be `hash` - (1, lsp_fee_limits, option), - (2, preimage, required), - (3, kind_opt, option), - (4, secret, required), - (5, latest_update_timestamp, (default_value, unix_time_secs)), - (6, amount_msat, required), - (7, fee_paid_msat, option), - (8, direction, required), - (10, status, required) - }); - - let id: PaymentId = id.0.ok_or(DecodeError::InvalidValue)?; - let preimage: Option = preimage.0.ok_or(DecodeError::InvalidValue)?; - let secret: Option = secret.0.ok_or(DecodeError::InvalidValue)?; - let latest_update_timestamp: u64 = - latest_update_timestamp.0.ok_or(DecodeError::InvalidValue)?; - let amount_msat: Option = amount_msat.0.ok_or(DecodeError::InvalidValue)?; - let direction: PaymentDirection = direction.0.ok_or(DecodeError::InvalidValue)?; - let status: PaymentStatus = status.0.ok_or(DecodeError::InvalidValue)?; - - let kind = if let Some(kind) = kind_opt { - // If we serialized the payment kind, use it. - // This will always be the case for any version after v0.2.1. - kind - } else { - // Otherwise we persisted with v0.2.1 or before, and puzzle together the kind from the - // provided fields. - - // We used to track everything by hash, but switched to track everything by id - // post-v0.2.1. As both are serialized identically, we just switched the `0`-type field above - // from `PaymentHash` to `PaymentId` and serialize a separate `PaymentHash` in - // `PaymentKind` when needed. Here, for backwards compat, we can just re-create the - // `PaymentHash` from the id, as 'back then' `payment_hash == payment_id` was always - // true. - let hash = PaymentHash(id.0); - - if secret.is_some() { - if let Some(lsp_fee_limits) = lsp_fee_limits { - let counterparty_skimmed_fee_msat = None; - PaymentKind::Bolt11Jit { - hash, - preimage, - secret, - counterparty_skimmed_fee_msat, - lsp_fee_limits, - } - } else { - PaymentKind::Bolt11 { hash, preimage, secret } - } - } else { - PaymentKind::Spontaneous { hash, preimage } - } - }; - - Ok(PaymentDetails { - id, - kind, - amount_msat, - fee_paid_msat, - direction, - status, - latest_update_timestamp, - }) + fn to_update(&self) -> Self::Update { + self.into() } } @@ -590,139 +599,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { } } -#[derive(PartialEq, Eq, Debug, Clone, Copy)] -pub(crate) enum PaymentStoreUpdateResult { - Updated, - Unchanged, - NotFound, -} - -pub(crate) struct PaymentStore -where - L::Target: LdkLogger, -{ - payments: Mutex>, - kv_store: Arc, - logger: L, -} - -impl PaymentStore -where - L::Target: LdkLogger, -{ - pub(crate) fn new(payments: Vec, kv_store: Arc, logger: L) -> Self { - let payments = Mutex::new(HashMap::from_iter( - payments.into_iter().map(|payment| (payment.id, payment)), - )); - Self { payments, kv_store, logger } - } - - pub(crate) fn insert(&self, payment: PaymentDetails) -> Result { - let mut locked_payments = self.payments.lock().unwrap(); - - let updated = locked_payments.insert(payment.id, payment.clone()).is_some(); - self.persist_info(&payment.id, &payment)?; - Ok(updated) - } - - pub(crate) fn insert_or_update(&self, payment: &PaymentDetails) -> Result { - let mut locked_payments = self.payments.lock().unwrap(); - - let updated; - match locked_payments.entry(payment.id) { - hash_map::Entry::Occupied(mut e) => { - let update = payment.into(); - updated = e.get_mut().update(&update); - if updated { - self.persist_info(&payment.id, e.get())?; - } - }, - hash_map::Entry::Vacant(e) => { - e.insert(payment.clone()); - self.persist_info(&payment.id, payment)?; - updated = true; - }, - } - - Ok(updated) - } - - pub(crate) fn remove(&self, id: &PaymentId) -> Result<(), Error> { - let removed = self.payments.lock().unwrap().remove(id).is_some(); - if removed { - let store_key = hex_utils::to_string(&id.0); - self.kv_store - .remove( - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &store_key, - false, - ) - .map_err(|e| { - log_error!( - self.logger, - "Removing payment data for key {}/{}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - store_key, - e - ); - Error::PersistenceFailed - })?; - } - Ok(()) - } - - pub(crate) fn get(&self, id: &PaymentId) -> Option { - self.payments.lock().unwrap().get(id).cloned() - } - - pub(crate) fn update( - &self, update: &PaymentDetailsUpdate, - ) -> Result { - let mut locked_payments = self.payments.lock().unwrap(); - - if let Some(payment) = locked_payments.get_mut(&update.id) { - let updated = payment.update(update); - if updated { - self.persist_info(&update.id, payment)?; - Ok(PaymentStoreUpdateResult::Updated) - } else { - Ok(PaymentStoreUpdateResult::Unchanged) - } - } else { - Ok(PaymentStoreUpdateResult::NotFound) - } - } - - pub(crate) fn list_filter bool>( - &self, f: F, - ) -> Vec { - self.payments.lock().unwrap().values().filter(f).cloned().collect::>() - } - - fn persist_info(&self, id: &PaymentId, payment: &PaymentDetails) -> Result<(), Error> { - let store_key = hex_utils::to_string(&id.0); - let data = payment.encode(); - self.kv_store - .write( - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &store_key, - &data, - ) - .map_err(|e| { - log_error!( - self.logger, - "Write for key {}/{}/{} failed due to: {}", - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - store_key, - e - ); - Error::PersistenceFailed - })?; - Ok(()) +impl StorableObjectUpdate for PaymentDetailsUpdate { + fn id(&self) -> ::Id { + self.id } } @@ -730,11 +609,7 @@ where mod tests { use super::*; use bitcoin::io::Cursor; - use lightning::util::{ - ser::Readable, - test_utils::{TestLogger, TestStore}, - }; - use std::sync::Arc; + use lightning::util::ser::Readable; /// We refactored `PaymentDetails` to hold a payment id and moved some required fields into /// `PaymentKind`. Here, we keep the old layout available in order test de/ser compatibility. @@ -759,69 +634,6 @@ mod tests { (10, status, required) }); - #[test] - fn payment_info_is_persisted() { - let store: Arc = Arc::new(TestStore::new(false)); - let logger = Arc::new(TestLogger::new()); - let payment_store = PaymentStore::new(Vec::new(), Arc::clone(&store), logger); - - let hash = PaymentHash([42u8; 32]); - let id = PaymentId([42u8; 32]); - assert!(payment_store.get(&id).is_none()); - - let store_key = hex_utils::to_string(&hash.0); - assert!(store - .read( - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &store_key - ) - .is_err()); - - let kind = PaymentKind::Bolt11 { hash, preimage: None, secret: None }; - let payment = PaymentDetails::new( - id, - kind, - None, - None, - PaymentDirection::Inbound, - PaymentStatus::Pending, - ); - - assert_eq!(Ok(false), payment_store.insert(payment.clone())); - assert!(payment_store.get(&id).is_some()); - assert!(store - .read( - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &store_key - ) - .is_ok()); - - assert_eq!(Ok(true), payment_store.insert(payment)); - assert!(payment_store.get(&id).is_some()); - - // Check update returns `Updated` - let mut update = PaymentDetailsUpdate::new(id); - update.status = Some(PaymentStatus::Succeeded); - assert_eq!(Ok(PaymentStoreUpdateResult::Updated), payment_store.update(&update)); - - // Check no-op update yields `Unchanged` - let mut update = PaymentDetailsUpdate::new(id); - update.status = Some(PaymentStatus::Succeeded); - assert_eq!(Ok(PaymentStoreUpdateResult::Unchanged), payment_store.update(&update)); - - // Check bogus update yields `NotFound` - let bogus_id = PaymentId([84u8; 32]); - let mut update = PaymentDetailsUpdate::new(bogus_id); - update.status = Some(PaymentStatus::Succeeded); - assert_eq!(Ok(PaymentStoreUpdateResult::NotFound), payment_store.update(&update)); - - assert!(payment_store.get(&id).is_some()); - - assert_eq!(PaymentStatus::Succeeded, payment_store.get(&id).unwrap().status); - } - #[test] fn old_payment_details_deser_compat() { // We refactored `PaymentDetails` to hold a payment id and moved some required fields into diff --git a/src/types.rs b/src/types.rs index 1c9ab64b9..3103ead3f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -7,10 +7,12 @@ use crate::chain::ChainSource; use crate::config::ChannelConfig; +use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::PaymentDetails; use lightning::chain::chainmonitor; use lightning::impl_writeable_tlv_based; @@ -143,6 +145,8 @@ pub(crate) type BumpTransactionEventHandler = Arc, >; +pub(crate) type PaymentStore = DataStore>; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index c4ef15731..6e5a0ddea 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -8,11 +8,12 @@ use persist::KVStoreWalletPersister; use crate::config::Config; -use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator}; -use crate::payment::store::{ConfirmationStatus, PaymentStore}; +use crate::payment::store::ConfirmationStatus; use crate::payment::{PaymentDetails, PaymentDirection, PaymentStatus}; +use crate::types::PaymentStore; use crate::Error; use lightning::chain::chaininterface::BroadcasterInterface; @@ -73,7 +74,7 @@ where persister: Mutex, broadcaster: B, fee_estimator: E, - payment_store: Arc>>, + payment_store: Arc, config: Arc, logger: L, } @@ -87,7 +88,7 @@ where pub(crate) fn new( wallet: bdk_wallet::PersistedWallet, wallet_persister: KVStoreWalletPersister, broadcaster: B, fee_estimator: E, - payment_store: Arc>>, config: Arc, logger: L, + payment_store: Arc, config: Arc, logger: L, ) -> Self { let inner = Mutex::new(wallet); let persister = Mutex::new(wallet_persister); @@ -218,7 +219,7 @@ where payment_status, ); - self.payment_store.insert_or_update(&payment)?; + self.payment_store.insert_or_update(payment)?; } Ok(())