Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions magicblock-aperture/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::Debug;

use hyper::body::Bytes;
use json::Serialize;
use magicblock_core::{
Expand All @@ -20,7 +22,7 @@ use crate::{

/// An abstraction trait over types which specialize in turning various
/// websocket notification payload types into sequence of bytes
pub(crate) trait Encoder: Ord + Eq + Clone {
pub(crate) trait Encoder: Ord + Eq + Clone + Debug {
type Data;
fn encode(
&self,
Expand Down Expand Up @@ -64,7 +66,7 @@ impl From<UiAccountEncoding> for AccountEncoder {
}

/// A `programSubscribe` payload encoder
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
pub struct ProgramAccountEncoder {
pub encoder: AccountEncoder,
pub filters: ProgramFilters,
Expand Down Expand Up @@ -106,7 +108,7 @@ impl Encoder for ProgramAccountEncoder {
}

/// A `signatureSubscribe` payload encoder
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
pub(crate) struct TransactionResultEncoder;

impl Encoder for TransactionResultEncoder {
Expand All @@ -130,7 +132,7 @@ impl Encoder for TransactionResultEncoder {
}

/// A `logsSubscribe` payload encoder
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
pub(crate) enum TransactionLogsEncoder {
All,
Mentions(Pubkey),
Expand Down Expand Up @@ -172,7 +174,7 @@ impl Encoder for TransactionLogsEncoder {
}

/// A `slotSubscribe` payload encoder
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
pub(crate) struct SlotEncoder;

impl Encoder for SlotEncoder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ impl WsDispatcher {
.subscribe_to_account(pubkey, encoder, self.chan.clone())
.await;

let result = SubResult::SubId(handle.id);
// Store the cleanup handle to manage the subscription's lifecycle.
self.unsubs.insert(handle.id, handle.cleanup);
Ok(SubResult::SubId(handle.id))
self.register_unsub(handle);

Ok(result)
}
}
7 changes: 5 additions & 2 deletions magicblock-aperture/src/requests/websocket/log_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ impl WsDispatcher {
.subscriptions
.subscribe_to_logs(encoder, self.chan.clone());

self.unsubs.insert(handle.id, handle.cleanup);
Ok(SubResult::SubId(handle.id))
let result = SubResult::SubId(handle.id);
// Store the cleanup handle to manage the subscription's lifecycle.
self.register_unsub(handle);

Ok(result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ impl WsDispatcher {
.subscribe_to_program(pubkey, encoder, self.chan.clone())
.await;

self.unsubs.insert(handle.id, handle.cleanup);
Ok(SubResult::SubId(handle.id))
let result = SubResult::SubId(handle.id);
// Store the cleanup handle to manage the subscription's lifecycle.
self.register_unsub(handle);

Ok(result)
}
}
7 changes: 5 additions & 2 deletions magicblock-aperture/src/requests/websocket/slot_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ impl WsDispatcher {
/// each time the validator advances to a new slot.
pub(crate) fn slot_subscribe(&mut self) -> RpcResult<SubResult> {
let handle = self.subscriptions.subscribe_to_slot(self.chan.clone());
self.unsubs.insert(handle.id, handle.cleanup);
Ok(SubResult::SubId(handle.id))
let result = SubResult::SubId(handle.id);
// Store the cleanup handle to manage the subscription's lifecycle.
self.register_unsub(handle);

Ok(result)
}
}
14 changes: 13 additions & 1 deletion magicblock-aperture/src/server/websocket/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use crate::{
requests::{JsonRpcWsMethod, JsonWsRequest},
state::{
signatures::SignaturesExpirer,
subscriptions::{CleanUp, SubscriptionID, SubscriptionsDb},
subscriptions::{
CleanUp, SubscriptionHandle, SubscriptionID, SubscriptionsDb,
},
transactions::TransactionsCache,
},
RpcResult,
Expand Down Expand Up @@ -123,6 +125,16 @@ impl WsDispatcher {
let success = self.unsubs.remove(&id).is_some();
Ok(SubResult::Unsub(success))
}

/// Register the unsubscription callback for the new subscription on this connection
pub(crate) fn register_unsub(&mut self, handle: SubscriptionHandle) {
let cleanup = self.unsubs.insert(handle.id, handle.cleanup);
// If we have a duplicate subscription, drop the
// previous cleanup callback to prevent double-cleanup
if let Some(mut callback) = cleanup {
callback.0.take();
}
}
}

/// Bundles a connection's unique ID with its dedicated sender channel.
Expand Down
8 changes: 6 additions & 2 deletions magicblock-aperture/src/state/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,11 @@ pub(crate) struct UpdateSubscriber<E> {
impl<E: Encoder> UpdateSubscribers<E> {
/// Adds a connection to the appropriate subscriber group based on the encoder.
/// If no group exists for the given encoder, a new one is created.
fn add_subscriber(&mut self, chan: WsConnectionChannel, encoder: E) -> u64 {
fn add_subscriber(
&mut self,
chan: WsConnectionChannel,
encoder: E,
) -> SubscriptionID {
match self.0.binary_search_by(|s| s.encoder.cmp(&encoder)) {
// A subscriber group with this encoder already exists.
Ok(index) => {
Expand Down Expand Up @@ -380,7 +384,7 @@ pub(crate) struct SubscriptionHandle {

/// A RAII guard that executes an asynchronous cleanup task when dropped.
pub(crate) struct CleanUp(
Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
pub(crate) Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
);

impl Drop for CleanUp {
Expand Down
4 changes: 2 additions & 2 deletions magicblock-aperture/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ impl Body for JsonBody {
}

/// A single, server-side filter for `getProgramAccounts`.
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
pub(crate) enum ProgramFilter {
DataSize(usize),
MemCmp { offset: usize, bytes: Vec<u8> },
}

/// A collection of server-side filters for `getProgramAccounts`.
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default)]
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default, Debug)]
pub(crate) struct ProgramFilters(Vec<ProgramFilter>);

impl ProgramFilter {
Expand Down