Skip to content

Conversation

@bmuddha
Copy link
Contributor

@bmuddha bmuddha commented Oct 27, 2025

This PR replaces all of the Arc<Lock> uses in the chainlink with the lock free alternatives based on the scc crate. This is necessary performance optimization, avoiding thread locking altogether, which can be quite detrimental to the performance characteristics of asynchronous runtime (where chainlink code is invoked).

Summary by CodeRabbit

Refactor

  • Improved internal concurrency mechanisms for enhanced performance and scalability.
  • Streamlined subscription management with more efficient shutdown handling.
  • Optimized dependency structure and caching implementation.

thlorenz and others added 30 commits September 17, 2025 17:19
- needs to be cleaned up to handle all program loaders
- unwrap needs to be removed as well
- covered in newer tests
- not repeatable due to hardcoded accounts
- testing obsolete behavior (non-eager cloning)
@bmuddha bmuddha requested a review from thlorenz October 27, 2025 12:29
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 27, 2025

Walkthrough

Replaced thread-safe data structure synchronization patterns across the magicblock-chainlink crate: Mutex-wrapped HashMaps replaced with lock-free scc::HashMap, LRU cache replaced with HashCache, JoinSet-based subscription watcher management replaced with CancellationToken. Updated dependencies by removing lru, adding scc and parking_lot.

Changes

Cohort / File(s) Summary
Dependency updates
Cargo.toml
Removed lru dependency; added scc and parking_lot crates for lock-free concurrency primitives.
Accounts bank refactoring
src/accounts_bank.rs
Replaced Mutex<HashMap<Pubkey, AccountSharedData>> with scc::HashMap in AccountsBankStub::accounts field. Updated all CRUD methods (insert→upsert, get→map-and-clone, mutations via in-place operations) to use lock-free APIs. Removed all mutex lock/unlock logic.
Fetch cloner synchronization
src/chainlink/fetch_cloner.rs
Replaced Arc<Mutex<HashMap<Pubkey, RemoteAccountRequests>>> with Arc<scc::HashMap> for pending_requests field. Integrated Entry API (Occupied/Vacant) for deduplication management. Removed explicit lock acquisitions throughout.
Chain pubsub actor restructuring
src/remote_account_provider/chain_pubsub_actor.rs
Replaced Arc<Mutex<HashMap<Pubkey, AccountSubscription>>> with Arc<scc::HashMap>. Replaced Arc<Mutex<tokio::task::JoinSet<()>>> with Arc<CancellationToken> for subscription watcher management. Updated handle_msg, add_sub, recycle_connections function signatures. Reworked subscription lifecycle to use child tokens and cancellation-based shutdown.
Chain pubsub client updates
src/remote_account_provider/chain_pubsub_client.rs
Replaced std::sync::Mutex with parking_lot::Mutex for updates receiver in main impl. In mock module, replaced Arc<Mutex<HashSet<Pubkey>>> with Arc<scc::HashSet> for subscribed_pubkeys field. Removed locking in mock send/subscribe/unsubscribe paths.
LRU cache replacement
src/remote_account_provider/lru_cache.rs
Replaced Mutex<LruCache<Pubkey, ()>> with scc::HashCache<Pubkey, ()>. Changed constructor signature from pub fn new(capacity: NonZeroUsize) to pub fn new(capacity: usize). Reimplemented add/promote/contains/remove methods to use HashCache API. Rewrote tests synchronously with eviction-focused scenarios.
Remote account provider internals
src/remote_account_provider/mod.rs
Replaced private Mutex<HashMap> for fetching_accounts with plain scc::HashMap using Entry API. Replaced std::sync::Mutex with parking_lot::Mutex for removed_account_rx. Updated AccountsLruCache initialization. Reworked fetch tracking, pending state management, and added eviction notification tests.
Submux concurrency refactoring
src/submux/mod.rs
Replaced Arc<Mutex<HashMap<(Pubkey, u64), Instant>>> and Arc<Mutex<HashMap<Pubkey, DebounceState>>> with Arc<scc::HashMap> variants. Converted all access patterns to use Entry API and direct operations. Updated ForwarderParams to carry lock-free maps. Replaced lock-based iteration with retain_async for debounce flushing.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Multiple heterogeneous synchronization patterns: Lock-free HashMap operations, CancellationToken-based lifecycle management, and HashCache semantics require separate reasoning for each file.
  • Public API changes affecting external consumers: Field types and function signatures modified in AccountsBankStub, FetchCloner, ChainPubsubActor, ChainPubsubClientMock, and AccountsLruCache.
  • Subtle concurrency semantics: Entry API correctness, cancellation token propagation, and HashCache eviction behavior differ from mutex-guarded access patterns.
  • Test rewrite scope: Extensive test changes in lru_cache.rs and mod.rs introducing eviction verification scenarios.

