Skip to content

feat: passes sim result to the submit tasks #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: prestwich/simrevert
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "m
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }

trevm = { version = "0.23.4", features = ["concurrent-db", "test-utils"] }
trevm = { version = "0.23.6", features = ["concurrent-db", "test-utils"] }

alloy = { version = "1.0.5", features = [
"full",
Expand Down
2 changes: 1 addition & 1 deletion bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main() -> eyre::Result<()> {
let constants = SignetSystemConstants::pecorino();

// Spawn the EnvTask
let env_task = config.env_task();
let env_task = config.env_task().await;
let (block_env, env_jh) = env_task.spawn();

// Spawn the cache system
Expand Down
20 changes: 18 additions & 2 deletions bin/submit_transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! A simple transaction submitter that sends a transaction to a recipient address
//! on a regular interval for the purposes of roughly testing rollup mining.
use alloy::{
network::{EthereumWallet, TransactionBuilder},
primitives::{Address, U256},
Expand Down Expand Up @@ -67,18 +69,29 @@ async fn main() {
}
}

/// Sends a transaction to the specified recipient address
async fn send_transaction(provider: &HostProvider, recipient_address: Address) {
// construct simple transaction to send ETH to a recipient
let nonce = match provider.get_transaction_count(provider.default_signer_address()).await {
Ok(count) => count,
Err(e) => {
error!(error = ?e, "failed to get transaction count");
return;
}
};

let tx = TransactionRequest::default()
.with_from(provider.default_signer_address())
.with_to(recipient_address)
.with_value(U256::from(1))
.with_nonce(nonce)
.with_gas_limit(30_000);

// start timer to measure how long it takes to mine the transaction
let dispatch_start_time: Instant = Instant::now();

// dispatch the transaction
debug!(?tx.nonce, "sending transaction with nonce");
let result = provider.send_transaction(tx).await.unwrap();

// wait for the transaction to mine
Expand All @@ -95,10 +108,13 @@ async fn send_transaction(provider: &HostProvider, recipient_address: Address) {
}
};

let hash = receipt.transaction_hash.to_string();
record_metrics(dispatch_start_time, receipt);
}

// record metrics for how long it took to mine the transaction
/// Record metrics for how long it took to mine the transaction
fn record_metrics(dispatch_start_time: Instant, receipt: alloy::rpc::types::TransactionReceipt) {
let mine_time = dispatch_start_time.elapsed().as_secs();
let hash = receipt.transaction_hash.to_string();
debug!(success = receipt.status(), mine_time, hash, "transaction mined");
histogram!("txn_submitter.tx_mine_time").record(mine_time as f64);
}
13 changes: 7 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
tasks::{
block::cfg::SignetCfgEnv,
cache::{BundlePoller, CacheSystem, CacheTask, TxPoller},
env::EnvTask,
env::{EnvTask, SimEnv},
},
};
use alloy::{
Expand All @@ -29,7 +29,6 @@ use init4_bin_base::{
use signet_zenith::Zenith;
use std::borrow::Cow;
use tokio::sync::watch;
use trevm::revm::context::BlockEnv;

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;
Expand Down Expand Up @@ -246,17 +245,19 @@ impl BuilderConfig {
}

/// Create an [`EnvTask`] using this config.
pub fn env_task(&self) -> EnvTask {
let provider = self.connect_ru_provider();
EnvTask::new(self.clone(), provider)
pub async fn env_task(&self) -> EnvTask {
let ru_provider = self.connect_ru_provider();
let host_provider =
self.connect_host_provider().await.expect("failed to configure host provider");
EnvTask::new(self.clone(), ru_provider, host_provider)
}

/// Spawn a new [`CacheSystem`] using this config. This contains the
/// joinhandles for [`TxPoller`] and [`BundlePoller`] and [`CacheTask`], as
/// well as the [`SimCache`] and the block env watcher.
///
/// [`SimCache`]: signet_sim::SimCache
pub fn spawn_cache_system(&self, block_env: watch::Receiver<Option<BlockEnv>>) -> CacheSystem {
pub fn spawn_cache_system(&self, block_env: watch::Receiver<Option<SimEnv>>) -> CacheSystem {
// Tx Poller pulls transactions from the cache
let tx_poller = TxPoller::new(self);
let (tx_receiver, tx_poller) = tx_poller.spawn();
Expand Down
92 changes: 59 additions & 33 deletions src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! `block.rs` contains the Simulator and everything that wires it into an
//! actor that handles the simulation of a stream of bundles and transactions
//! and turns them into valid Pecorino blocks for network submission.
use crate::config::{BuilderConfig, RuProvider};
use crate::{
config::{BuilderConfig, RuProvider},
tasks::env::SimEnv,
};
use alloy::{eips::BlockId, network::Ethereum, providers::Provider};
use init4_bin_base::{
deps::tracing::{debug, error},
Expand All @@ -17,6 +20,7 @@ use tokio::{
},
task::JoinHandle,
};
use tracing::info;
use trevm::revm::{
context::BlockEnv,
database::{AlloyDB, WrapDatabaseAsync},
Expand All @@ -34,9 +38,17 @@ pub struct Simulator {
pub config: BuilderConfig,
/// A provider that cannot sign transactions, used for interacting with the rollup.
pub ru_provider: RuProvider,

/// The block configuration environment on which to simulate
pub block_env: watch::Receiver<Option<BlockEnv>>,
pub sim_env: watch::Receiver<Option<SimEnv>>,
}

/// SimResult bundles a BuiltBlock to the BlockEnv it was simulated against.
#[derive(Debug, Clone)]
pub struct SimResult {
/// The block built with the successfully simulated transactions
pub block: BuiltBlock,
/// The block environment the transactions were simulated against.
pub env: SimEnv,
}

impl Simulator {
Expand All @@ -46,16 +58,17 @@ impl Simulator {
///
/// - `config`: The configuration for the builder.
/// - `ru_provider`: A provider for interacting with the rollup.
/// - `block_env`: A receiver for the block environment to simulate against.
///
/// # Returns
///
/// A new `Simulator` instance.
pub fn new(
config: &BuilderConfig,
ru_provider: RuProvider,
block_env: watch::Receiver<Option<BlockEnv>>,
sim_env: watch::Receiver<Option<SimEnv>>,
) -> Self {
Self { config: config.clone(), ru_provider, block_env }
Self { config: config.clone(), ru_provider, sim_env }
}

/// Get the slot calculator.
Expand All @@ -65,11 +78,16 @@ impl Simulator {

/// Handles building a single block.
///
/// Builds a block in the block environment with items from the simulation cache
/// against the database state. When the `finish_by` deadline is reached, it
/// stops simulating and returns the block.
///
/// # Arguments
///
/// - `constants`: The system constants for the rollup.
/// - `sim_items`: The simulation cache containing transactions and bundles.
/// - `finish_by`: The deadline by which the block must be built.
/// - `block_env`: The block environment to simulate against.
///
/// # Returns
///
Expand All @@ -79,28 +97,35 @@ impl Simulator {
constants: SignetSystemConstants,
sim_items: SimCache,
finish_by: Instant,
block: BlockEnv,
block_env: BlockEnv,
) -> eyre::Result<BuiltBlock> {
debug!(block_number = block_env.number, tx_count = sim_items.len(), "starting block build",);

let db = self.create_db().await.unwrap();

let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new(
db,
constants,
self.config.cfg_env(),
block,
block_env,
finish_by,
self.config.concurrency_limit,
sim_items,
self.config.rollup_block_gas_limit,
);

let built_block = block_build.build().await;
debug!(block_number = ?built_block.block_number(), "finished building block");
debug!(
tx_count = built_block.tx_count(),
block_number = built_block.block_number(),
"block simulation completed",
);

Ok(built_block)
}

/// Spawns the simulator task, which handles the setup and sets the deadline
/// for the each round of simulation.
/// Spawns the simulator task, which ticks along the simulation loop
/// as it receives block environments.
///
/// # Arguments
///
Expand All @@ -115,21 +140,23 @@ impl Simulator {
self,
constants: SignetSystemConstants,
cache: SimCache,
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
submit_sender: mpsc::UnboundedSender<SimResult>,
) -> JoinHandle<()> {
debug!("starting simulator task");

tokio::spawn(async move { self.run_simulator(constants, cache, submit_sender).await })
}

/// Continuously runs the block simulation and submission loop.
/// This function runs indefinitely, waiting for the block environment to be set and checking
/// if the current slot is valid before building a block and sending it along for to the submit channel.
///
/// This function clones the simulation cache, calculates a deadline for block building,
/// attempts to build a block using the latest cache and constants, and submits the built
/// block through the provided channel. If an error occurs during block building or submission,
/// it logs the error and continues the loop.
/// If it is authorized for the current slot, then the simulator task
/// - clones the simulation cache,
/// - calculates a deadline for block building,
/// - attempts to build a block using the latest cache and constants,
/// - then submits the built block through the provided channel.
///
/// This function runs indefinitely and never returns.
/// If an error occurs during block building or submission, it logs the error and continues the loop.
///
/// # Arguments
///
Expand All @@ -140,26 +167,26 @@ impl Simulator {
mut self,
constants: SignetSystemConstants,
cache: SimCache,
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
submit_sender: mpsc::UnboundedSender<SimResult>,
) {
loop {
let sim_cache = cache.clone();
let finish_by = self.calculate_deadline();

// Wait for the block environment to be set
if self.block_env.changed().await.is_err() {
error!("block_env channel closed");
if self.sim_env.changed().await.is_err() {
error!("block_env channel closed - shutting down simulator task");
return;
}
let Some(sim_env) = self.sim_env.borrow_and_update().clone() else { return };
info!(block_number = sim_env.signet.number, "new block environment received");

// If no env, skip this run
let Some(block_env) = self.block_env.borrow_and_update().clone() else { return };
debug!(block_env = ?block_env, "building on block env");

match self.handle_build(constants, sim_cache, finish_by, block_env).await {
// Calculate the deadline for this block simulation.
// NB: This must happen _after_ taking a reference to the sim cache,
// waiting for a new block, and checking current slot authorization.
let finish_by = self.calculate_deadline();
let sim_cache = cache.clone();
match self.handle_build(constants, sim_cache, finish_by, sim_env.signet.clone()).await {
Ok(block) => {
debug!(block = ?block, "built block");
let _ = submit_sender.send(block);
debug!(block = ?block.block_number(), tx_count = block.transactions().len(), "built simulated block");
let _ = submit_sender.send(SimResult { block, env: sim_env });
}
Err(e) => {
error!(err = %e, "failed to build block");
Expand All @@ -184,11 +211,10 @@ impl Simulator {
let remaining = self.slot_calculator().slot_duration() - timepoint;

// We add a 1500 ms buffer to account for sequencer stopping signing.

let candidate =
let deadline =
Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500);

candidate.max(Instant::now())
deadline.max(Instant::now())
}

/// Creates an `AlloyDB` instance from the rollup provider.
Expand Down
14 changes: 7 additions & 7 deletions src/tasks/cache/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use trevm::revm::context::BlockEnv;

use crate::tasks::env::SimEnv;

/// Cache task for the block builder.
///
Expand All @@ -16,8 +17,7 @@ use trevm::revm::context::BlockEnv;
#[derive(Debug)]
pub struct CacheTask {
/// The channel to receive the block environment.
env: watch::Receiver<Option<BlockEnv>>,

env: watch::Receiver<Option<SimEnv>>,
/// The channel to receive the transaction bundles.
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
/// The channel to receive the transactions.
Expand All @@ -27,7 +27,7 @@ pub struct CacheTask {
impl CacheTask {
/// Create a new cache task with the given cache and channels.
pub const fn new(
env: watch::Receiver<Option<BlockEnv>>,
env: watch::Receiver<Option<SimEnv>>,
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
txns: mpsc::UnboundedReceiver<TxEnvelope>,
) -> Self {
Expand All @@ -45,10 +45,10 @@ impl CacheTask {
break;
}
if let Some(env) = self.env.borrow_and_update().as_ref() {
basefee = env.basefee;
info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache");
basefee = env.signet.basefee;
info!(basefee, env.signet.number, env.signet.timestamp, "rollup block env changed, clearing cache");
cache.clean(
env.number, env.timestamp
env.signet.number, env.signet.timestamp
);
}
}
Expand Down
Loading