diff --git a/crates/core/src/runloops/mod.rs b/crates/core/src/runloops/mod.rs index 1a2b2437..89efc996 100644 --- a/crates/core/src/runloops/mod.rs +++ b/crates/core/src/runloops/mod.rs @@ -50,7 +50,7 @@ use crate::{ admin::AdminRpc, bank_data::BankData, full::Full, jito::Jito, minimal::Minimal, surfnet_cheatcodes::SurfnetCheatcodes, ws::Rpc, }, - surfnet::{GeyserEvent, PluginCommand, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient}, + surfnet::{GeyserEvent, PluginCommand, ProfilingJob, locker::SurfnetSvmLocker, remote::SurfnetRemoteClient}, }; const BLOCKHASH_SLOT_TTL: u64 = 75; @@ -247,6 +247,9 @@ pub async fn start_local_surfnet_runloop( simnet_events_tx_cc.send(SimnetEvent::error(format!("Geyser plugin failed: {e}"))); } }; + + setup_profiling(&svm_locker); + let (clock_event_rx, clock_command_tx) = start_clock_runloop(simnet_config.slot_time, Some(simnet_events_tx_cc.clone())); @@ -1138,3 +1141,57 @@ async fn start_ws_rpc_server_runloop( .map_err(|e| format!("Failed to spawn WebSocket RPC Handler thread: {:?}", e))?; Ok(_ws_handle) } + + +pub fn setup_profiling(svm_locker: &SurfnetSvmLocker) { + let simnet_events_tx = svm_locker.simnet_events_tx(); + let (profiling_job_tx, profiling_job_rx) = crossbeam_channel::bounded(128); + start_profiling_runloop(profiling_job_rx, simnet_events_tx); + svm_locker.with_svm_writer(|svm| { + svm.profiling_job_tx = Some(profiling_job_tx); + }); +} + +pub fn start_profiling_runloop( + profiling_job_rx: Receiver, + simnet_events_tx: Sender, +) { + // no need of this channel in profiling + let (temp_status_tx, _) = crossbeam_channel::unbounded(); + hiro_system_kit::thread_named("Instruction Profiler").spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to build profiling runtime"); + + while let Ok(job) = profiling_job_rx.recv() { + let result = rt.block_on(async { + let profiling_locker = SurfnetSvmLocker::new(job.profiling_svm); + profiling_locker + .generate_instruction_profiles( + &job.transaction, + &job.transaction_accounts, + &job.loaded_addresses, + &job.accounts_before, + &job.token_accounts_before, + &job.token_programs, + job.pre_execution_capture, + &temp_status_tx + ) + .await + }); + + let profiles = match result { + Ok(profiles) => profiles, + Err(e) => { + let _ = simnet_events_tx.try_send(SimnetEvent::error(format!( + "Instruction profiling failed: {}", e + ))); + None + } + }; + let _ = job.result_tx.send(profiles); + } + }).expect("Failed to spawn Instruction Profiler thread"); +} diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 45134b66..580b987b 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -70,7 +70,7 @@ use crate::{ error::{SurfpoolError, SurfpoolResult}, helpers::time_travel::calculate_time_travel_clock, rpc::utils::{convert_transaction_metadata_from_canonical, verify_pubkey}, - surfnet::{FINALIZATION_SLOT_THRESHOLD, SLOTS_PER_EPOCH}, + surfnet::{FINALIZATION_SLOT_THRESHOLD, ProfilingJob, SLOTS_PER_EPOCH}, types::{ GeyserAccountUpdate, OfflineAccountConfig, RemoteRpcResult, SurfnetTransactionStatus, TimeTravelConfig, TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta, @@ -1096,7 +1096,7 @@ impl SurfnetSvmLocker { ) -> SurfpoolResult<()> { let do_propagate_status_updates = true; let signature = transaction.signatures[0]; - let profile_result = match self + let (profile_result, ix_profile_rx) = match self .fetch_all_tx_accounts_then_process_tx_returning_profile_res( remote_ctx, transaction, @@ -1133,6 +1133,32 @@ impl SurfnetSvmLocker { self.with_svm_writer(|svm_writer| { svm_writer.write_executed_profile_result(signature, profile_result) })?; + + // Spawn an async task to receive profiling results and append them to the + // stored profiles + if let Some(rx) = ix_profile_rx { + let locker = self.clone(); + tokio::spawn(async move { + let ix_profiles = tokio::task::spawn_blocking(move || rx.recv().ok().flatten()) + .await + .ok() + .flatten(); + if let Some(profiles) = ix_profiles { + locker.with_svm_writer(|svm_writer| { + if let Ok(Some(mut keyed_profile)) = svm_writer + .executed_transaction_profiles + .get(&signature.to_string()) + { + keyed_profile.instruction_profiles = Some(profiles); + let _ = svm_writer + .executed_transaction_profiles + .store(signature.to_string(), keyed_profile); + } + }); + } + }); + } + Ok(()) } @@ -1153,7 +1179,7 @@ impl SurfnetSvmLocker { let skip_preflight = true; // skip preflight checks during transaction profiling let sigverify = true; // do verify signatures during transaction profiling let do_propagate_status_updates = false; // don't propagate status updates during transaction profiling - let mut profile_result = svm_locker + let (mut profile_result, ix_profile_rx) = svm_locker .fetch_all_tx_accounts_then_process_tx_returning_profile_res( remote_ctx, transaction, @@ -1171,9 +1197,37 @@ impl SurfnetSvmLocker { svm_writer.write_simulated_profile_result(uuid, tag, profile_result) })?; + if let Some(rx) = ix_profile_rx { + let locker = self.clone(); + tokio::spawn(async move { + let ix_profiles = tokio::task::spawn_blocking(move || rx.recv().ok().flatten()) + .await + .ok() + .flatten(); + if let Some(profiles) = ix_profiles { + locker.with_svm_writer(|svm_writer| { + if let Ok(Some(mut keyed_profile)) = svm_writer + .simulated_transaction_profiles + .get(&uuid.to_string()) + { + keyed_profile.instruction_profiles = Some(profiles); + let _ = svm_writer + .simulated_transaction_profiles + .store(uuid.to_string(), keyed_profile); + } + }); + } + }); + } + Ok(self.with_contextualized_svm_reader(|_| uuid)) } + + pub fn profiling_job_tx(&self) -> Option> { + self.with_svm_reader(|svm| svm.profiling_job_tx.clone()) + } + async fn fetch_all_tx_accounts_then_process_tx_returning_profile_res( &self, remote_ctx: &Option<(SurfnetRemoteClient, CommitmentConfig)>, @@ -1182,7 +1236,7 @@ impl SurfnetSvmLocker { skip_preflight: bool, sigverify: bool, do_propagate: bool, - ) -> SurfpoolResult { + ) -> SurfpoolResult<(KeyedProfileResult, Option>>>)> { let signature = transaction.signatures[0]; // Sigverify the transaction upfront before doing any account fetching or other pre-processing. @@ -1338,29 +1392,28 @@ impl SurfnetSvmLocker { let loaded_addresses = tx_loaded_addresses.as_ref().map(|l| l.loaded_addresses()); - let ix_profiles = if self.is_instruction_profiling_enabled() { - match self - .generate_instruction_profiles( - &transaction, - &transaction_accounts, - &tx_loaded_addresses, - &accounts_before, - &token_accounts_before, - &token_programs, - pre_execution_capture.clone(), - &status_tx, - ) - .await - { - Ok(profiles) => profiles, - Err(e) => { - let _ = self.simnet_events_tx().try_send(SimnetEvent::error(format!( - "Failed to generate instruction profiles: {}", - e - ))); - None - } + let ix_profile_rx = if self.is_instruction_profiling_enabled() { + if let Some(profiling_job_tx) = self.profiling_job_tx() { + let profiling_svm = self.with_svm_reader(|r| r.clone_for_profiling()); + let (result_tx, result_rx) = crossbeam_channel::bounded(1); + let job = ProfilingJob { + profiling_svm, + transaction: transaction.clone(), + transaction_accounts: transaction_accounts.to_vec(), + loaded_addresses: tx_loaded_addresses.clone(), + accounts_before: accounts_before.to_vec(), + token_accounts_before: token_accounts_before.to_vec(), + token_programs: token_programs.to_vec(), + pre_execution_capture: pre_execution_capture.clone(), + result_tx, + }; + let _ = profiling_job_tx.send(job); + Some(result_rx) + } + else { + None } + } else { None }; @@ -1381,17 +1434,20 @@ impl SurfnetSvmLocker { ) .await?; - Ok(KeyedProfileResult::new( - latest_absolute_slot, - UuidOrSignature::Signature(signature), - ix_profiles, - profile_result, - readonly_account_states, + Ok(( + KeyedProfileResult::new( + latest_absolute_slot, + UuidOrSignature::Signature(signature), + None, + profile_result, + readonly_account_states, + ), + ix_profile_rx, )) } #[allow(clippy::too_many_arguments)] - async fn generate_instruction_profiles( + pub async fn generate_instruction_profiles( &self, transaction: &VersionedTransaction, transaction_accounts: &[Pubkey], diff --git a/crates/core/src/surfnet/mod.rs b/crates/core/src/surfnet/mod.rs index 855047b5..f43bba7b 100644 --- a/crates/core/src/surfnet/mod.rs +++ b/crates/core/src/surfnet/mod.rs @@ -18,12 +18,13 @@ use solana_signature::Signature; use solana_transaction::versioned::VersionedTransaction; use solana_transaction_error::TransactionError; use solana_transaction_status::{EncodedConfirmedTransactionWithStatusMeta, TransactionStatus}; +use surfpool_types::{ExecutionCapture, ProfileResult}; use svm::SurfnetSvm; use crate::{ PluginInfo, error::{SurfpoolError, SurfpoolResult}, - types::{GeyserAccountUpdate, TransactionWithStatusMeta}, + types::{GeyserAccountUpdate, TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta}, }; pub mod locker; @@ -195,6 +196,18 @@ pub enum SnapshotImportStatus { Failed, } +pub struct ProfilingJob { + pub profiling_svm: SurfnetSvm, + pub transaction: VersionedTransaction, + pub transaction_accounts: Vec, + pub loaded_addresses: Option, + pub accounts_before: Vec>, + pub token_accounts_before: Vec<(usize, TokenAccount)>, + pub token_programs: Vec, + pub pre_execution_capture: ExecutionCapture, + pub result_tx: Sender>>, +} + #[derive(Debug, Clone, PartialEq)] pub enum SignatureSubscriptionType { Received, diff --git a/crates/core/src/surfnet/svm.rs b/crates/core/src/surfnet/svm.rs index 923157e9..2aa5c6a0 100644 --- a/crates/core/src/surfnet/svm.rs +++ b/crates/core/src/surfnet/svm.rs @@ -82,7 +82,7 @@ use crate::{ scenarios::TemplateRegistry, storage::{OverlayStorage, Storage, new_kv_store, new_kv_store_with_default}, surfnet::{ - LogsSubscriptionData, locker::is_supported_token_program, surfnet_lite_svm::SurfnetLiteSvm, + LogsSubscriptionData, ProfilingJob, locker::is_supported_token_program, surfnet_lite_svm::SurfnetLiteSvm }, types::{ GeyserAccountUpdate, MintAccount, OfflineAccountConfig, SerializableAccountAdditionalData, @@ -235,6 +235,7 @@ pub struct SurfnetSvm { Sender, Option, )>, + pub profiling_job_tx: Option>, pub perf_samples: VecDeque, pub transactions_processed: u64, pub latest_epoch_info: EpochInfo, @@ -344,6 +345,7 @@ impl SurfnetSvm { inner: self.inner.clone_for_profiling(), remote_rpc_url: self.remote_rpc_url.clone(), chain_tip: self.chain_tip.clone(), + profiling_job_tx: self.profiling_job_tx.clone(), // Wrap all storage fields with OverlayStorage blocks: OverlayStorage::wrap(self.blocks.clone_box()), @@ -557,6 +559,7 @@ impl SurfnetSvm { chain_tip, blocks: blocks_db, transactions: transactions_db, + profiling_job_tx: None, perf_samples: VecDeque::new(), transactions_processed, simnet_events_tx, diff --git a/crates/core/src/tests/helpers.rs b/crates/core/src/tests/helpers.rs index 71b2d0fd..3dbe29f3 100644 --- a/crates/core/src/tests/helpers.rs +++ b/crates/core/src/tests/helpers.rs @@ -5,13 +5,34 @@ use crossbeam_channel::Sender; use solana_clock::Clock; use solana_epoch_info::EpochInfo; use solana_transaction::versioned::VersionedTransaction; -use surfpool_types::{CheatcodeConfig, RpcConfig, SimnetCommand}; +use surfpool_types::{CheatcodeConfig, RpcConfig, RpcProfileResultConfig, SimnetCommand, UiKeyedProfileResult, types::UuidOrSignature}; use crate::{ rpc::RunloopContext, surfnet::{PluginCommand, locker::SurfnetSvmLocker, svm::SurfnetSvm}, }; +/// Polls `get_profile_result` until `instruction_profiles` is populated or the timeout is reached. +/// Instruction profiles are appended asynchronously, so tests need to wait for them. +pub async fn poll_for_instruction_profiles( + svm_locker: &SurfnetSvmLocker, + key: UuidOrSignature, + config: &RpcProfileResultConfig, + timeout: std::time::Duration, +) -> UiKeyedProfileResult { + let start = std::time::Instant::now(); + loop { + let result = svm_locker + .get_profile_result(key.clone(), config) + .unwrap() + .expect("Profile result should exist"); + if result.instruction_profiles.is_some() || start.elapsed() >= timeout { + return result; + } + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } +} + pub fn get_free_port() -> Result { let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| format!("Failed to bind to port 0: {}", e))?; diff --git a/crates/core/src/tests/integration.rs b/crates/core/src/tests/integration.rs index 61169152..8208e4af 100644 --- a/crates/core/src/tests/integration.rs +++ b/crates/core/src/tests/integration.rs @@ -54,7 +54,7 @@ use crate::{ minimal::MinimalClient, surfnet_cheatcodes::{SurfnetCheatcodes, SurfnetCheatcodesRpc}, }, - runloops::start_local_surfnet_runloop, + runloops::{setup_profiling, start_local_surfnet_runloop}, storage::tests::TestType, surfnet::{ PluginCommand, SignatureSubscriptionType, locker::SurfnetSvmLocker, svm::SurfnetSvm, @@ -1843,6 +1843,7 @@ async fn test_profile_transaction_basic(test_type: TestType) { // Set up test environment let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -1930,6 +1931,7 @@ async fn test_profile_transaction_basic(test_type: TestType) { async fn test_profile_transaction_multi_instruction_basic(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); let payer = Keypair::new(); let lamports_to_send = 1_000_000; @@ -2109,10 +2111,13 @@ async fn test_profile_transaction_multi_instruction_basic(test_type: TestType) { encoding: Some(UiAccountEncoding::JsonParsed), depth: Some(RpcProfileDepth::Instruction), }; - let profile_result = svm_locker - .get_profile_result(key, &rpc_profile_config) - .unwrap() - .expect("Profile result should exist"); + let profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + key, + &rpc_profile_config, + Duration::from_secs(10), + ) + .await; println!( "Profile result with Instruction depth: {}", @@ -2328,6 +2333,7 @@ async fn test_profile_transaction_with_tag(test_type: TestType) { // Set up test environment let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -2488,6 +2494,7 @@ async fn test_profile_transaction_with_tag(test_type: TestType) { async fn test_profile_transaction_token_transfer(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -2595,10 +2602,13 @@ async fn test_profile_transaction_token_transfer(test_type: TestType) { depth: Some(RpcProfileDepth::Instruction), }; let key = UuidOrSignature::Uuid(profile_result.inner); - let ui_profile_result = svm_locker - .get_profile_result(key, &rpc_profile_config) - .unwrap() - .expect("Profile result should exist"); + let ui_profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + key, + &rpc_profile_config, + Duration::from_secs(10), + ) + .await; assert!( ui_profile_result @@ -2928,6 +2938,7 @@ async fn test_profile_transaction_token_transfer(test_type: TestType) { async fn test_profile_transaction_insufficient_funds(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts with insufficient funds let payer = Keypair::new(); @@ -2998,6 +3009,7 @@ async fn test_profile_transaction_insufficient_funds(test_type: TestType) { async fn test_profile_transaction_multi_instruction_failure(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -3034,10 +3046,13 @@ async fn test_profile_transaction_multi_instruction_failure(test_type: TestType) .inner; let key = UuidOrSignature::Uuid(uuid); - let profile_result = svm_locker - .get_profile_result(key, &RpcProfileResultConfig::default()) - .unwrap() - .expect("Profile result should exist"); + let profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + key, + &RpcProfileResultConfig::default(), + Duration::from_secs(10), + ) + .await; // Verify transaction profile shows failure assert!( @@ -3082,6 +3097,7 @@ async fn test_profile_transaction_multi_instruction_failure(test_type: TestType) async fn test_profile_transaction_with_encoding(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -3154,6 +3170,7 @@ async fn test_profile_transaction_with_encoding(test_type: TestType) { async fn test_profile_transaction_with_tag_and_retrieval(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -3258,6 +3275,7 @@ async fn test_profile_transaction_with_tag_and_retrieval(test_type: TestType) { async fn test_profile_transaction_empty_instruction(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -3318,6 +3336,7 @@ async fn test_profile_transaction_empty_instruction(test_type: TestType) { async fn test_profile_transaction_versioned_message(test_type: TestType) { let (svm_instance, _simnet_events_rx, _geyser_events_rx) = test_type.initialize_svm(); let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); // Set up test accounts let payer = Keypair::new(); @@ -4180,13 +4199,13 @@ async fn test_ix_profiling_with_alt_tx(test_type: TestType) { .unwrap(); let profile_result_uuid = binding.inner(); - let profile_result = svm_locker - .get_profile_result( - UuidOrSignature::Uuid(*profile_result_uuid), - &RpcProfileResultConfig::default(), - ) - .unwrap() - .unwrap(); + let profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + UuidOrSignature::Uuid(*profile_result_uuid), + &RpcProfileResultConfig::default(), + Duration::from_secs(10), + ) + .await; let ix_profiles = profile_result .instruction_profiles .as_ref() @@ -4391,13 +4410,13 @@ async fn test_compute_budget_profiling(test_type: TestType) { .await .unwrap() .inner; - let profile_result = svm_locker - .get_profile_result( - UuidOrSignature::Uuid(uuid), - &RpcProfileResultConfig::default(), - ) - .unwrap() - .unwrap(); + let profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + UuidOrSignature::Uuid(uuid), + &RpcProfileResultConfig::default(), + Duration::from_secs(10), + ) + .await; let ix_profile = profile_result.instruction_profiles.unwrap(); assert_eq!(ix_profile.len(), 3, "Should have 3 instruction profiles"); @@ -7769,6 +7788,7 @@ async fn test_profile_transaction_does_not_mutate_state(test_type: TestType) { // Create the locker and runloop context let svm_locker = SurfnetSvmLocker::new(svm_instance); + setup_profiling(&svm_locker); let (simnet_cmd_tx, _simnet_cmd_rx) = crossbeam_unbounded::(); let (plugin_commands_tx, _plugin_commands_rx) = crossbeam_channel::unbounded::(); @@ -7965,12 +7985,15 @@ async fn test_instruction_profiling_does_not_mutate_state(test_type: TestType) { "process_transaction should complete (success or failure)" ); - // Retrieve the profile result using the signature + // Retrieve the profile result using the signature, polling for instruction profiles let key = UuidOrSignature::Signature(signature); - let profile_result = svm_locker - .get_profile_result(key, &RpcProfileResultConfig::default()) - .unwrap() - .expect("Profile result should exist for executed transaction"); + let profile_result = crate::tests::helpers::poll_for_instruction_profiles( + &svm_locker, + key, + &RpcProfileResultConfig::default(), + Duration::from_secs(10), + ) + .await; // Verify the overall transaction failed (due to second instruction) assert!( diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 3f53a256..8b7cb10e 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1262,7 +1262,7 @@ impl std::fmt::Display for TimeTravelError { impl std::error::Error for TimeTravelError {} -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] /// Tracks the loaded addresses with its associated index within an Address Lookup Table pub struct IndexedLoadedAddresses { pub writable: Vec<(u8, Pubkey)>, @@ -1284,7 +1284,7 @@ impl IndexedLoadedAddresses { } } -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] /// Maps an Address Lookup Table entry to its indexed loaded addresses pub struct TransactionLoadedAddresses(IndexMap);