Areas requiring extra attention:

  • src/chainlink/fetch_cloner.rs: Entry API deduplication logic for pending request tracking
  • src/remote_account_provider/chain_pubsub_actor.rs: CancellationToken hierarchy and child_token() scoping correctness; subscription lifecycle during shutdown
  • src/remote_account_provider/lru_cache.rs: HashCache promotion semantics vs. LRU behavior; eviction notification channel correctness
  • src/remote_account_provider/mod.rs: Entry-based state machine for pending fetchers; integration with eviction notifications
  • src/submux/mod.rs: Entry API usage in dedup_cache and debounce_states; retain_async iteration correctness

Possibly related issues

  • chore: merge and test lock free chainlink branch #583: This PR's widespread refactoring replacing Mutex-guarded HashMap/LruCache structures with lock-free scc::HashMap/HashCache and parking_lot/CancellationToken directly aligns with the lock-free chainlink branch initiative and affects the same components.

Possibly related PRs

  • feat: new RPC, Bank and Cloning Pipeline #578: Modifies the same magicblock-chainlink crate modules (accounts_bank.rs, fetch_cloner.rs, remote_account_provider/*, lru_cache.rs) with concurrent synchronization updates; this PR represents further refactoring of the same crate's data structures and concurrency primitives.

Suggested reviewers

  • thlorenz
  • GabrielePicco
  • Dodecahedr0x

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Lock free chainlink" accurately reflects the primary objective and scope of the changeset. The PR systematically replaces all Arc<Mutex<...>> patterns with lock-free alternatives from the scc crate across multiple files in the chainlink module, which directly aligns with the stated title. The title is concise, specific to the component being refactored, and clearly conveys the core performance optimization intent without being vague or overly broad. A developer scanning the history would immediately understand this PR addresses concurrency patterns rather than implementing new features or fixing bugs.
Docstring Coverage ✅ Passed Docstring coverage is 98.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch bmuddha/lock-free-chainlink

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (9)
magicblock-chainlink/src/accounts_bank.rs (2)

30-34: Mutating values requires a mutable reference from the entry.

Calling set_owner on the result of get(...) won’t compile unless you take a &mut to the value. Use get_mut() on the occupied entry.

