diff --git a/magicblock-aperture/src/encoder.rs b/magicblock-aperture/src/encoder.rs index b0a026f68..644d94807 100644 --- a/magicblock-aperture/src/encoder.rs +++ b/magicblock-aperture/src/encoder.rs @@ -1,3 +1,5 @@ +use std::fmt::Debug; + use hyper::body::Bytes; use json::Serialize; use magicblock_core::{ @@ -20,7 +22,7 @@ use crate::{ /// An abstraction trait over types which specialize in turning various /// websocket notification payload types into sequence of bytes -pub(crate) trait Encoder: Ord + Eq + Clone { +pub(crate) trait Encoder: Ord + Eq + Clone + Debug { type Data; fn encode( &self, @@ -64,7 +66,7 @@ impl From for AccountEncoder { } /// A `programSubscribe` payload encoder -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)] pub struct ProgramAccountEncoder { pub encoder: AccountEncoder, pub filters: ProgramFilters, @@ -106,7 +108,7 @@ impl Encoder for ProgramAccountEncoder { } /// A `signatureSubscribe` payload encoder -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)] pub(crate) struct TransactionResultEncoder; impl Encoder for TransactionResultEncoder { @@ -130,7 +132,7 @@ impl Encoder for TransactionResultEncoder { } /// A `logsSubscribe` payload encoder -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)] pub(crate) enum TransactionLogsEncoder { All, Mentions(Pubkey), @@ -172,7 +174,7 @@ impl Encoder for TransactionLogsEncoder { } /// A `slotSubscribe` payload encoder -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)] pub(crate) struct SlotEncoder; impl Encoder for SlotEncoder { diff --git a/magicblock-aperture/src/requests/websocket/account_subscribe.rs b/magicblock-aperture/src/requests/websocket/account_subscribe.rs index 3ddb59330..38cb8ca0d 100644 --- a/magicblock-aperture/src/requests/websocket/account_subscribe.rs +++ b/magicblock-aperture/src/requests/websocket/account_subscribe.rs @@ -32,8 +32,10 @@ impl WsDispatcher { .subscribe_to_account(pubkey, encoder, self.chan.clone()) .await; + let result = SubResult::SubId(handle.id); // Store the cleanup handle to manage the subscription's lifecycle. - self.unsubs.insert(handle.id, handle.cleanup); - Ok(SubResult::SubId(handle.id)) + self.register_unsub(handle); + + Ok(result) } } diff --git a/magicblock-aperture/src/requests/websocket/log_subscribe.rs b/magicblock-aperture/src/requests/websocket/log_subscribe.rs index efc5182a4..c6a8375b7 100644 --- a/magicblock-aperture/src/requests/websocket/log_subscribe.rs +++ b/magicblock-aperture/src/requests/websocket/log_subscribe.rs @@ -35,7 +35,10 @@ impl WsDispatcher { .subscriptions .subscribe_to_logs(encoder, self.chan.clone()); - self.unsubs.insert(handle.id, handle.cleanup); - Ok(SubResult::SubId(handle.id)) + let result = SubResult::SubId(handle.id); + // Store the cleanup handle to manage the subscription's lifecycle. + self.register_unsub(handle); + + Ok(result) } } diff --git a/magicblock-aperture/src/requests/websocket/program_subscribe.rs b/magicblock-aperture/src/requests/websocket/program_subscribe.rs index ff1f35fa5..9e13f75a7 100644 --- a/magicblock-aperture/src/requests/websocket/program_subscribe.rs +++ b/magicblock-aperture/src/requests/websocket/program_subscribe.rs @@ -43,7 +43,10 @@ impl WsDispatcher { .subscribe_to_program(pubkey, encoder, self.chan.clone()) .await; - self.unsubs.insert(handle.id, handle.cleanup); - Ok(SubResult::SubId(handle.id)) + let result = SubResult::SubId(handle.id); + // Store the cleanup handle to manage the subscription's lifecycle. + self.register_unsub(handle); + + Ok(result) } } diff --git a/magicblock-aperture/src/requests/websocket/slot_subscribe.rs b/magicblock-aperture/src/requests/websocket/slot_subscribe.rs index 52cf8521d..b17c6cdda 100644 --- a/magicblock-aperture/src/requests/websocket/slot_subscribe.rs +++ b/magicblock-aperture/src/requests/websocket/slot_subscribe.rs @@ -7,7 +7,10 @@ impl WsDispatcher { /// each time the validator advances to a new slot. pub(crate) fn slot_subscribe(&mut self) -> RpcResult { let handle = self.subscriptions.subscribe_to_slot(self.chan.clone()); - self.unsubs.insert(handle.id, handle.cleanup); - Ok(SubResult::SubId(handle.id)) + let result = SubResult::SubId(handle.id); + // Store the cleanup handle to manage the subscription's lifecycle. + self.register_unsub(handle); + + Ok(result) } } diff --git a/magicblock-aperture/src/server/websocket/dispatch.rs b/magicblock-aperture/src/server/websocket/dispatch.rs index cac407de7..5d4ead662 100644 --- a/magicblock-aperture/src/server/websocket/dispatch.rs +++ b/magicblock-aperture/src/server/websocket/dispatch.rs @@ -12,7 +12,9 @@ use crate::{ requests::{JsonRpcWsMethod, JsonWsRequest}, state::{ signatures::SignaturesExpirer, - subscriptions::{CleanUp, SubscriptionID, SubscriptionsDb}, + subscriptions::{ + CleanUp, SubscriptionHandle, SubscriptionID, SubscriptionsDb, + }, transactions::TransactionsCache, }, RpcResult, @@ -123,6 +125,16 @@ impl WsDispatcher { let success = self.unsubs.remove(&id).is_some(); Ok(SubResult::Unsub(success)) } + + /// Register the unsubscription callback for the new subscription on this connection + pub(crate) fn register_unsub(&mut self, handle: SubscriptionHandle) { + let cleanup = self.unsubs.insert(handle.id, handle.cleanup); + // If we have a duplicate subscription, drop the + // previous cleanup callback to prevent double-cleanup + if let Some(mut callback) = cleanup { + callback.0.take(); + } + } } /// Bundles a connection's unique ID with its dedicated sender channel. diff --git a/magicblock-aperture/src/state/subscriptions.rs b/magicblock-aperture/src/state/subscriptions.rs index 4fb94c530..852e034cd 100644 --- a/magicblock-aperture/src/state/subscriptions.rs +++ b/magicblock-aperture/src/state/subscriptions.rs @@ -285,7 +285,11 @@ pub(crate) struct UpdateSubscriber { impl UpdateSubscribers { /// Adds a connection to the appropriate subscriber group based on the encoder. /// If no group exists for the given encoder, a new one is created. - fn add_subscriber(&mut self, chan: WsConnectionChannel, encoder: E) -> u64 { + fn add_subscriber( + &mut self, + chan: WsConnectionChannel, + encoder: E, + ) -> SubscriptionID { match self.0.binary_search_by(|s| s.encoder.cmp(&encoder)) { // A subscriber group with this encoder already exists. Ok(index) => { @@ -380,7 +384,7 @@ pub(crate) struct SubscriptionHandle { /// A RAII guard that executes an asynchronous cleanup task when dropped. pub(crate) struct CleanUp( - Option + Send + Sync>>>, + pub(crate) Option + Send + Sync>>>, ); impl Drop for CleanUp { diff --git a/magicblock-aperture/src/utils.rs b/magicblock-aperture/src/utils.rs index d720dbce1..39ac3b8b2 100644 --- a/magicblock-aperture/src/utils.rs +++ b/magicblock-aperture/src/utils.rs @@ -49,14 +49,14 @@ impl Body for JsonBody { } /// A single, server-side filter for `getProgramAccounts`. -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)] pub(crate) enum ProgramFilter { DataSize(usize), MemCmp { offset: usize, bytes: Vec }, } /// A collection of server-side filters for `getProgramAccounts`. -#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default)] +#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default, Debug)] pub(crate) struct ProgramFilters(Vec); impl ProgramFilter {