Skip to content

Commit 5fb2ae3

Browse files
committed
Remove pruned LSPS2/LSPS5 peer state entries from the KVStore
Previously, we'd persist peer states to the `KVStore`, but, while we pruned them eventually from our in-memory state, we wouldn't remove it from the `KVStore`. Here, we change this and regularly prune and delete peer state entries from the `KVStore`. Note we still prune the state-internal data on peer disconnection, but leave removal to our (BP-driven) async `persist` calls.
1 parent 87a2987 commit 5fb2ae3

File tree

3 files changed

+104
-28
lines changed

3 files changed

+104
-28
lines changed

lightning-liquidity/src/lsps2/service.rs

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ impl PeerState {
514514
// We abort the flow, and prune any data kept.
515515
self.intercept_scid_by_channel_id.retain(|_, iscid| intercept_scid != iscid);
516516
self.intercept_scid_by_user_channel_id.retain(|_, iscid| intercept_scid != iscid);
517-
// TODO: Remove peer state entry from the KVStore
517+
self.needs_persist |= true;
518518
return false;
519519
}
520520
true
@@ -1662,28 +1662,44 @@ where
16621662
}
16631663

16641664
pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
1665-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1666-
let is_prunable =
1667-
if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1668-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1669-
peer_state_lock.prune_expired_request_state();
1670-
peer_state_lock.is_prunable()
1671-
} else {
1672-
return;
1673-
};
1674-
if is_prunable {
1675-
outer_state_lock.remove(&counterparty_node_id);
1665+
let outer_state_lock = self.per_peer_state.write().unwrap();
1666+
if let Some(inner_state_lock) = outer_state_lock.get(&counterparty_node_id) {
1667+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1668+
// We clean up the peer state, but leave removing the peer entry to `prune_peer_state`
1669+
// which also removes it from the store.
1670+
peer_state_lock.prune_expired_request_state();
16761671
}
16771672
}
16781673

16791674
#[allow(clippy::bool_comparison)]
1680-
pub(crate) fn prune_peer_state(&self) {
1681-
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1682-
outer_state_lock.retain(|_, inner_state_lock| {
1683-
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1684-
peer_state_lock.prune_expired_request_state();
1685-
peer_state_lock.is_prunable() == false
1686-
});
1675+
pub(crate) async fn prune_peer_state(&self) {
1676+
let mut need_remove = Vec::new();
1677+
1678+
{
1679+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
1680+
outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
1681+
let mut peer_state_lock = inner_state_lock.lock().unwrap();
1682+
peer_state_lock.prune_expired_request_state();
1683+
let is_prunable = peer_state_lock.is_prunable();
1684+
if is_prunable {
1685+
need_remove.push(*counterparty_node_id);
1686+
}
1687+
is_prunable == false
1688+
});
1689+
}
1690+
1691+
for counterparty_node_id in need_remove {
1692+
let key = counterparty_node_id.to_string();
1693+
let _ = self
1694+
.kv_store
1695+
.remove(
1696+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1697+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1698+
&key,
1699+
true,
1700+
)
1701+
.await;
1702+
}
16871703
}
16881704
}
16891705

lightning-liquidity/src/lsps5/service.rs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,15 +270,47 @@ where
270270
});
271271

272272
if should_prune {
273+
for (_, peer_state) in outer_state_lock.iter_mut() {
274+
// Prune stale webhooks, but leave removal of the peers states to prune_peer_state
275+
// which will also remove it from the store.
276+
peer_state.prune_stale_webhooks(now)
277+
}
278+
*last_pruning = Some(now);
279+
}
280+
}
281+
282+
pub(crate) async fn prune_peer_state(&self) {
283+
let mut need_remove = Vec::new();
284+
285+
{
286+
let mut outer_state_lock = self.per_peer_state.write().unwrap();
287+
self.check_prune_stale_webhooks(&mut outer_state_lock);
288+
273289
outer_state_lock.retain(|client_id, peer_state| {
274290
if self.client_has_open_channel(client_id) {
275291
// Don't prune clients with open channels
276292
return true;
277293
}
278-
// TODO: Remove peer state entry from the KVStore
279-
!peer_state.prune_stale_webhooks(now)
294+
295+
let is_prunable = peer_state.is_prunable();
296+
if is_prunable {
297+
need_remove.push(*client_id);
298+
}
299+
!is_prunable
280300
});
281-
*last_pruning = Some(now);
301+
}
302+
303+
for counterparty_node_id in need_remove {
304+
let key = counterparty_node_id.to_string();
305+
let _ = self
306+
.kv_store
307+
.remove(
308+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
309+
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
310+
&key,
311+
true,
312+
)
313+
.await;
282314
}
283315
}
284316