-            if let Some(mut account) = self.accounts.get(pubkey) {
-                account.set_owner(owner);
+            if let Some(mut e) = self.accounts.get(pubkey) {
+                e.get_mut().set_owner(owner);

40-44: Same mutation issue for delegated flag.

Acquire a mutable value from the entry before mutating.

-            if let Some(mut account) = self.accounts.get(pubkey) {
-                account.set_delegated(delegated);
+            if let Some(mut e) = self.accounts.get(pubkey) {
+                e.get_mut().set_delegated(delegated);
magicblock-chainlink/src/chainlink/fetch_cloner.rs (1)

954-1000: Followers return Ok even if the leader fetch failed. Propagate outcome.

When fetch_new is empty, result defaults to Ok(...), so concurrent followers may return Ok despite the leader error. Send the leader’s outcome to followers via the oneshot.

Minimal change:

- type RemoteAccountRequests = Vec<oneshot::Sender<()>>;
+ type RemoteAccountRequests = Vec<oneshot::Sender<Result<(), String>>>;
...
-                        let (sender, receiver) = oneshot::channel();
+                        let (sender, receiver) = oneshot::channel::<Result<(), String>>();
...
-            self.fetch_and_clone_accounts(...).await
+            let res = self.fetch_and_clone_accounts(...).await;
+            let outcome = res.as_ref().map(|_| ()).map_err(|e| e.to_string());
+            res
...
-                if let Some(requests) = self.pending_requests.remove(&pubkey) {
-                    for sender in requests {
-                        let _ = sender.send(());
-                    }
-                }
+                if let Some(requests) = self.pending_requests.remove(&pubkey) {
+                    for sender in requests {
+                        let _ = sender.send(outcome.clone());
+                    }
+                }
...
-        for (_, receiver) in await_pending {
+        for (_, receiver) in await_pending {
             joinset.spawn(async move {
-                if let Err(err) = receiver.await {
+                match receiver.await {
+                    Ok(Ok(())) => {}
+                    Ok(Err(e)) => error!("Pending fetch failed: {e}"),
+                    Err(err) => {
                         error!(
                             "Failed to receive account from pending request: {err}"
                         );
-                    }
+                    }
                 }
             });
         }

Alternatively: after awaiting, recompute a FetchAndCloneResult for the waited pubkeys by checking the bank. Based on learnings.

Also applies to: 1019-1034

magicblock-chainlink/src/submux/mod.rs (1)

254-274: Do not push into a shared Vec from retain_async; use synchronous retain or collect safely.

retain_async may invoke closures across shards; pushing to a Vec captured by the outer task is not thread‑safe.

Apply this safer, synchronous variant:

-                states
-                    .retain_async(|_, debounce_state| {
+                states.retain(|_, debounce_state| {
                         if let DebounceState::Enabled {
                             next_allowed_forward,
                             pending,
                             ..
                         } = debounce_state
                         {
                             if now >= *next_allowed_forward {
                                 if let Some(u) = pending.take() {
                                     *next_allowed_forward = now + interval;
                                     to_forward.push(u);
                                 }
                             }
                         }
-                        true
-                    })
-                    .await;
+                        true
+                });

If you must stay async, collect per‑bucket into thread‑local buffers and merge post‑await, or protect to_forward with a Mutex.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (1)

185-193: Avoid cloning Arc in hot path

Cloning Arc to call contains() is unnecessary and adds overhead.

Apply this diff:

-            let subscribed_pubkeys = self.subscribed_pubkeys.clone();
-            if subscribed_pubkeys.contains(&update.pubkey) {
+            if self.subscribed_pubkeys.contains(&update.pubkey) {
                 let _ =
                     self.updates_sndr.send(update).await.inspect_err(|err| {
                         error!("Failed to send subscription update: {err:?}")
                     });
             }
magicblock-chainlink/src/remote_account_provider/mod.rs (2)

345-367: Fix scc::HashMap remove/upsert misuse (compile-time and logic bugs)

  • HashMap::remove returns Option, not Option<(K, V)>. The current tuple destructuring won’t compile.
  • upsert requires a merge closure; calling it with only (key, value) won’t compile.

Use remove to get V and reinsert via insert (or entry().insert_entry). Also drop the unnecessary key in the pattern.

Apply this diff:

-                        if let Some((_, (fetch_start_slot, pending_requests))) =
-                            fetching_accounts.remove(&update.pubkey)
+                        if let Some((fetch_start_slot, pending_requests)) =
+                            fetching_accounts.remove(&update.pubkey)
                         {
@@
-                            } else {
+                            } else {
                                 // Subscription is stale, put the fetch tracking back
                                 warn!("Received stale subscription update for {} at slot {}. Fetch started at slot {}",
                                     update.pubkey, slot, fetch_start_slot);
-                                fetching_accounts.upsert(
-                                    update.pubkey,
-                                    (fetch_start_slot, pending_requests),
-                                );
+                                // Reinsert the pending requests to await the fetch result.
+                                fetching_accounts.insert(
+                                    update.pubkey,
+                                    (fetch_start_slot, pending_requests),
+                                );
                                 None
                             }
                         } else {

538-543: Avoid leaving “pending” entries on subscription setup failure

If setup_subscriptions fails here, the function returns early while entries were already inserted into fetching_accounts. Those senders will never be fulfilled, leaving is_pending true and leaking state.

Recommended: degrade gracefully—log and proceed with fetch-only, or roll back the inserted entries. Simpler and safer is to proceed with fetch-only.

Apply this diff:

-        self.setup_subscriptions(&subscription_overrides).await?;
+        if let Err(err) = self.setup_subscriptions(&subscription_overrides).await {
+            error!(
+                "setup_subscriptions failed, proceeding with fetch-only: {err:?}"
+            );
+            // We intentionally continue to fetch so all pending requests resolve.
+        }

Optionally, if you prefer strict failure, remove the entries you added for these pubkeys before returning the error.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2)

226-236: Fix remove() usage: return type is Option, not Option<(K, V)>

The .map(|e| e.1) indicates expecting (K, V). Use the value directly.

Apply this diff:

-                if let Some(AccountSubscription { cancellation_token }) =
-                    subscriptions.remove(&pubkey).map(|e| e.1)
+                if let Some(AccountSubscription { cancellation_token }) =
+                    subscriptions.remove(&pubkey)
                 {
                     cancellation_token.cancel();
                     let _ = response.send(Ok(()));
                 } else {

140-149: Shutdown path: consider cancelling parent token as final cleanup (optional)

On shutdown, after cancelling per-sub tokens, optionally cancel the parent subscription_watchers as well to ensure no new children are created accidentally during teardown. Not required if message loop has exited.

Apply this diff:

     pub async fn shutdown(&self) {
         info!("Shutting down ChainPubsubActor");
         self.subscriptions
             .scan_async(|_, sub| sub.cancellation_token.cancel())
             .await;
+        // Optional: ensure no new child tokens are created during shutdown
+        self.subscription_watchers.cancel();
         self.shutdown_token.cancel();
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a1dfd59 and 7edf7b5.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • magicblock-chainlink/Cargo.toml (2 hunks)
  • magicblock-chainlink/src/accounts_bank.rs (3 hunks)
  • magicblock-chainlink/src/chainlink/fetch_cloner.rs (6 hunks)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (13 hunks)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (6 hunks)
  • magicblock-chainlink/src/remote_account_provider/lru_cache.rs (6 hunks)
  • magicblock-chainlink/src/remote_account_provider/mod.rs (12 hunks)
  • magicblock-chainlink/src/submux/mod.rs (12 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
magicblock-chainlink/src/remote_account_provider/mod.rs (3)
magicblock-chainlink/src/chainlink/fetch_cloner.rs (6)
  • new (135-159)
  • pubkeys (447-450)
  • pubkeys (458-461)
  • pubkeys (462-465)
  • pubkeys (941-944)
  • pubkeys (946-949)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (1)
  • new (169-179)
magicblock-chainlink/src/remote_account_provider/lru_cache.rs (2)
  • new (25-31)
  • pubkeys (35-38)
magicblock-chainlink/src/accounts_bank.rs (5)
magicblock-chainlink/src/testing/cloner_stub.rs (1)
  • get_account (41-45)
magicblock-chainlink/src/chainlink/blacklisted_accounts.rs (1)
  • blacklisted_accounts (6-31)
magicblock-chainlink/src/remote_account_provider/remote_account.rs (1)
  • account (180-202)
magicblock-accounts-db/src/lib.rs (3)
  • get_account (335-338)
  • remove_account (340-345)
  • remove_where (351-365)
magicblock-core/src/traits.rs (3)
  • get_account (12-12)
  • remove_account (13-13)
  • remove_where (14-17)
magicblock-chainlink/src/remote_account_provider/lru_cache.rs (1)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)
  • new (190-228)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2)
magicblock-accounts/src/scheduled_commits_processor.rs (2)
  • new (66-90)
  • new (418-433)
magicblock-chainlink/src/testing/chain_pubsub.rs (1)
  • unsubscribe (43-55)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (2)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)
  • new (190-228)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
  • new (110-138)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: run_make_ci_test
  • GitHub Check: run_make_ci_lint
  • GitHub Check: run_make_ci_format
  • GitHub Check: run_make_ci_test
🔇 Additional comments (14)
magicblock-chainlink/Cargo.toml (1)

15-15: Deps update matches refactor.

Adding parking_lot and scc is consistent with the lock‑free migration.

Also applies to: 30-30

magicblock-chainlink/src/accounts_bank.rs (1)

95-98: retain usage looks fine; confirm predicate cost.

retain runs user code per entry; ensure predicate stays cheap to avoid long scans in large banks. Consider early collecting keys to remove if expensive.

magicblock-chainlink/src/chainlink/fetch_cloner.rs (2)

968-979: Entry API usage for dedup looks good.

Occupied pushes a sender; Vacant reserves the slot to serialize the first fetch.

Please also ensure pubkeys is de‑duplicated earlier; duplicates in the same call will enqueue multiple receivers unnecessarily. If needed, dedup with a small HashSet before this loop.


1005-1017: The review comment is factually incorrect about scc::HashMap::remove's return type.

scc::HashMap::remove returns Option<(K, V)>, not Option. The code correctly destructures the tuple with the (_, requests) pattern. The suggested fix would introduce a compile error.

Likely an incorrect or invalid review comment.

magicblock-chainlink/src/submux/mod.rs (2)

381-401: Dedup logic is sound.

Entry pattern ensures per-key timestamp updates without locks; pruner complements it.


506-507: Nice: read-only snapshot helper uses read().

This keeps tests cheap and avoids cloning the map.

magicblock-chainlink/src/remote_account_provider/lru_cache.rs (3)

64-72: put() return handling: align with HashCache API.

Your .ok().flatten() assumes put returns Result<Option<(K,V)>>. Verify against the scc version in workspace; adjust mapping accordingly.

If it returns Option<(K,V)>, drop ok(). If it returns Result<...>, keep as is. Based on learnings.


43-45: Promotion via get() is appropriate.

Accessing via get() to refresh recency is the right primitive for HashCache.


25-31: No changes needed — review comment is incorrect.

The web search confirms HashCache::with_capacity signature is pub fn with_capacity(minimum_capacity: usize, maximum_capacity: usize) -> Self, requiring two arguments. The current code HashCache::with_capacity(capacity, capacity) is correct and matches the API. The suggested diff would introduce a compilation error by passing only one argument to a function that requires two.

Likely an incorrect or invalid review comment.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (1)

97-106: take_updates: good use of parking_lot::Mutex; semantics preserved

Guard.take() with an expect is correct for single-consumer semantics. No issues.

magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (3)

260-269: Subscribe deduplication policy

Current add_sub unconditionally upserts. If duplicate subscribe requests for same pubkey are errors in your protocol, respond with an error instead of replacing tokens. If replacement is desired, the merge closure above correctly cancels the old watcher.

Would you like to reject duplicate subscribes? If yes, I can provide a patch returning AccountSubscriptionAlreadyExists.


339-400: Resubscribe loop looks good after fixes

After correcting parent-token cancellation and upsert usage, the resubscribe flow rehydrates all subscriptions on the new client. LGTM.


1-5: No API alignment issues found — code correctly uses scc 2.4

The scc::HashMap remove method returns an Option containing the removed (K, V) tuple. The code at line 226 in chain_pubsub_actor.rs correctly uses this API: subscriptions.remove(&pubkey).map(|e| e.1) extracts the value from the returned tuple. This pattern aligns with scc 2.4's pinned version. No problematic upsert, prune, or scan_async patterns were detected in the codebase.

magicblock-chainlink/src/remote_account_provider/mod.rs (1)

427-430: No issues found - code correctly uses scc::HashMap API

Verification confirms the contains() call at lines 427-430 targets scc::HashMap (via Arc<FetchingAccounts>), which correctly uses the contains(&K) API. All related collections throughout the codebase consistently use scc types (HashMap, HashCache, HashSet), with imports properly referencing the scc crate. The single std::HashMap usage found is isolated to a test module, posing no risk of API mismatch in production code.

pub fn insert(&self, pubkey: Pubkey, account: AccountSharedData) {
trace!("Inserting account: {pubkey}");
self.accounts.lock().unwrap().insert(pubkey, account);
self.accounts.upsert(pubkey, account);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix scc::HashMap insertion: use insert or proper upsert signature.

scc::HashMap::upsert typically requires an update closure; the 2‑arg form is invalid. Use insert or the 3‑arg upsert variant.

Apply one of these:

-            self.accounts.upsert(pubkey, account);
+            // Simple replace/insert is sufficient here
+            self.accounts.insert(pubkey, account);

Or, if you truly want upsert semantics with an update:

-            self.accounts.upsert(pubkey, account);
+            self.accounts.upsert(pubkey, account, |_, v| {
+                *v = account.clone();
+            });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.accounts.upsert(pubkey, account);
// Simple replace/insert is sufficient here
self.accounts.insert(pubkey, account);
🤖 Prompt for AI Agents
In magicblock-chainlink/src/accounts_bank.rs around line 21, the call
self.accounts.upsert(pubkey, account) uses an invalid 2-argument form; change it
to either use insert by replacing the call with self.accounts.insert(pubkey,
account) when you just want to set/replace the value, or use scc::HashMap's
3-arg upsert variant with an initializer closure and an update closure (e.g.,
upsert(key, || new_value, |existing| { /* modify or return replacement */ })) to
implement true upsert semantics.

Comment on lines +104 to 108
self.accounts.scan(|pubkey, acc| {
let _ = write!(f, "\n - {pubkey}{acc:?}");
});
write!(f, "}}\nTotal {} accounts", self.accounts.len())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

fmt using scan is OK, but avoid heavy work in scan if it becomes parallel later.

If scan ever becomes parallel, writing to fmt::Formatter inside the closure would be unsafe. Today it’s synchronous; to future‑proof, collect into a String in scan and write once afterward.

🤖 Prompt for AI Agents
In magicblock-chainlink/src/accounts_bank.rs around lines 104 to 108, the
closure passed to self.accounts.scan performs writing to the fmt::Formatter
directly which would be unsafe if scan becomes parallel; instead allocate a
local String, append each line inside the scan closure (e.g., format! or
push_str with prepared pieces), then after scan finishes call write!(f,
"{}\nTotal {} accounts", collected_string, self.accounts.len()) so all
formatting to f happens once outside the scan.

Comment on lines +294 to 300
subs.upsert(
pubkey,
AccountSubscription {
cancellation_token: cancellation_token.clone(),
},
);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

subs.upsert() missing merge closure; cancel old token on replace

scc::HashMap::upsert requires a merge function. Use it to cancel any previous subscription token before replacing.

Apply this diff:

-            subs.upsert(
-                pubkey,
-                AccountSubscription {
-                    cancellation_token: cancellation_token.clone(),
-                },
-            );
+            subs.upsert(
+                pubkey,
+                AccountSubscription {
+                    cancellation_token: cancellation_token.clone(),
+                },
+                |_, existing, newv| {
+                    existing.cancellation_token.cancel();
+                    *existing = newv;
+                },
+            );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
subs.upsert(
pubkey,
AccountSubscription {
cancellation_token: cancellation_token.clone(),
},
);
subs.upsert(
pubkey,
AccountSubscription {
cancellation_token: cancellation_token.clone(),
},
|_, existing, newv| {
existing.cancellation_token.cancel();
*existing = newv;
},
);
🤖 Prompt for AI Agents
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs around
lines 294 to 300: subs.upsert is being called without the required merge closure
so an existing subscription token will be dropped without being cancelled;
change the call to use the upsert overload that takes a merge function which,
when an existing AccountSubscription is present, calls
existing.cancellation_token.cancel() (or .cancelled/abort depending on type)
before returning the new AccountSubscription (constructed using the current
cancellation_token.clone()), ensuring the old token is explicitly cancelled
prior to replacement.

Comment on lines 374 to 377
// Abort and await all watcher tasks and add fresh joinset
debug!("RecycleConnections: aborting watcher tasks");
let mut old_joinset = {
let mut watchers = subscription_watchers
.lock()
.expect("subscription_watchers lock poisonde");
std::mem::replace(&mut *watchers, tokio::task::JoinSet::new())
};
old_joinset.abort_all();
while let Some(_res) = old_joinset.join_next().await {}
debug!("RecycleConnections: watcher tasks terminated");
subscription_watchers.cancel();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Recycle bug: cancelling parent token breaks all future subscriptions

subscription_watchers.cancel() cancels the parent; all subsequently created child_token()s will be immediately cancelled. New subscriptions will self-cancel on creation.

Minimal safe fix: do not cancel the parent here; you already cancel each per-sub token above.

-        debug!("RecycleConnections: aborting watcher tasks");
-        subscription_watchers.cancel();
+        debug!("RecycleConnections: watchers cancelled via per-sub tokens");

If you need a “parent” reset, create a fresh token and pass it into new add_sub calls, but that requires plumbing a new Arc through start_worker state.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Abort and await all watcher tasks and add fresh joinset
debug!("RecycleConnections: aborting watcher tasks");
let mut old_joinset = {
let mut watchers = subscription_watchers
.lock()
.expect("subscription_watchers lock poisonde");
std::mem::replace(&mut *watchers, tokio::task::JoinSet::new())
};
old_joinset.abort_all();
while let Some(_res) = old_joinset.join_next().await {}
debug!("RecycleConnections: watcher tasks terminated");
subscription_watchers.cancel();
// Abort and await all watcher tasks and add fresh joinset
debug!("RecycleConnections: watchers cancelled via per-sub tokens");
🤖 Prompt for AI Agents
In magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs around
lines 374 to 377, calling subscription_watchers.cancel() cancels the parent
CancellationToken which causes any future child_token()s to be immediately
cancelled; remove the subscription_watchers.cancel() call so you only cancel
per-sub tokens (which are already cancelled above). If you truly need a reset of
the parent token, instead allocate a new Arc<CancellationToken> and thread it
into the start_worker/add_sub call sites so new subscriptions receive the fresh
parent; do not reuse or cancel the existing parent token.

Comment on lines +144 to 147
use std::sync::{
atomic::{AtomicU64, Ordering},
Mutex,
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Unify on parking_lot::Mutex in mocks to remove unwrap and avoid poisoning pitfalls

Mocks still use std::sync::Mutex and unwrap(). For consistency and fewer panics under test failures, switch to parking_lot::Mutex and drop unwraps.

Apply this diff:

-    use std::sync::{
-        atomic::{AtomicU64, Ordering},
-        Mutex,
-    };
+    use std::sync::atomic::{AtomicU64, Ordering};
+    use parking_lot::Mutex;

@@
         updates_sndr: mpsc::Sender<SubscriptionUpdate>,
-        updates_rcvr: Arc<Mutex<Option<mpsc::Receiver<SubscriptionUpdate>>>>,
+        updates_rcvr: Arc<Mutex<Option<mpsc::Receiver<SubscriptionUpdate>>>>,
@@
-            self.updates_rcvr.lock().unwrap().take().expect(
+            self.updates_rcvr.lock().take().expect(
                 "ChainPubsubClientMock::take_updates called more than once",
             )

Also applies to: 163-176, 229-237

🤖 Prompt for AI Agents
In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
around lines 144-147 (and also apply the same changes to 163-176 and 229-237),
replace uses of std::sync::Mutex in mocks with parking_lot::Mutex and remove any
.unwrap() calls that were used to handle lock poisoning; update the use/import
block to import parking_lot::Mutex instead of std::sync::Mutex, ensure types
referencing the mutex remain the same, and if parking_lot is not yet a
dependency add it to Cargo.toml; adjust code to lock via parking_lot's API (no
Result/poisoning) so unwraps are unnecessary.

Comment on lines +100 to +103
/// The minimum capacity for scc::HashCache is 64. We use a higher value
/// to make statistical outcomes in tests highly predictable.
const CAPACITY: usize = 512;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Test stability: survival rate >95% can be flaky.

Random Pubkeys and probabilistic eviction can produce rare false negatives. Consider checking “promoted > non‑promoted” with a margin, or seeding/scoping churn deterministically.

Also applies to: 178-230

Comment on lines 886 to +890
// Remove from fetching and get pending requests
// Note: the account might have been resolved by subscription update already
if let Some((_, requests)) = fetching.remove(pubkey) {
if let Some((_, (_, requests))) =
fetching_accounts.remove(pubkey)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix second misuse of HashMap::remove return type

Same issue as above: remove returns Option. Drop the extra tuple.

Apply this diff:

-                    if let Some((_, (_, requests))) =
-                        fetching_accounts.remove(pubkey)
+                    if let Some((_, requests)) =
+                        fetching_accounts.remove(pubkey)
                     {
                         requests
                     } else {
🤖 Prompt for AI Agents
In magicblock-chainlink/src/remote_account_provider/mod.rs around lines 886 to
890, the pattern matching incorrectly assumes HashMap::remove returns an Option
of a tuple pair; change the match to unwrap the Option<V> directly by dropping
the extra tuple level — e.g., use if let Some((_, requests)) =
fetching_accounts.remove(pubkey) { … } so you only destructure the stored value
(pair) returned by remove.

Comment on lines +414 to 420
let mut debounce_state = debounce_states
.entry(pubkey)
.or_insert_with(|| DebounceState::Disabled {
pubkey,
arrivals: VecDeque::new(),
}
});
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Entry API misuse: get a &mut DebounceState from the entry, then mutate.

or_insert_with returns an entry, not the value; operations like arrivals_mut() and maybe_enable/disable must act on &mut DebounceState via get_mut().

-        let mut maybe_forward_now = None;
-        {
-            let mut debounce_state = debounce_states
-                .entry(pubkey)
-                .or_insert_with(|| DebounceState::Disabled {
-                    pubkey,
-                    arrivals: VecDeque::new(),
-                });
+        let mut maybe_forward_now = None;
+        {
+            let mut entry = match debounce_states.entry(pubkey) {
+                Entry::Occupied(e) => e,
+                Entry::Vacant(v) => v.insert_entry(DebounceState::Disabled {
+                    pubkey,
+                    arrivals: VecDeque::new(),
+                }),
+            };
+            let debounce_state = entry.get_mut();
             // prune and push current
             let arrivals_len = {
                 let arrivals = debounce_state.arrivals_mut();
                 ...
             };
             ...
-            match debounce_state.get_mut() {
+            match debounce_state {
                 DebounceState::Disabled { .. } => {
                     maybe_forward_now = Some(update);
                 }
                 DebounceState::Enabled {
                     next_allowed_forward,
                     pending,
                     ..
                 } => {
                     if now >= *next_allowed_forward {
                         *next_allowed_forward = now + debounce_interval;
                         *pending = None;
                         maybe_forward_now = Some(update);
                     } else {
                         *pending = Some(update);
                     }
                 }
             }
         }

Also applies to: 477-495

🤖 Prompt for AI Agents
In magicblock-chainlink/src/submux/mod.rs around lines 414-420 (and similarly
477-495), the Entry API is being misused by assuming or_insert_with returned a
mutable reference; change the flow to insert or obtain the value and then call
get_mut() (or match on entry) to get a &mut DebounceState before calling
arrivals_mut(), maybe_enable(), disable(), or any other mutating methods; e.g.,
use entry.or_insert_with(|| DebounceState::Disabled { ... }); let debounce_state
= entry.get_mut(); then perform mutations on that &mut DebounceState (apply the
same pattern to the other block at 477-495).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants