Skip to content

Commit 2617901

Browse files
authored
fix: don't increment subscription id for dups (#610)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Refactor** * Enhanced internal debugging capabilities across encoder types and filter structures. * Improved subscription lifecycle management through centralized registration handling, replacing direct insertion patterns. * **Chores** * Updated type exports and visibility modifiers for subscription-related infrastructure. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 1faae18 commit 2617901

File tree

8 files changed

+47
-18
lines changed

8 files changed

+47
-18
lines changed

magicblock-aperture/src/encoder.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::fmt::Debug;
2+
13
use hyper::body::Bytes;
24
use json::Serialize;
35
use magicblock_core::{
@@ -20,7 +22,7 @@ use crate::{
2022

2123
/// An abstraction trait over types which specialize in turning various
2224
/// websocket notification payload types into sequence of bytes
23-
pub(crate) trait Encoder: Ord + Eq + Clone {
25+
pub(crate) trait Encoder: Ord + Eq + Clone + Debug {
2426
type Data;
2527
fn encode(
2628
&self,
@@ -64,7 +66,7 @@ impl From<UiAccountEncoding> for AccountEncoder {
6466
}
6567

6668
/// A `programSubscribe` payload encoder
67-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
69+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
6870
pub struct ProgramAccountEncoder {
6971
pub encoder: AccountEncoder,
7072
pub filters: ProgramFilters,
@@ -106,7 +108,7 @@ impl Encoder for ProgramAccountEncoder {
106108
}
107109

108110
/// A `signatureSubscribe` payload encoder
109-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
111+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
110112
pub(crate) struct TransactionResultEncoder;
111113

112114
impl Encoder for TransactionResultEncoder {
@@ -130,7 +132,7 @@ impl Encoder for TransactionResultEncoder {
130132
}
131133

132134
/// A `logsSubscribe` payload encoder
133-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
135+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
134136
pub(crate) enum TransactionLogsEncoder {
135137
All,
136138
Mentions(Pubkey),
@@ -172,7 +174,7 @@ impl Encoder for TransactionLogsEncoder {
172174
}
173175

174176
/// A `slotSubscribe` payload encoder
175-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
177+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
176178
pub(crate) struct SlotEncoder;
177179

178180
impl Encoder for SlotEncoder {

magicblock-aperture/src/requests/websocket/account_subscribe.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ impl WsDispatcher {
3232
.subscribe_to_account(pubkey, encoder, self.chan.clone())
3333
.await;
3434

35+
let result = SubResult::SubId(handle.id);
3536
// Store the cleanup handle to manage the subscription's lifecycle.
36-
self.unsubs.insert(handle.id, handle.cleanup);
37-
Ok(SubResult::SubId(handle.id))
37+
self.register_unsub(handle);
38+
39+
Ok(result)
3840
}
3941
}

magicblock-aperture/src/requests/websocket/log_subscribe.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ impl WsDispatcher {
3535
.subscriptions
3636
.subscribe_to_logs(encoder, self.chan.clone());
3737

38-
self.unsubs.insert(handle.id, handle.cleanup);
39-
Ok(SubResult::SubId(handle.id))
38+
let result = SubResult::SubId(handle.id);
39+
// Store the cleanup handle to manage the subscription's lifecycle.
40+
self.register_unsub(handle);
41+
42+
Ok(result)
4043
}
4144
}

magicblock-aperture/src/requests/websocket/program_subscribe.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ impl WsDispatcher {
4343
.subscribe_to_program(pubkey, encoder, self.chan.clone())
4444
.await;
4545

46-
self.unsubs.insert(handle.id, handle.cleanup);
47-
Ok(SubResult::SubId(handle.id))
46+
let result = SubResult::SubId(handle.id);
47+
// Store the cleanup handle to manage the subscription's lifecycle.
48+
self.register_unsub(handle);
49+
50+
Ok(result)
4851
}
4952
}

magicblock-aperture/src/requests/websocket/slot_subscribe.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ impl WsDispatcher {
77
/// each time the validator advances to a new slot.
88
pub(crate) fn slot_subscribe(&mut self) -> RpcResult<SubResult> {
99
let handle = self.subscriptions.subscribe_to_slot(self.chan.clone());
10-
self.unsubs.insert(handle.id, handle.cleanup);
11-
Ok(SubResult::SubId(handle.id))
10+
let result = SubResult::SubId(handle.id);
11+
// Store the cleanup handle to manage the subscription's lifecycle.
12+
self.register_unsub(handle);
13+
14+
Ok(result)
1215
}
1316
}

magicblock-aperture/src/server/websocket/dispatch.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::{
1212
requests::{JsonRpcWsMethod, JsonWsRequest},
1313
state::{
1414
signatures::SignaturesExpirer,
15-
subscriptions::{CleanUp, SubscriptionID, SubscriptionsDb},
15+
subscriptions::{
16+
CleanUp, SubscriptionHandle, SubscriptionID, SubscriptionsDb,
17+
},
1618
transactions::TransactionsCache,
1719
},
1820
RpcResult,
@@ -123,6 +125,16 @@ impl WsDispatcher {
123125
let success = self.unsubs.remove(&id).is_some();
124126
Ok(SubResult::Unsub(success))
125127
}
128+
129+
/// Register the unsubscription callback for the new subscription on this connection
130+
pub(crate) fn register_unsub(&mut self, handle: SubscriptionHandle) {
131+
let cleanup = self.unsubs.insert(handle.id, handle.cleanup);
132+
// If we have a duplicate subscription, drop the
133+
// previous cleanup callback to prevent double-cleanup
134+
if let Some(mut callback) = cleanup {
135+
callback.0.take();
136+
}
137+
}
126138
}
127139

128140
/// Bundles a connection's unique ID with its dedicated sender channel.

magicblock-aperture/src/state/subscriptions.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,11 @@ pub(crate) struct UpdateSubscriber<E> {
285285
impl<E: Encoder> UpdateSubscribers<E> {
286286
/// Adds a connection to the appropriate subscriber group based on the encoder.
287287
/// If no group exists for the given encoder, a new one is created.
288-
fn add_subscriber(&mut self, chan: WsConnectionChannel, encoder: E) -> u64 {
288+
fn add_subscriber(
289+
&mut self,
290+
chan: WsConnectionChannel,
291+
encoder: E,
292+
) -> SubscriptionID {
289293
match self.0.binary_search_by(|s| s.encoder.cmp(&encoder)) {
290294
// A subscriber group with this encoder already exists.
291295
Ok(index) => {
@@ -380,7 +384,7 @@ pub(crate) struct SubscriptionHandle {
380384

381385
/// A RAII guard that executes an asynchronous cleanup task when dropped.
382386
pub(crate) struct CleanUp(
383-
Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
387+
pub(crate) Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
384388
);
385389

386390
impl Drop for CleanUp {

magicblock-aperture/src/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,14 @@ impl Body for JsonBody {
4949
}
5050

5151
/// A single, server-side filter for `getProgramAccounts`.
52-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone)]
52+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Debug)]
5353
pub(crate) enum ProgramFilter {
5454
DataSize(usize),
5555
MemCmp { offset: usize, bytes: Vec<u8> },
5656
}
5757

5858
/// A collection of server-side filters for `getProgramAccounts`.
59-
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default)]
59+
#[derive(PartialEq, PartialOrd, Ord, Eq, Clone, Default, Debug)]
6060
pub(crate) struct ProgramFilters(Vec<ProgramFilter>);
6161

6262
impl ProgramFilter {

0 commit comments

Comments
 (0)