@@ -733,11 +765,17 @@ impl PeerState {
733765
}
734766

735767
// Returns whether the entire state is empty and can be pruned.
736-
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) -> bool {
768+
fn prune_stale_webhooks(&mut self, now: LSPSDateTime) {
737769
self.webhooks.retain(|(_, webhook)| {
738-
now.duration_since(&webhook.last_used) < MIN_WEBHOOK_RETENTION_DAYS
770+
let should_prune = now.duration_since(&webhook.last_used) >= MIN_WEBHOOK_RETENTION_DAYS;
771+
if should_prune {
772+
self.needs_persist |= true;
773+
}
774+
!should_prune
739775
});
776+
}
740777

778+
fn is_prunable(&mut self) -> bool {
741779
self.webhooks.is_empty()
742780
}
743781
}

lightning-liquidity/src/manager.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use bitcoin::secp256k1::PublicKey;
6161
use core::future::Future as StdFuture;
6262
use core::ops::Deref;
6363
use core::task;
64+
use core::time::Duration;
6465

6566
const LSPS_FEATURE_BIT: usize = 729;
6667

@@ -311,7 +312,9 @@ pub struct LiquidityManager<
311312
service_config: Option<LiquidityServiceConfig>,
312313
_client_config: Option<LiquidityClientConfig>,
313314
best_block: RwLock<Option<BestBlock>>,
315+
last_peer_state_pruning: Mutex<Option<Duration>>,
314316
_chain_source: Option<C>,
317+
time_provider: TP,
315318
pending_msgs_or_needs_persist_notifier: Arc<Notifier>,
316319
}
317320

@@ -461,7 +464,7 @@ where
461464
kv_store.clone(),
462465
node_signer,
463466
lsps5_service_config.clone(),
464-
time_provider,
467+
time_provider.clone(),
465468
))
466469
} else {
467470
None
@@ -512,6 +515,8 @@ where
512515
None
513516
};
514517

518+
let last_peer_state_pruning = Mutex::new(None);
519+
515520
Ok(Self {
516521
pending_messages,
517522
pending_events,
@@ -529,7 +534,9 @@ where
529534
service_config,
530535
_client_config: client_config,
531536
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
537+
last_peer_state_pruning,
532538
_chain_source: chain_source,
539+
time_provider,
533540
pending_msgs_or_needs_persist_notifier,
534541
})
535542
}
@@ -650,14 +657,32 @@ where
650657
/// This will be regularly called by LDK's background processor if necessary and only needs to
651658
/// be called manually if it's not utilized.
652659
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
660+
let should_prune_state = {
661+
const PRUNE_INTERVAL: Duration = Duration::from_secs(600);
662+
let mut last_peer_state_pruning_lock = self.last_peer_state_pruning.lock().unwrap();
663+
let now = self.time_provider.duration_since_epoch();
664+
if last_peer_state_pruning_lock.map_or(true, |l| l + PRUNE_INTERVAL < now) {
665+
*last_peer_state_pruning_lock = Some(now);
666+
true
667+
} else {
668+
false
669+
}
670+
};
671+
653672
// TODO: We should eventually persist in parallel.
654673
self.pending_events.persist().await?;
655674

656675
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
676+
if should_prune_state {
677+
lsps2_service_handler.prune_peer_state().await;
678+
}
657679
lsps2_service_handler.persist().await?;
658680
}
659681

660682
if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
683+
if should_prune_state {
684+
lsps5_service_handler.prune_peer_state().await;
685+
}
661686
lsps5_service_handler.persist().await?;
662687
}
663688

@@ -1015,9 +1040,6 @@ where
10151040
*self.best_block.write().unwrap() = Some(new_best_block);
10161041

10171042
// TODO: Call best_block_updated on all sub-modules that require it, e.g., LSPS1MessageHandler.
1018-
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
1019-
lsps2_service_handler.prune_peer_state();
1020-
}
10211043
}
10221044

10231045
fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {

0 commit comments

Comments
 (0)