From 6b8b874215a5c11ba227c032c8a443b0f13dfa62 Mon Sep 17 00:00:00 2001
From: Philip Kannegaard Hayes
Date: Fri, 2 May 2025 12:21:20 -0700
Subject: [PATCH] ln: om: add option to internally queue forwards to offline peers

Adds an option to `OnionMessenger` to have it queue all onion message
forwards to offline peers.

Context: when someone requests or pays a BOLT12 invoice to one of our
user nodes, our LSP may need to first spin up their node before it can
finally forward the onion message.

Why not use `intercept_messages_for_offline_peers`? We'd have to
rebuild almost the exact same `message_recipients` queue outside LDK.
With this change, the logic turned out quite simple on our end, since
we just have to handle the `ConnectionNeeded` event. All pending
messages are then forwarded automatically once the peer connects. If
the connection fails, or it's not one of our peers, the pending
messages will simply age out after a few timer ticks.
---
 lightning/src/onion_message/messenger.rs | 72 +++++++++++++++++++++++-
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs
index 6009a276976..95d77c20107 100644
--- a/lightning/src/onion_message/messenger.rs
+++ b/lightning/src/onion_message/messenger.rs
@@ -318,6 +318,7 @@ pub struct OnionMessenger<
 	dns_resolver_handler: DRH,
 	custom_handler: CMH,
 	intercept_messages_for_offline_peers: bool,
+	queue_messages_for_offline_peers: bool,
 	pending_intercepted_msgs_events: Mutex<Vec<Event>>,
 	pending_peer_connected_events: Mutex<Vec<Event>>,
 	pending_events_processor: AtomicBool,
@@ -1233,6 +1234,8 @@ where
 		entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
 		offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
 	) -> Self {
+		let intercept_messages_for_offline_peers = false;
+		let queue_messages_for_offline_peers = false;
 		Self::new_inner(
 			entropy_source,
 			node_signer,
@@ -1243,7 +1246,8 @@ where
 			async_payments_handler,
 			dns_resolver,
 			custom_handler,
-			false,
+			intercept_messages_for_offline_peers,
+			queue_messages_for_offline_peers,
 		)
 	}
 
@@ -1272,6 +1276,8 @@ where
 		entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
 		offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
 	) -> Self {
+		let intercept_messages_for_offline_peers = true;
+		let queue_messages_for_offline_peers = false;
 		Self::new_inner(
 			entropy_source,
 			node_signer,
@@ -1282,15 +1288,48 @@ where
 			async_payments_handler,
 			dns_resolver,
 			custom_handler,
-			true,
+			intercept_messages_for_offline_peers,
+			queue_messages_for_offline_peers,
+		)
+	}
+
+	/// Similar to [`Self::new`], but rather than dropping onion messages that
+	/// are intended to be forwarded to offline peers, we'll queue them until
+	/// the peer connects or two timer ticks pass.
+	///
+	/// A single [`Event::ConnectionNeeded`] event with just the node id and no
+	/// addresses will be generated once a message is queued for an offline peer.
+	pub fn new_with_offline_peer_queueing(
+		entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
+		offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
+	) -> Self {
+		let intercept_messages_for_offline_peers = false;
+		let queue_messages_for_offline_peers = true;
+		Self::new_inner(
+			entropy_source,
+			node_signer,
+			logger,
+			node_id_lookup,
+			message_router,
+			offers_handler,
+			async_payments_handler,
+			dns_resolver,
+			custom_handler,
+			intercept_messages_for_offline_peers,
+			queue_messages_for_offline_peers,
 		)
 	}
 
 	fn new_inner(
 		entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
 		offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
-		intercept_messages_for_offline_peers: bool,
+		intercept_messages_for_offline_peers: bool, queue_messages_for_offline_peers: bool,
 	) -> Self {
+		debug_assert!(
+			!(intercept_messages_for_offline_peers && queue_messages_for_offline_peers),
+			"Can't generate intercept events and queue messages for offline peers at the same time",
+		);
+
 		let mut secp_ctx = Secp256k1::new();
 		secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
 		OnionMessenger {
@@ -1306,6 +1345,7 @@ where
 			dns_resolver_handler: dns_resolver,
 			custom_handler,
 			intercept_messages_for_offline_peers,
+			queue_messages_for_offline_peers,
 			pending_intercepted_msgs_events: Mutex::new(Vec::new()),
 			pending_peer_connected_events: Mutex::new(Vec::new()),
 			pending_events_processor: AtomicBool::new(false),
@@ -2023,6 +2063,32 @@ where
 			.entry(next_node_id)
 			.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()));
 
+		// When enabled, we'll queue all messages, even for offline peers
+		// and peers pending connection.
+		if self.queue_messages_for_offline_peers {
+			match message_recipients.entry(next_node_id) {
+				hash_map::Entry::Occupied(mut e) => {
+					log_trace!(logger, "Forwarding onion message to peer {next_node_id}");
+					e.get_mut().enqueue_message(onion_message);
+				},
+				hash_map::Entry::Vacant(e) => {
+					log_trace!(
+						logger,
+						"Forwarding onion message to disconnected peer {next_node_id}: \
+						awaiting connection"
+					);
+					let addrs = Vec::new();
+					e.insert(OnionMessageRecipient::pending_connection(addrs))
+						.enqueue_message(onion_message);
+					// Notify the background processor that we need to
+					// connect to this peer.
+					self.event_notifier.notify();
+				},
+			};
+			return;
+		}
+
+		// Otherwise, only forward to connected peers.
 		match message_recipients.entry(next_node_id) {
 			hash_map::Entry::Occupied(mut e)
 				if matches!(e.get(), OnionMessageRecipient::ConnectedPeer(..)) =>
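
---

Usage sketch (not part of the patch): with `new_with_offline_peer_queueing`,
the LSP side only has to react to the `Event::ConnectionNeeded` that the
messenger generates when a forward is queued for an offline peer. A minimal
sketch, assuming an async event handler; `start_user_node_and_connect` is a
hypothetical helper that spins up the user's node and opens the connection.

	use bitcoin::secp256k1::PublicKey;
	use lightning::events::Event;

	// Hypothetical helper: spin up the user's node and connect to it.
	async fn start_user_node_and_connect(_node_id: PublicKey) { /* ... */ }

	async fn handle_event(event: Event) {
		match event {
			// Generated once per offline peer with queued forwards; with
			// queueing enabled it carries only the node id, no addresses.
			Event::ConnectionNeeded { node_id, .. } => {
				// Once the peer connects, the queued onion messages are
				// forwarded automatically; if it never connects, they age
				// out after a couple of timer ticks.
				start_user_node_and_connect(node_id).await;
			},
			_ => { /* other events */ },
		}
	}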